Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import os
- import sys
- import argparse
- import subprocess
- import time
- import yaml
- import pika
- from multiprocessing import Pool, TimeoutError
- from datetime import datetime
- startTime = datetime.now()
def initConnect(user, password, host, port, vhost):
    """Open a blocking pika connection and publish it via module globals.

    On success the module-level names ``connection`` and ``channel`` are
    bound to the new ``pika.BlockingConnection`` and to a channel opened on
    it; the other functions in this file read them from there.

    Args:
        user: Username for ``pika.PlainCredentials``.
        password: Password for ``pika.PlainCredentials``.
        host: Host passed to ``pika.ConnectionParameters``.
        port: Port passed to ``pika.ConnectionParameters``; converted with
            ``int()`` because the built-in config default is the string
            ``"5672"``.
        vhost: Virtual host passed to ``pika.ConnectionParameters``.

    Raises:
        pika.exceptions.IncompatibleProtocolError: re-raised after printing.
        pika.exceptions.ProbableAuthenticationError: re-raised after printing.
        pika.exceptions.ProbableAccessDeniedError: re-raised after printing.
        pika.exceptions.ChannelClosed: re-raised after printing.
        pika.exceptions.ConnectionClosed: re-raised after printing.
    """
    # Both names must be declared global; otherwise ``connection`` would be a
    # local and callers checking the module-level value would always see None.
    global connection, channel

    credentials = pika.PlainCredentials(user, password)
    parameters = pika.ConnectionParameters(host, int(port), vhost, credentials)

    try:
        # Opening the connection is what raises the protocol/auth errors,
        # so it has to live inside the try for the handlers to be reachable.
        new_connection = pika.BlockingConnection(parameters)
        new_channel = new_connection.channel()
    except pika.exceptions.IncompatibleProtocolError:
        print("Incompatible Protocol Error")
        raise
    except pika.exceptions.ProbableAuthenticationError:
        print("Probable Authentication Error")
        raise
    except pika.exceptions.ProbableAccessDeniedError:
        print("Probable Access Denied Error")
        raise
    except pika.exceptions.ChannelClosed:
        print("Exception: Channel Closed")
        raise
    except pika.exceptions.ConnectionClosed:
        print("Exception: Connection Closed")
        raise

    # Publish only once both steps succeeded, so the globals never describe a
    # half-initialised connection.
    connection = new_connection
    channel = new_channel
def messageWrapper(itr):
    """Publish one message; used as the worker function for ``Pool.map``.

    Connection settings and the message itself are read from module-level
    globals (``user``, ``password``, ``host``, ``port``, ``vhost``,
    ``message``, ``mode``, ``queue``, ``exchange``, ``verbosity``) that the
    ``__main__`` block assigns before the pool is created.

    Args:
        itr: Item handed over by ``Pool.map``; not used.
    """
    # Connect lazily: only while no connection has been published yet.
    if connection is None:
        try:
            initConnect(user, password, host, port, vhost)
        except pika.exceptions.ProbableAuthenticationError:
            print("Auth error")
            # A rejected login is a failure, so do not report success (0).
            sys.exit(1)

    sendMessage(user, password, message, mode, queue, host, port, vhost, exchange, verbosity)
def sendMessage(user, password, message, mode, queue, host, port, vhost, exchange, verbosity):
    """Declare and bind ``queue``, then publish ``message`` to it.

    Works on the module-level ``channel``, which must already be open.

    Args:
        user: Unused here; kept so existing callers keep working.
        password: Unused here; kept so existing callers keep working.
        message: Body to publish.
        mode: Value for ``delivery_mode`` in the message properties;
            coerced with ``int()``.
        queue: Queue to declare (durable) and bind; also the routing key.
        host: Unused here; kept so existing callers keep working.
        port: Unused here; kept so existing callers keep working.
        vhost: Unused here; kept so existing callers keep working.
        exchange: Exchange the queue is bound to and the message is sent on.
        verbosity: When greater than 0, each sent message is echoed.
    """
    try:
        # queue
        channel.queue_declare(queue=queue, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue)

        # publish method
        properties = pika.BasicProperties(delivery_mode=int(mode))
        channel.basic_publish(
            exchange=exchange,
            routing_key=queue,
            body=message,
            properties=properties,
        )

        if verbosity > 0:
            print("[Sent]-> " + str(message))
    except KeyboardInterrupt:
        # The module-level ``connection`` may be None or not defined at all;
        # only close it when there really is one, then leave quietly.
        active_connection = globals().get("connection")
        if active_connection is not None:
            active_connection.close()
        sys.exit(0)
def read_message(channel, method_frame, header_frame, body):
    """Print one delivered message and acknowledge it.

    Registered as the consume callback in ``consumer``.

    Args:
        channel: Channel the message arrived on; used to send the ack.
        method_frame: Delivery frame; its ``delivery_tag`` is printed and
            acknowledged.
        header_frame: Not used.
        body: Message payload, printed as-is.
    """
    # print() calls with a single argument behave the same on Python 2 and 3,
    # unlike the print statement, and match the rest of this file.
    print(method_frame.delivery_tag)
    print(body)
    print("")  # blank separator line between messages
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
def consumer(user, password, message, mode, queue, host, port, vhost, exchange):
    """Consume ``queue`` until interrupted, handing messages to ``read_message``.

    Args:
        user: Username, used if a connection has to be opened.
        password: Password, used if a connection has to be opened.
        message: Unused here; kept so existing callers keep working.
        mode: Unused here; kept so existing callers keep working.
        queue: Queue to consume from.
        host: Host, used if a connection has to be opened.
        port: Port, used if a connection has to be opened.
        vhost: Virtual host, used if a connection has to be opened.
        exchange: Unused here; kept so existing callers keep working.
    """
    # Nothing on the receive path opens a channel before this function runs,
    # so connect here when the module-level ``channel`` is not set yet.
    if globals().get("channel") is None:
        initConnect(user, password, host, port, vhost)

    try:
        # NOTE(review): the positional (callback, queue) order matches older
        # pika releases; pika 1.x expects basic_consume(queue,
        # on_message_callback) - confirm against the installed version.
        channel.basic_consume(read_message, queue)
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
def load_config(config_file="{0}/.pika-choo.conf".format(os.path.expanduser("~"))):
    """Return connection settings: built-in defaults overlaid with a YAML file.

    Args:
        config_file: Path of the YAML file to read. Defaults to
            ``.pika-choo.conf`` in the user's home directory (resolved with
            ``os.path.expanduser`` so an unset ``HOME`` does not raise while
            this function is being defined).

    Returns:
        A new dict holding ``user``, ``password``, ``port``, ``host``,
        ``exchange`` and ``vhost``, plus any other keys the file defines.
        Values from the file take precedence over the defaults. A missing,
        unreadable or empty file yields the defaults unchanged.
    """
    settings = {
        "user": "guest",
        "password": "guest",
        "port": "5672",
        "host": "localhost",
        "exchange": "amq.fanout",
        "vhost": "/",
    }

    try:
        with open(config_file, "r") as handle:
            # safe_load replaces the original yaml.load(): plain yaml.load can
            # construct arbitrary Python objects from the file and requires an
            # explicit Loader argument on current PyYAML releases.
            overrides = yaml.safe_load(handle)
    except IOError:
        overrides = None

    # An empty file parses to None, which dict.update() would reject.
    settings.update(overrides or {})
    return settings
# Main
if __name__ == "__main__":
    # NOTE(review): messageWrapper reads user/password/host/... as module
    # globals, so these assignments must stay at module level. Pool workers
    # only inherit them with a fork-based start method.
    connection = None
    offset = 0
    verbosity = 0

    conf = load_config()
    exchange = conf['exchange']
    user = conf['user']
    password = conf['password']
    # The built-in defaults use the key 'host'; 'hostname' is still honoured
    # for config files that spell it that way.
    host = conf.get('hostname', conf['host'])
    port = conf['port']
    vhost = conf['vhost']

    parser = argparse.ArgumentParser(description='throw random stuff at rabbits')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable debug logging')
    parser.add_argument('-d', '--daemon', action='store_true', help='Daemonize and enable logging to file')
    parser.add_argument('-S', '--send', action='store_const', const='send', dest='method')
    parser.add_argument('-R', '--receive', action='store_const', const='receive', dest='method')
    parser.add_argument('-q', '--queue', default='test', required=False, help='queue')
    parser.add_argument('-m', '--message', default='hello', required=False, help='message')
    parser.add_argument('-M', '--mode', default=2, type=int, required=False, help='message mode')  # delivery_mode 2 - persist
    parser.add_argument('-p', '--password', required=False, help='password')
    parser.add_argument('-u', '--user', required=False, help='user')
    parser.add_argument('-x', '--exchange', required=False, help='rabbit exchange')
    parser.add_argument('-c', '--count', default=1, type=int, required=False, help='number of messages to send')
    parser.add_argument('-C', '--concurrency', default=1, type=int, required=False, help='number of processes to spawn')
    parser.add_argument('-o', '--offset', default=0, type=float, required=False, help='delay')
    parser.add_argument('--purge', action='store_true', help='Purge all message from queue')
    args = parser.parse_args()

    if not args.method:
        parser.error("Required arguments missing: --send or --receive")
    # Pool() rejects a process count below 1, so refuse such values up front
    # with a usage error instead of failing later.
    if args.count < 1:
        parser.error("--count must be at least 1")
    if args.concurrency < 1:
        parser.error("--concurrency must be at least 1")

    # Options that carry an argparse default are taken unconditionally, so a
    # falsy value (e.g. an empty message) can no longer leave a name undefined.
    queue = args.queue
    message = args.message
    mode = args.mode
    limit = args.count
    offset = args.offset
    # Never start more workers than there are messages to send.
    concurrency = min(args.concurrency, limit)

    # Options without a default only override the config file when given.
    if args.password:
        password = args.password
    if args.user:
        user = args.user
    if args.exchange:
        exchange = args.exchange
    if args.verbose:
        verbosity += 1

    if args.method == "send":
        pool = Pool(concurrency)
        try:
            pool.map(messageWrapper, range(limit))
        finally:
            # Explicit close/join rather than a with-block, which Pool does
            # not support on Python 2.
            pool.close()
            pool.join()
        print("Fin. Sent: {0} messages".format(limit))
        print("Time elapsed: {0}".format(datetime.now() - startTime))
    elif args.method == "receive":
        consumer(user, password, message, mode, queue, host, port, vhost, exchange)
        print("Time elapsed: {0}".format(datetime.now() - startTime))

    if connection is not None:
        connection.close()
    sys.exit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement