furas

Python - celery - remove intermediate results from Redis - (Stackoverflow)

Aug 14th, 2025 (edited)
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.83 KB | None | 0 0
  1. #!/usr/bin/env python3.13
  2.  
  3. # date: 2025.08.13
  4.  
  5. # [python - How to aggregate results from parallel tasks and store only the final result (ignore intermediate outputs)? - Stack Overflow](https://stackoverflow.com/questions/79733975/how-to-aggregate-results-from-parallel-tasks-and-store-only-the-final-result-ig)
  6.  
  7. # --- start celery before running script ---
  8. # celery -A main worker --loglevel=info
# (use `-A main` because the script is named `main.py`)
  10.  
  11. # --- check "tasks" after running script ---
  12. # redis-cli -n 0 keys '*'    # broker
  13. # redis-cli -n 1 keys '*'    # backend
  14.  
  15. # --- delete all results in redis backend ---
# redis-cli -n 1 del $(redis-cli -n 1 keys 'celery-task-meta-*')     # results of individual tasks (workers)
# redis-cli -n 1 del $(redis-cli -n 1 keys 'celery-taskset-meta-*')  # group/chord metadata (presumably written for the chord header) - verify
  18.  
# --- show a task's stored result (use `fzf` to pick the task from a list, and `jq` to format the JSON output) ---
  20. # redis-cli -n 1 get $(redis-cli -n 1 keys 'celery-task-meta-*' |fzf) | jq
  21.  
# --- running (the script must be saved as `main.py` for the celery command above to work) ---
# python main.py     # run the latest version (version_2)
# python main.py 0   # run version_0
  25.  
  26. # [fzf](https://github.com/junegunn/fzf)
  27. # [jq](https://jqlang.org/)
  28.  
  29.  
  30. from celery import Celery
  31. from celery import chord, group, chain
  32. from celery.result import AsyncResult
  33.  
  34. print('creating app ... with Redis as broker and backend')
  35. app = Celery(
  36.     'main',  # my script name `main.py`
  37.     broker='redis://localhost:6379/0',
  38.     backend='redis://localhost:6379/1',
  39. )
  40.  
  41. def compute_something(arg):
  42.     print(f'compute_something: {arg}')  # print as one string to get all in one moment
  43.     return arg * 2
  44.  
  45. @app.task
  46. def worker_task(arg):
  47.     print(f'worker_task: {arg}')
  48.     return compute_something(arg)
  49.  
  50. @app.task
  51. def aggregate(results):
  52.     print(f'aggregate: {results}')
  53.     return sum(results)
  54.  
  55. @app.task
  56. def add(a, b):
  57.     print(f'add: {a}, {b}')
  58.     return a+b
  59.  
  60. def cleanup(final_result):
  61.     print(f"cleanup: {final_result = }")
  62.     print(f"cleanup: {final_result.get() = }")
  63.     print(f"cleanup: {type(final_result) = }")
  64.  
  65.     for r in final_result.parent.results:
  66.         print(f"cleanup: forget: {r.get() = }")
  67.         r.forget()
  68.  
  69. # --- versions ----
  70.  
  71. def version_0():
  72.     """Different test - to see if `chain` has the same properties `.parent.results` as `chord`"""
  73.  
  74.     print('--- version 0 - chain add ---')
  75.  
  76.     inputs = range(5)
  77.  
  78.     final_result = chain(add.s(1,2), add.s(3), add.s(4), add.s(10))()
  79.     print(f"{final_result.get() = }")
  80.     print(f"{final_result.result = }")
  81.     print(f"{final_result.parent = }")
  82.  
  83.     item = final_result
  84.     while True:
  85.         if item.parent:
  86.             print('forget parent:', item.parent.get())
  87.             #item.parent.forget()
  88.             item = item.parent
  89.         else:
  90.             break
  91.  
  92.     #final_result.forget()
  93.  
  94. def version_1():
  95.     print('--- version 1 - chord aggregae add ---')
  96.  
  97.     inputs = range(5)
  98.  
  99.     workers = [worker_task.s(arg) for arg in inputs]
  100.  
  101.     #workers_ids = [sig.id for sig in workers]  # not work because task not send yet, eventually you could create own IDs
  102.     #print(f"{workers_ids = }")
  103.  
  104.     final_result = chord(workers)(aggregate.s())
  105.     print(f"{final_result.get() = }")
  106.  
  107.     results_ids = [r.id for r in result.parent.results] # + [final_result.id]
  108.     print(f"{results_ids = }")
  109.     print(f"{len(results_ids) = }")
  110.  
  111.     #for w_id in workers_ids:
  112.     for item_id in results_ids:
  113.         print('--- forgetting ---')
  114.         print(f"{item_id = }")
  115.         result = AsyncResult(item_id)
  116.         print(f"{result.get() = }")
  117.         result.forget()
  118.  
  119.  
  120. def version_2():
  121.     """Reduced version - without list of workers, and list of IDs"""
  122.  
  123.     print('--- version 2 - chord aggregate add + cleanup ---')
  124.  
  125.     inputs = range(5)
  126.  
  127.     final_result = chord(worker_task.s(arg) for arg in inputs)(aggregate.s())
  128.     print(f"{final_result.get() = }")
  129.     print(f"{final_result.result = }")
  130.     print(f"{final_result.parent = }")
  131.  
  132.     # --- remove all intermediate results ---
  133.     #for r in final_result.parent.results:
  134.     #    print(f"{type(r) = }")
  135.     #    #print('forget:', r.get(), f"{type(r.get()) = }", f"{type(r.result) = }")
  136.     #    #print('parent:', r.parent)
  137.     #    #r.forget()
  138.     #    #AsyncResult(r.id).forget()
  139.  
  140.     # --- remove final result, and it also removes intermediate results ---
  141.     #final_result.forget()
  142.  
  143.     # --- use `.then()` ---
  144.     #promise = chord((worker_task.s(arg) for arg in inputs))(aggregate.s())
  145.     #promise.then(cleanup)
  146.     final_result.then(cleanup)
  147.  
  148.  
  149. versions = [
  150.     version_0,  # chain add
  151.     version_1,  # chord aggregate add
  152.     version_2,  # chord aggregate add, then()
  153. ]
  154.  
  155. if __name__ == '__main__':
  156.     import sys
  157.  
  158.     if len(sys.argv) > 1:
  159.         v = int(sys.argv[1])
  160.     else:
  161.         v = -1  # last version
  162.  
  163.     versions[v]()
Advertisement
Add Comment
Please, Sign In to add comment