Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # test.py
- from celery import Celery, chord
- from celery.utils.log import get_task_logger
- app = Celery('test', backend='redis://localhost:45000/10?new_join=1', broker='redis://localhost:45000/11')
- app.conf.CELERY_ALWAYS_EAGER = False
- logger = get_task_logger(__name__)
- @app.task(bind=True)
- def get_one(self):
- print('hello world')
- self.replace(get_two.s())
- return 1
- @app.task
- def get_two():
- print('Returning two')
- return 2
- @app.task
- def sum_all(data):
- print('Logging data')
- logger.error(data)
- return sum(data)
- if __name__ == '__main__':
- print('Running test')
- x = chord(get_one.s() for i in range(3))
- body = sum_all.s()
- result = x(body)
- print(result.get())
- print('Finished w/ test')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement