Guest User

Untitled

a guest
Jul 9th, 2018
157
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.61 KB | None | 0 0
  1. import txamqp
  2. from twisted.internet.protocol import ClientCreator
  3. from txamqp.protocol import AMQClient
  4. from twisted.internet import defer, reactor
  5. from txamqp.content import Content
  6. from txamqp.client import TwistedDelegate
  7. import time
  8.  
  9.  
  10. def receive(msg, cb, queue, channel):
  11. reactor.callInThread(cb, msg, channel)
  12. consume_queue(cb, queue, channel)
  13.  
  14. def fast_process(msg, channel):
  15. print('Fast consumer received message: %s' % (msg.content.body, ))
  16. reactor.callFromThread(channel.basic_ack,delivery_tag=msg.delivery_tag)
  17.  
  18. def slow_process(msg, channel):
  19. print('Slow consumer received message: %s' % (msg.content.body, ))
  20. time.sleep(5)
  21. reactor.callFromThread(channel.basic_ack,delivery_tag=msg.delivery_tag)
  22.  
  23. def consume_queue(cb, queue, channel):
  24. queue.get().addCallback(receive, cb, queue, channel)
  25.  
  26. def publish(channel):
  27. for i in range(10):
  28. msg = Content("Message %d" % i)
  29. reactor.callFromThread(channel.basic_publish, routing_key="queue", content=msg)
  30.  
  31. @defer.inlineCallbacks
  32. def authenticate(client, username, password):
  33. yield client.authenticate(username, password)
  34.  
  35. # open channel
  36. channel = yield client.channel(1)
  37. yield channel.channel_open()
  38.  
  39. # set prefetch
  40. msg = yield channel.basic_qos(prefetch_count=1)
  41.  
  42. # subscribe
  43. yield channel.queue_declare(queue='queue', auto_delete=True)
  44. reply = yield channel.basic_consume(queue='queue')
  45. queue = yield client.queue(reply.consumer_tag)
  46. consume_queue(fast_process, queue, channel)
  47.  
  48. # open channel
  49. channel = yield client.channel(2)
  50. yield channel.channel_open()
  51.  
  52. # set prefetch
  53. msg = yield channel.basic_qos(prefetch_count=1)
  54.  
  55. # subscribe
  56. yield channel.queue_declare(queue='queue', auto_delete=True)
  57. reply = yield channel.basic_consume(queue='queue')
  58. queue = yield client.queue(reply.consumer_tag)
  59. consume_queue(slow_process, queue, channel)
  60.  
  61. # publish
  62. reactor.callInThread(publish, channel)
  63.  
  64. if __name__ == '__main__':
  65. import sys
  66. if len(sys.argv) != 7:
  67. print "%s host port vhost username password path_to_spec" % sys.argv[0]
  68. sys.exit(1)
  69.  
  70. host = sys.argv[1]
  71. port = int(sys.argv[2])
  72. vhost = sys.argv[3]
  73. username = sys.argv[4]
  74. password = sys.argv[5]
  75. specFile = sys.argv[6]
  76.  
  77. spec = txamqp.spec.load(specFile)
  78.  
  79. delegate = TwistedDelegate()
  80.  
  81. d = ClientCreator(reactor, AMQClient, delegate, vhost,
  82. spec).connectTCP(host, port)
  83. d.addCallback(authenticate, username, password)
  84. reactor.run()
Add Comment
Please, Sign In to add comment