Advertisement
Guest User

Untitled

a guest
Apr 1st, 2016
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.66 KB | None | 0 0
  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import argparse
  5. import subprocess
  6. import time
  7. import yaml
  8. import pika
  9. from multiprocessing import Pool, TimeoutError
  10. from datetime import datetime
  11.  
# Wall-clock time captured at script start; the __main__ section subtracts it
# from datetime.now() to report the elapsed time.
startTime = datetime.now()
  13.  
  14. def initConnect(user, password, host, port, vhost):
  15. credentials = pika.PlainCredentials(user, password)
  16. parameters = pika.ConnectionParameters(host, port, vhost, credentials)
  17.  
  18. connection = pika.BlockingConnection(parameters)
  19.  
  20. try:
  21. #connect
  22. global channel
  23. channel = connection.channel()
  24.  
  25. except pika.exceptions.IncompatibleProtocolError:
  26. print("Incompatible Protocol Error")
  27. except pika.exceptions.ProbableAuthenticationError:
  28. print("Probable Authentication Error")
  29. except pika.exceptions.ProbableAccessDeniedError:
  30. print("Probable Access Denied Error")
  31. except pika.exceptions.ChannelClosed:
  32. print("Exception: Channel Closed")
  33. except pika.exceptions.ConnectionClosed:
  34. print("Exception: Connection Closed")
  35.  
  36. def messageWrapper(itr):
  37. if connection == None:
  38. try:
  39. initConnect(user, password, host, port, vhost)
  40. except pika.exceptions.ProbableAuthenticationError:
  41. print("Auth error")
  42. sys.exit(0)
  43. sendMessage(user, password, message, mode, queue, host, port, vhost, exchange, verbosity)
  44.  
  45. def sendMessage(user, password, message, mode, queue, host, port, vhost, exchange, verbosity):
  46. try:
  47. #queue
  48. channel.queue_declare(queue=queue, durable=True)
  49. channel.queue_bind(exchange=exchange, queue=queue)
  50.  
  51. #publish method
  52. channel.basic_publish(exchange=exchange, routing_key=queue, body=message, properties=pika.BasicProperties( delivery_mode = +int(mode) ))
  53.  
  54. if verbosity > 0:
  55. print("[Sent]-> " +str(message))
  56. except KeyboardInterrupt:
  57. connection.close()
  58. sys.exit(0)
  59.  
  60. def read_message(channel, method_frame, header_frame, body):
  61. print method_frame.delivery_tag
  62. print body
  63. print
  64. channel.basic_ack(delivery_tag=method_frame.delivery_tag)
  65.  
  66. def consumer(user, password, message, mode, queue, host, port, vhost, exchange):
  67. try:
  68. channel.basic_consume(read_message, queue)
  69. channel.start_consuming()
  70. except KeyboardInterrupt:
  71. channel.stop_consuming()
  72.  
  73. def load_config(config_file="{0}/.pika-choo.conf".format(os.environ['HOME'])):
  74. default = {"user": "guest",
  75. "password": "guest",
  76. "port": "5672",
  77. "host": "localhost",
  78. "exchange": "amq.fanout",
  79. "vhost": "/"}
  80. try:
  81. conf = yaml.load(open(config_file, "r").read())
  82. except IOError:
  83. conf = {}
  84.  
  85. default.update(conf)
  86. return default
  87.  
  88. # Main
  89. if __name__ == "__main__":
  90. global connection
  91. connection = None
  92.  
  93. offset = 0
  94. verbosity = 0
  95.  
  96. conf = load_config()
  97.  
  98. exchange = conf['exchange']
  99. user = conf['user']
  100. password = conf['password']
  101. host = conf['hostname']
  102. port = conf['port']
  103. vhost = conf['vhost']
  104.  
  105. parser = argparse.ArgumentParser(description='throw random stuff at rabbits')
  106. parser.add_argument('-v', '--verbose', action='store_true', help='Enable debug logging')
  107. parser.add_argument('-d', '--daemon', action='store_true', help='Daemonize and enable logging to file')
  108. parser.add_argument('-S', '--send', action='store_const', const='send', dest='method')
  109. parser.add_argument('-R', '--receive', action='store_const', const='receive', dest='method')
  110. parser.add_argument('-q', '--queue', default='test', required=False, help='queue')
  111. parser.add_argument('-m', '--message', default='hello', required=False, help='message')
  112. parser.add_argument('-M', '--mode', default=2, type=int, required=False, help='message mode') #delivery_mode 2 - persist
  113. parser.add_argument('-p', '--password', required=False, help='password')
  114. parser.add_argument('-u', '--user', required=False, help='user')
  115. parser.add_argument('-x', '--exchange', required=False, help='rabbit exchange')
  116. parser.add_argument('-c', '--count', default=1, type=int, required=False, help='number of messages to send')
  117. parser.add_argument('-C', '--concurrency', default=1, type=int, required=False, help='number of processes to spawn')
  118. parser.add_argument('-o', '--offset', default=0, type=float, required=False, help='delay')
  119. parser.add_argument('--purge', action='store_true', help='Purge all message from queue')
  120.  
  121. args = parser.parse_args()
  122.  
  123. if not args.method:
  124. parser.error("Required arguments missing: --send or --receive")
  125.  
  126. if args.queue:
  127. queue = args.queue
  128.  
  129. if args.message:
  130. message = args.message
  131.  
  132. if args.password:
  133. password = args.password
  134.  
  135. if args.user:
  136. user = args.user
  137.  
  138. if args.count:
  139. limit = args.count
  140.  
  141. if args.mode:
  142. mode = args.mode
  143.  
  144. if args.offset:
  145. offset = args.offset
  146.  
  147. if args.exchange:
  148. exchange = args.exchange
  149.  
  150. if args.concurrency:
  151. concurrency = args.concurrency
  152.  
  153. if args.verbose:
  154. verbosity += 1
  155.  
  156. if concurrency > limit:
  157. concurrency = limit
  158.  
  159.  
  160. children = []
  161.  
  162. if args.method == "send":
  163. while True:
  164. pool = Pool(concurrency)
  165. pool.map(messageWrapper, range(limit))
  166. break
  167.  
  168. print("Fin. Sent: {0} messages".format(limit))
  169. print("Time elapsed: {0}".format(datetime.now() - startTime))
  170.  
  171. if args.method == "receive":
  172. consumer(user, password, message, mode, queue, host, port, vhost, exchange)
  173. print("Time elapsed: {0}".format(datetime.now() - startTime))
  174.  
  175.  
  176. if connection != None:
  177. connection.close()
  178. sys.exit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement