Advertisement
Guest User

Untitled

a guest
Jan 23rd, 2017
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.92 KB | None | 0 0
  1. from kombu import Connection, Exchange, Queue
  2. import logging
  3.  
  4. LOG = logging.getLogger(__name__)
  5.  
  6. cvworker_exchange = Exchange('cvworker', 'direct', durable=True)
  7. queue = Queue('video', exchange=cvworker_exchange, routing_key='model1')
  8. return_queue = Queue('return_queue')
  9.  
  10.  
  11. def process_request(request, message):
  12. """Process incoming request."""
  13. LOG.debug("Start processing request {0}.".format(request))
  14.  
  15. correlation_id = message.properties.get('correlation_id', None)
  16. if not correlation_id:
  17. LOG.error("The 'correlation_id' message property is missing.")
  18. return
  19. else:
  20. LOG.debug("Correlation id: {0}".format(correlation_id))
  21.  
  22. reply_to = message.properties.get('reply_to')
  23. if not reply_to:
  24. LOG.error("The 'reply_to' message property is missing.")
  25. return
  26. else:
  27. LOG.debug("Reply to: {0}".format(reply_to))
  28.  
  29. # execute function
  30. #
  31. #
  32. #
  33. response = 'response'
  34. with Connection('amqp://guest:guest@localhost//') as conn:
  35. producer = conn.Producer(serializer='json')
  36. producer.publish(
  37. body=response,
  38. exchange='',
  39. correlation_id=correlation_id,
  40. declare=[return_queue]
  41. )
  42.  
  43.  
  44. def on_request(request, message):
  45. """This method is automatically called when a request is incoming.
  46.  
  47. :param request: the body of the amqp message already deserialized by kombu
  48.  
  49. :param message: the plain amqp kombu.message with additional information
  50. """
  51. LOG.debug("Got request: {0}".format(request))
  52. try:
  53. message.ack()
  54. except Exception:
  55. LOG.exception("Failed to acknowledge AMQP message.")
  56. else:
  57. LOG.debug("AMQP message acknowledged.")
  58.  
  59. # process request
  60. process_request(request, message)
  61.  
  62. with Connection('amqp://guest:guest@localhost//') as conn:
  63. with conn.Consumer(queues=queue, callback=[on_request]):
  64. while True:
  65. conn.drain_events()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement