Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3.13
- # date: 2025.08.13
- # [python - How to aggregate results from parallel tasks and store only the final result (ignore intermediate outputs)? - Stack Overflow](https://stackoverflow.com/questions/79733975/how-to-aggregate-results-from-parallel-tasks-and-store-only-the-final-result-ig)
- # --- start celery before running script ---
- # celery -A main worker --loglevel=info
- # (parameter `-A main` because my script has name `main.py`)
- # --- check "tasks" after running script ---
- # redis-cli -n 0 keys '*' # broker
- # redis-cli -n 1 keys '*' # backend
- # --- delete all results in redis backend ---
- # redis-cli -n 1 del $(redis-cli -n 1 keys 'celery-task-meta-*') # workers
- # redis-cli -n 1 del $(redis-cli -n 1 keys 'celery-taskset-meta-*') # chord ???
- # --- show the content of a task result (use `fzf` to pick a key from the list and `jq` to format the JSON output) ---
- # redis-cli -n 1 get $(redis-cli -n 1 keys 'celery-task-meta-*' |fzf) | jq
- # --- running (script has to be in `main.py` to run celery correctly) ---
- # python main.py # run latest version_2
- # python main.py 0 # run version_0
- # [fzf](https://github.com/junegunn/fzf)
- # [jq](https://jqlang.org/)
- from celery import Celery
- from celery import chord, group, chain
- from celery.result import AsyncResult
print('creating app ... with Redis as broker and backend')
# Redis DB 0 is used as the message broker and Redis DB 1 as the result
# backend (matching the `redis-cli -n 0` / `redis-cli -n 1` commands above).
# The app name mirrors the script name `main.py`, as used by
# `celery -A main worker`.
app = Celery(
    main='main',
    backend='redis://localhost:6379/1',
    broker='redis://localhost:6379/0',
)
def compute_something(arg):
    """Log ``arg`` and return it multiplied by two.

    The log line is emitted as one f-string so that output from concurrent
    workers appears as a single piece instead of being split up.
    """
    message = f'compute_something: {arg}'
    print(message)
    doubled = arg * 2
    return doubled
@app.task
def worker_task(arg):
    """Celery task: log ``arg`` and return ``compute_something(arg)``."""
    print(f'worker_task: {arg}')
    computed = compute_something(arg)
    return computed
@app.task
def aggregate(results):
    """Celery task used as the chord body: log the collected results and return their sum."""
    print(f'aggregate: {results}')
    total = sum(results)
    return total
@app.task
def add(a, b):
    """Celery task: log both operands and return ``a + b``."""
    print(f'add: {a}, {b}')
    total = a + b
    return total
def cleanup(final_result):
    """Print details of a finished result, then forget each of its parent's results.

    Prints the result object, its value and its type. Then, for every entry in
    ``final_result.parent.results``, prints the entry's value and calls
    ``forget()`` on it. ``final_result`` itself is not forgotten.
    """
    print(f"cleanup: {final_result = }")
    print(f"cleanup: {final_result.get() = }")
    print(f"cleanup: {type(final_result) = }")
    parent_results = final_result.parent.results
    # The name `r` is kept because the self-documenting f-string prints it.
    for r in parent_results:
        print(f"cleanup: forget: {r.get() = }")
        r.forget()
- # --- versions ----
def version_0():
    """Different test - to see if `chain` has the same properties `.parent.results` as `chord`.

    Runs the chain add(1, 2) -> add(3) -> add(4) -> add(10) and prints the
    final result. It then walks the ``.parent`` links from the last step back
    to the first, printing each intermediate value.

    The ``forget()`` calls are deliberately left disabled, so every result
    stays in the backend for inspection (e.g. with ``redis-cli -n 1``).
    """
    print('--- version 0 - chain add ---')
    final_result = chain(add.s(1, 2), add.s(3), add.s(4), add.s(10))()
    print(f"{final_result.get() = }")
    print(f"{final_result.result = }")
    print(f"{final_result.parent = }")
    item = final_result
    # Follow `.parent` until the first task of the chain (which has none).
    while item.parent:
        print('forget parent:', item.parent.get())
        # item.parent.forget()  # enable to actually delete each parent result
        item = item.parent
    # final_result.forget()  # enable to also delete the final result
def version_1():
    """Run a chord over ``worker_task`` and forget its header results by ID.

    Sends ``worker_task`` for each value in ``range(5)`` as the chord header,
    with ``aggregate`` as the body, and waits for the aggregated value. It
    then collects the IDs of the header results from
    ``final_result.parent.results`` and forgets each one through a fresh
    ``AsyncResult``. The final (body) result itself is kept. Add
    ``final_result.id`` to the ID list to remove it as well.
    """
    print('--- version 1 - chord aggregate add ---')
    inputs = range(5)
    workers = [worker_task.s(arg) for arg in inputs]
    # Task IDs cannot be read from the signatures here because the tasks have
    # not been sent yet; to know them in advance, assign your own IDs.
    final_result = chord(workers)(aggregate.s())
    print(f"{final_result.get() = }")
    # Bug fix: this previously read `result.parent.results`, but `result` is
    # only assigned in the loop below, so it raised UnboundLocalError.
    results_ids = [r.id for r in final_result.parent.results]
    print(f"{results_ids = }")
    print(f"{len(results_ids) = }")
    for item_id in results_ids:
        print('--- forgetting ---')
        print(f"{item_id = }")
        # Bind explicitly to this script's app so the Redis backend is used.
        result = AsyncResult(item_id, app=app)
        print(f"{result.get() = }")
        result.forget()
def version_2():
    """Reduced chord demo: no explicit worker list and no list of IDs.

    Sends a chord whose header is a generator of ``worker_task`` signatures
    over ``range(5)`` and whose body is ``aggregate``. It prints the outcome,
    then registers ``cleanup`` as a callback via ``final_result.then(cleanup)``.

    Other clean-up approaches tried for this version (not active):
      * looping over ``final_result.parent.results`` here and calling
        ``r.forget()`` or ``AsyncResult(r.id).forget()`` on each item;
      * ``final_result.forget()``, noted as also removing the intermediate
        results;
      * building the chord as a separate ``promise`` and calling
        ``promise.then(cleanup)`` on it.
    """
    print('--- version 2 - chord aggregate add + cleanup ---')
    header = (worker_task.s(arg) for arg in range(5))
    final_result = chord(header)(aggregate.s())
    print(f"{final_result.get() = }")
    print(f"{final_result.result = }")
    print(f"{final_result.parent = }")
    # `cleanup` forgets every result listed in `final_result.parent.results`.
    final_result.then(cleanup)
# Demo entry points, selected by index from the command line:
#   0 -> version_0: chain of `add` tasks
#   1 -> version_1: chord with `aggregate`, forgetting header results by ID
#   2 -> version_2: chord with `aggregate`, clean-up via `.then(cleanup)`
versions = [version_0, version_1, version_2]
if __name__ == '__main__':
    import sys

    # The optional first argument selects an entry of `versions` by index.
    # Normal Python indexing applies, so negative values count from the end.
    # Without an argument the last (latest) version runs.
    v = -1  # last version
    if len(sys.argv) > 1:
        try:
            v = int(sys.argv[1])
        except ValueError:
            sys.exit(f"invalid version {sys.argv[1]!r}: expected an integer 0..{len(versions) - 1}")
    # Validate before calling, so an IndexError raised inside a version
    # function is never mistaken for a bad command-line argument.
    if not -len(versions) <= v < len(versions):
        sys.exit(f"invalid version {v}: expected an integer 0..{len(versions) - 1}")
    versions[v]()
Advertisement
Add Comment
Please, Sign In to add comment