Advertisement
frolkin28

Sitemap concept

Sep 16th, 2021
840
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.21 KB | None | 0 0
  1. import typing as t
  2. from celery import Task
  3. import abc
  4.  
  5. StepsType = t.Iterable[Task]
  6.  
  7. context = dict()
  8.  
  9.  
  10. def inject_context(func):
  11.     '''
  12.        stores params from step to step.
  13.        wrap your celery task steps with it
  14.    '''
  15.     def wrapper(*args, **kwargs):
  16.         return func(context, *args, **kwargs)
  17.     return wrapper
  18.  
  19.  
  20. class StepBase(metaclass=abc.ABCMeta):
  21.     @abc.abstractmethod
  22.     def run():
  23.         pass
  24.  
  25.  
  26. class SequentialStep(StepBase):
  27.     def run() -> None:
  28.         '''Wait for the result (scroll_id) and run once again if scroll_id != None'''
  29.         pass
  30.  
  31.  
  32. class ParallelStep(StepBase):
  33.     def run() -> None:
  34.         '''Run tasks with celery.group'''
  35.         pass
  36.  
  37.  
  38. class Pipeline:
  39.     def __init__(self, config: t.OrderedDict, steps: StepsType) -> None:
  40.         self.steps = []
  41.         self.config = config
  42.  
  43.     def execute(self) -> None:
  44.         pass
  45.  
  46.     def run(self) -> None:
  47.         executor.delay(self)
  48.  
  49.  
  50. task = t.Callable
  51.  
  52.  
  53. @task
  54. def step():
  55.     '''
  56.        Convention:
  57.            - return scroll_id if Sequential step
  58.            - return None if Parallel step
  59.    '''
  60.     pass
  61.  
  62.  
  63. @task
  64. def executor(pipeline: Pipeline):
  65.     pipeline.execute()
  66.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement