Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import functools
- import os
- from typing import List
- import uuid
- import redis
- import rq
- def check_remaining_jobs(
- pipeline: redis.client.Pipeline,
- multi_job_key: str,
- job_id: str,
- ):
- pipeline.srem(multi_job_key, job_id)
- return len(pipeline.smembers(multi_job_key)) == 0
- def check_multi_job_status(
- multi_job_key: str,
- child_job_ids: List[str]
- ):
- current_job = rq.job.get_current_job()
- parent_job = current_job.dependency
- if parent_job.is_finished:
- func = functools.partial(
- check_remaining_jobs,
- multi_job_key=multi_job_key,
- job_id=parent_job.id,
- )
- conn = rq.connections.resolve_connection()
- check = conn.transaction(func, multi_job_key, value_from_callable=True)
- if check is True:
- queue = rq.Queue(name=parent_job.origin)
- registry = rq.registry.DeferredJobRegistry(parent_job.origin, conn)
- for child_job_id in child_job_ids:
- child_job = rq.job.Job.fetch(child_job_id)
- registry.remove(child_job)
- queue.enqueue_job(child_job)
- conn.delete(multi_job_key)
- def enqueue_multiply_dependent_jobs(
- queue: rq.queue.Queue,
- parent_jobs: List[rq.job.Job],
- child_jobs: List[rq.job.Job]
- ):
- multi_job_key = 'rq:multijob:{}'.format(uuid.uuid4())
- conn = rq.connections.resolve_connection()
- conn.sadd(multi_job_key, *(job.id for job in parent_jobs))
- for parent_job in parent_jobs:
- queue.enqueue_job(parent_job)
- registry = rq.registry.DeferredJobRegistry(queue.name)
- child_job_ids = []
- for child_job in child_jobs:
- child_job.set_status(rq.job.JobStatus.DEFERRED)
- child_job.save()
- registry.add(child_job)
- child_job_ids.append(child_job.id)
- for job in parent_jobs:
- queue.enqueue_call(
- func=check_multi_job_status,
- kwargs=dict(
- multi_job_key=multi_job_key,
- child_job_ids=child_job_ids,
- ),
- depends_on=job,
- )
- def parent_job(key):
- print('NOW RUNNING PARENT JOB: ', key)
- def child_job(key):
- print('NOW RUNNING CHILD JOB: ', key)
- def main():
- parent_jobs = [
- rq.job.Job.create(func=parent_job, kwargs=dict(key=key))
- for key in ('a', 'b', 'c')
- ]
- child_jobs = [
- rq.job.Job.create(func=child_job, kwargs=dict(key=key))
- for key in ('x', 'y', 'z')
- ]
- queue_name = 'test-multi'
- queue = rq.Queue(queue_name)
- enqueue_multiply_dependent_jobs(queue, parent_jobs, child_jobs)
- worker = rq.Worker(queues=[queue])
- worker.work(burst=True)
- if __name__ == '__main__':
- rq.logutils.setup_loghandlers('WARNING')
- conn = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
- with rq.Connection(conn):
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement