Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def work_from_prefetch_queue():
    """Process prefetched tasks forever, one at a time, in arrival order.

    Every entry of ``prefetch_queue`` is a ``(ch, method, properties, body)``
    tuple. The entry at the head of the queue is passed to
    ``perform_callback``, acknowledged on its own channel, and only then
    removed, so it still counts towards the queue length while it is being
    worked on. When the queue is empty the loop naps for 0.1 s and polls
    again.
    """
    while True:
        if not prefetch_queue:
            # Nothing pending: back off briefly before looking again.
            sleep(0.1)
            continue
        # Peek instead of popping: the task leaves the queue only after
        # it has been acknowledged.
        task_channel, delivery, props, payload = prefetch_queue[0]
        self.log("Performing prefetched task %s in '%s' " %
                 (delivery.delivery_tag, queue_name))
        perform_callback(task_channel, delivery, props, payload)
        self.log("Sending ACK %s" % delivery.delivery_tag)
        task_channel.basic_ack(delivery_tag=delivery.delivery_tag)
        del prefetch_queue[0]
# Start the actual thread that works through the prefetch queue.
working_thread = Thread(target=work_from_prefetch_queue)
# The worker never returns, so a non-daemon thread would keep the
# interpreter alive after the consuming thread has exited (e.g. on
# Ctrl-C). NOTE(review): a task interrupted at shutdown is left un-ACKed;
# presumably the broker redelivers it -- confirm this is acceptable.
working_thread.daemon = True
working_thread.start()
self.log("Started the working thread for prefetched tasks")
# PREFETCHING PART
def prefetch_callback(ch, method, properties, body):
    """Store one delivered message in the prefetch queue and log it.

    The four arguments are kept together as a single tuple and appended to
    ``prefetch_queue``; no processing or acknowledgement happens here.
    """
    task = (ch, method, properties, body)
    prefetch_queue.append(task)
    message = ("Prefetched one task (tag %s), there are %s in the queue now" %
               (method.delivery_tag, len(prefetch_queue)))
    self.log(message)
    # Disabled back-pressure: wait while the queue already holds
    # self.prefetch_count tasks.
    # while len(prefetch_queue) == self.prefetch_count:
    #     sleep(0.01)
# Continuously prefetch items and put them in the prefetch queue:
# connect, register the callback and block in start_consuming(). On any
# error, log it, wait one second and start over with a new connection.
while True:
    try:
        conn = pika.BlockingConnection(self.param)
        channel = conn.channel()
        # Limit how many unacknowledged messages the broker hands out.
        channel.basic_qos(prefetch_count=self.prefetch_count)
        # NOTE(review): callback-first argument order matches the pre-1.0
        # pika API; pika >= 1.0 expects basic_consume(queue,
        # on_message_callback) -- confirm the installed version.
        channel.basic_consume(prefetch_callback, queue=self.queues[queue_name])
        self.log("Started consuming for prefetching")
        channel.start_consuming()
    except Exception as ex:
        # "as" binding works on Python 2.6+ and Python 3; the old
        # "except Exception, ex" form is a SyntaxError on Python 3.
        self.log("Something went wrong, with prefetch task watcher '%s' trying again.\n %s "
                 % (queue_name, ex))
        sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement