Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python3
"""Check connection to the RabbitMQ server.

Opens a blocking AMQP 0-9-1 connection with ``pika`` using the host, port,
virtual host and credentials given on the command line, optionally over
TLS.  Prints ``OK`` and exits with status 0 when the connection could be
opened; prints ``Error: <ExceptionClassName>`` and exits with status 1
otherwise.
"""
import argparse
import ssl
import sys

# pure-Python implementation of the AMQP 0-9-1 protocol
import pika


def parse_args(argv=None):
    """Define and parse the command-line options.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='Check connection to RabbitMQ server')
    parser.add_argument('--server', required=True, help='Define RabbitMQ server')
    parser.add_argument('--virtual_host', default='/', help='Define virtual host')
    parser.add_argument('--ssl', action='store_true', help='Enable SSL (default: %(default)s)')
    parser.add_argument('--port', type=int, default=5672, help='Define port (default: %(default)s)')
    parser.add_argument('--username', default='guest', help='Define username (default: %(default)s)')
    parser.add_argument('--password', default='guest', help='Define password (default: %(default)s)')
    return parser.parse_args(argv)


def build_parameters(args):
    """Translate parsed options into ``pika.ConnectionParameters``.

    Args:
        args: Namespace returned by :func:`parse_args`.

    Returns:
        Connection parameters carrying plain credentials and, when
        ``--ssl`` was given, TLS options built from the default SSL context
        with the server name passed as the TLS server hostname.
    """
    credentials = pika.PlainCredentials(args.username, args.password)
    ssl_options = None
    if args.ssl:
        context = ssl.create_default_context()
        ssl_options = pika.SSLOptions(context, args.server)
    return pika.ConnectionParameters(
        host=args.server,
        port=args.port,
        virtual_host=args.virtual_host,
        credentials=credentials,
        ssl_options=ssl_options,
    )


def main(argv=None):
    """Run the connection check.

    Args:
        argv: Optional argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 when the connection was opened (and closed
        again) successfully, 1 on any failure.
    """
    parameters = build_parameters(parse_args(argv))
    # Broad catch is deliberate: this is the top-level boundary of a check
    # script, and every failure must become "Error: ..." with status 1.
    try:
        connection = pika.BlockingConnection(parameters)
        if not connection.is_open:
            # Previously this case fell through silently with status 0.
            print('Error: connection is not open')
            return 1
        # Close before reporting so that a failing close cannot produce
        # "OK" followed by an error line.
        connection.close()
    except Exception as error:
        print('Error:', error.__class__.__name__)
        return 1
    print('OK')
    return 0


if __name__ == '__main__':
    # sys.exit instead of the site-provided exit(), which is not guaranteed
    # to exist (e.g. when running with "python -S").
    sys.exit(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement