Advertisement
Guest User

Untitled

a guest
Mar 1st, 2016
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.50 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4.  
  5.  
  6. from amqp.connection import Connection as amqp_connection
  7. from amqp import basic_message
  8.  
  9.  
  10. class RabbitMQTest(object):
  11.  
  12. def __init__(self, host, port, exchange="test", queue="test", username="test", password="test", virtual_host="/"):
  13.  
  14. self.connection = amqp_connection(
  15. host=host,
  16. port=port,
  17. virtual_host=virtual_host,
  18. userid=username,
  19. password=password
  20. )
  21.  
  22. self.exchange = exchange
  23. self.queue = queue
  24. self.channel = self.connection.channel()
  25.  
  26. def createSimpleExchangeQueue(self):
  27.  
  28. self.channel.exchange_declare(
  29. exchange=self.exchange,
  30. type="direct",
  31. auto_delete=True
  32.  
  33. )
  34.  
  35. self.channel.queue_declare(
  36. queue=self.queue,
  37. auto_delete=True
  38. )
  39.  
  40. self.channel.queue_bind(
  41. queue=self.queue,
  42. exchange=self.exchange
  43. )
  44.  
  45. def dumpMessages(self, number=10240, size=1024):
  46.  
  47. print ("Dumping %s messages of %s bytes." % (number, size))
  48. message = basic_message.Message(
  49. body="x" * size)
  50.  
  51. for _ in range(number):
  52. self.channel.basic_publish(
  53. message,
  54. exchange=self.exchange
  55. )
  56.  
  57. def createFeedbackLoop(self, total_messages):
  58.  
  59. print ("Producing and consuming %s messages." % (total_messages))
  60.  
  61. self.counter = 0
  62.  
  63. def produce(message):
  64.  
  65. self.channel.basic_publish(
  66. message,
  67. exchange=self.exchange
  68. )
  69.  
  70. self.channel.basic_ack(message.delivery_tag)
  71. self.counter += 1
  72.  
  73. self.channel.basic_qos(
  74. prefetch_size=0,
  75. prefetch_count=100,
  76. a_global=False
  77. )
  78.  
  79. self.channel.basic_consume(
  80. queue=self.exchange,
  81. callback=produce,
  82. no_ack=False)
  83.  
  84. while self.counter < total_messages:
  85. self.connection.drain_events()
  86.  
  87.  
  88. def main():
  89.  
  90. setup = RabbitMQTest(
  91. "your-rabbitmq-host-01",
  92. 5672,
  93. exchange="ha_test",
  94. queue="ha_test")
  95.  
  96. setup.createSimpleExchangeQueue()
  97.  
  98. # dump 1000mb worth of messages into the queue
  99. setup.dumpMessages(number=102400)
  100.  
  101. # Consume & produce 1000000 messages
  102. setup.createFeedbackLoop(1000000)
  103.  
  104. if __name__ == "__main__":
  105. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement