Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import typing as t
- from celery import Task
- import abc
- StepsType = t.Iterable[Task]
- context = dict()
- def inject_context(func):
- '''
- stores params from step to step.
- wrap your celery task steps with it
- '''
- def wrapper(*args, **kwargs):
- return func(context, *args, **kwargs)
- return wrapper
- class StepBase(metaclass=abc.ABCMeta):
- @abc.abstractmethod
- def run():
- pass
- class SequentialStep(StepBase):
- def run() -> None:
- '''Wait for the result (scroll_id) and run once again if scroll_id != None'''
- pass
- class ParallelStep(StepBase):
- def run() -> None:
- '''Run tasks with celery.group'''
- pass
- class Pipeline:
- def __init__(self, config: t.OrderedDict, steps: StepsType) -> None:
- self.steps = []
- self.config = config
- def execute(self) -> None:
- pass
- def run(self) -> None:
- executor.delay(self)
- task = t.Callable
- @task
- def step():
- '''
- Convention:
- - return scroll_id if Sequential step
- - return None if Parallel step
- '''
- pass
- @task
- def executor(pipeline: Pipeline):
- pipeline.execute()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement