Guest User

Untitled

a guest
Jul 9th, 2018
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.10 KB | None | 0 0
  1. import txamqp
  2. from twisted.internet.protocol import ClientCreator
  3. from txamqp.protocol import AMQClient
  4. from twisted.internet import defer, reactor
  5. from txamqp.content import Content
  6. from txamqp.client import TwistedDelegate
  7. import time
  8.  
  9.  
  # Number of messages published per run; the sender loop counts up to this
  # value and the receiver treats a body equal to str(messages) as the last one.
  messages = 10000
  # Reference timestamp for the throughput report printed by receive().
  # NOTE(review): taken at import time, so the reported total also includes
  # connection, login and queue setup, not only message transfer.
  start = time.time()
  12.  
  def receive(msg, queue):
      """Handle one delivered message and decide whether to keep consuming.

      Prints the message body. If the body equals ``str(messages)`` (the last
      message of the run), prints the elapsed time and the throughput and
      stops the reactor. Otherwise asks *queue* for the next message.
      """
      body = msg.content.body
      print('receiving: %s' % body)

      if body != str(messages):
          # Not the final message yet: wait for the next delivery.
          consume_queue(queue)
          return

      elapsed = time.time() - start
      rate = messages / elapsed
      print("total time (%s messages): %s seconds" % (messages, elapsed))
      print("messages per second: %s" % rate)
      reactor.stop()
  24.  
  def consume_queue(queue):
      """Request the next message from *queue* and hand it to ``receive``.

      ``receive`` is registered as a callback on whatever ``queue.get()``
      returns (presumably a Twisted Deferred), with *queue* passed along as an
      extra argument so the receiver can re-arm itself.
      """
      pending = queue.get()
      pending.addCallback(receive, queue)
  27.  
  def send(channel, content):
      """Schedule publication of *content* on *channel* and log its body.

      The publish call is not made directly: it is handed to the reactor with
      ``reactor.callFromThread``, using the routing key ``'queue'``.
      """
      publish_args = {'routing_key': 'queue', 'content': content}
      reactor.callFromThread(channel.basic_publish, **publish_args)
      print('sending: %s' % content.body)
  31.  
  def send_msgs_from_for_loop(channel):
      """Publish the messages numbered 1 through ``messages`` on *channel*.

      Each message body is the decimal string of its sequence number.
      """
      for number in range(1, messages + 1):
          send(channel, Content(str(number)))
  36.  
  @defer.inlineCallbacks
  def authenticate(client, username, password):
      """Log in, subscribe to the benchmark queue and start publishing.

      Args:
          client: the connected AMQP client passed in by the connection
              Deferred's callback chain.
          username: login name forwarded to ``client.authenticate``.
          password: password forwarded to ``client.authenticate``.

      Returns:
          A Deferred (via ``inlineCallbacks``) that fires with ``None`` once
          the consumer is registered and the sender has been handed to the
          reactor's thread pool.
      """
      yield client.authenticate(username, password)
      channel = yield client.channel(1)
      yield channel.channel_open()

      # Subscribe. Nothing in this script ever sends basic.ack, so ask the
      # broker for no-ack delivery instead of leaving every message
      # unacknowledged on the channel.
      yield channel.queue_declare(queue='queue', auto_delete=True)
      reply = yield channel.basic_consume(queue='queue', no_ack=True)
      queue = yield client.queue(reply.consumer_tag)
      consume_queue(queue)

      # Publish from a worker thread; send() marshals each publish back to
      # the reactor thread with callFromThread.
      reactor.callInThread(send_msgs_from_for_loop, channel)
  51.  
  52.  
  if __name__ == '__main__':
      # Script entry point: parse the six positional arguments, connect to the
      # broker and run the reactor until the benchmark finishes or fails.
      import sys

      if len(sys.argv) != 7:
          # Call-form print, matching the rest of the file; the statement form
          # used here before is a SyntaxError on Python 3.
          print("%s host port vhost username password path_to_spec" % sys.argv[0])
          sys.exit(1)

      host, port, vhost, username, password, specFile = sys.argv[1:7]
      port = int(port)

      spec = txamqp.spec.load(specFile)

      delegate = TwistedDelegate()

      def _abort(failure):
          """Report a connection or setup failure and stop the reactor."""
          failure.printTraceback(file=sys.stderr)
          if reactor.running:
              reactor.stop()

      d = ClientCreator(reactor, AMQClient, delegate, vhost,
                        spec).connectTCP(host, port)
      d.addCallback(authenticate, username, password)
      # Without an errback, a refused connection or failed login would leave
      # reactor.run() blocked forever with nothing reported.
      d.addErrback(_abort)
      reactor.run()
Add Comment
Please, Sign In to add comment