Advertisement
ddrazvan

Worker bit

Aug 28th, 2013
285
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.33 KB | None | 0 0
  1.             def work_from_prefetch_queue():
  2.                 """ Continuously performs the work from the prefetch queue """
  3.                 while True:
  4.                     # As long as we have tasks, we process them
  5.                     while prefetch_queue:
  6.                         ch, method, properties, body = prefetch_queue[0]
  7.  
  8.                         self.log("Performing prefetched task %s in '%s' " %
  9.                                  (method.delivery_tag, queue_name))
  10.                         perform_callback(ch, method, properties, body)
  11.                        
  12.                         self.log("Sending ACK %s" % method.delivery_tag)
  13.                         ch.basic_ack(delivery_tag=method.delivery_tag)
  14.                        
  15.                         del prefetch_queue[0]
  16.  
  17.                     # when no more tasks, we sleep a little
  18.                     sleep(0.1)
  19.  
  20.             # Start the actual thread
  21.             working_thread = Thread(target=work_from_prefetch_queue)
  22.             working_thread.start()
  23.  
  24.             self.log("Started the working thread for prefetched tasks")
  25.  
  26.             # PREFETCHING PART
  27.  
  28.             def prefetch_callback(ch, method, properties, body):
  29.                 """ Simple callback that just adds to the prefetch queue """
  30.                 prefetch_queue.append((ch, method, properties, body))
  31.                 self.log("Prefetched one task (tag %s), there are %s in the queue now" %
  32.                          (method.delivery_tag, len(prefetch_queue)))
  33.  
  34.                 # while len(prefetch_queue) == self.prefetch_count:
  35.                 #     sleep(0.01)
  36.  
  37.             # This will continuously prefetch items and put them in the prefetch queue
  38.             while True:
  39.                 try:
  40.                     conn = pika.BlockingConnection(self.param)
  41.                     channel = conn.channel()
  42.                     channel.basic_qos(prefetch_count=self.prefetch_count)
  43.                     channel.basic_consume(prefetch_callback, queue=self.queues[queue_name])
  44.                     self.log("Started consuming for prefetching")
  45.                     channel.start_consuming()
  46.                 except Exception, ex:
  47.                     self.log("Something went wrong, with prefetch task watcher '%s' trying again.\n %s "
  48.                              % (queue_name, ex))
  49.                     sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement