Advertisement
Guest User

Untitled

a guest
Jan 17th, 2017
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.81 KB | None | 0 0
  1. import functools
  2. import os
  3. from typing import List
  4. import uuid
  5.  
  6. import redis
  7. import rq
  8.  
  9.  
  10. def check_remaining_jobs(
  11. pipeline: redis.client.Pipeline,
  12. multi_job_key: str,
  13. job_id: str,
  14. ):
  15. pipeline.srem(multi_job_key, job_id)
  16. return len(pipeline.smembers(multi_job_key)) == 0
  17.  
  18.  
  19. def check_multi_job_status(
  20. multi_job_key: str,
  21. child_job_ids: List[str]
  22. ):
  23. current_job = rq.job.get_current_job()
  24. parent_job = current_job.dependency
  25.  
  26. if parent_job.is_finished:
  27. func = functools.partial(
  28. check_remaining_jobs,
  29. multi_job_key=multi_job_key,
  30. job_id=parent_job.id,
  31. )
  32. conn = rq.connections.resolve_connection()
  33. check = conn.transaction(func, multi_job_key, value_from_callable=True)
  34. if check is True:
  35. queue = rq.Queue(name=parent_job.origin)
  36. registry = rq.registry.DeferredJobRegistry(parent_job.origin, conn)
  37. for child_job_id in child_job_ids:
  38. child_job = rq.job.Job.fetch(child_job_id)
  39. registry.remove(child_job)
  40. queue.enqueue_job(child_job)
  41. conn.delete(multi_job_key)
  42.  
  43.  
  44. def enqueue_multiply_dependent_jobs(
  45. queue: rq.queue.Queue,
  46. parent_jobs: List[rq.job.Job],
  47. child_jobs: List[rq.job.Job]
  48. ):
  49. multi_job_key = 'rq:multijob:{}'.format(uuid.uuid4())
  50.  
  51. conn = rq.connections.resolve_connection()
  52. conn.sadd(multi_job_key, *(job.id for job in parent_jobs))
  53.  
  54. for parent_job in parent_jobs:
  55. queue.enqueue_job(parent_job)
  56.  
  57. registry = rq.registry.DeferredJobRegistry(queue.name)
  58.  
  59. child_job_ids = []
  60.  
  61. for child_job in child_jobs:
  62. child_job.set_status(rq.job.JobStatus.DEFERRED)
  63. child_job.save()
  64. registry.add(child_job)
  65. child_job_ids.append(child_job.id)
  66.  
  67. for job in parent_jobs:
  68. queue.enqueue_call(
  69. func=check_multi_job_status,
  70. kwargs=dict(
  71. multi_job_key=multi_job_key,
  72. child_job_ids=child_job_ids,
  73. ),
  74. depends_on=job,
  75. )
  76.  
  77.  
  78. def parent_job(key):
  79. print('NOW RUNNING PARENT JOB: ', key)
  80.  
  81.  
  82. def child_job(key):
  83. print('NOW RUNNING CHILD JOB: ', key)
  84.  
  85.  
  86. def main():
  87.  
  88. parent_jobs = [
  89. rq.job.Job.create(func=parent_job, kwargs=dict(key=key))
  90. for key in ('a', 'b', 'c')
  91. ]
  92.  
  93. child_jobs = [
  94. rq.job.Job.create(func=child_job, kwargs=dict(key=key))
  95. for key in ('x', 'y', 'z')
  96. ]
  97.  
  98. queue_name = 'test-multi'
  99. queue = rq.Queue(queue_name)
  100.  
  101. enqueue_multiply_dependent_jobs(queue, parent_jobs, child_jobs)
  102.  
  103. worker = rq.Worker(queues=[queue])
  104. worker.work(burst=True)
  105.  
  106. if __name__ == '__main__':
  107. rq.logutils.setup_loghandlers('WARNING')
  108. conn = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
  109. with rq.Connection(conn):
  110. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement