Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from collections import namedtuple
- import pika
# Record type for the (method, properties, body) triple that
# MQClient.get_message builds from the result of ``basic_get``.
Message = namedtuple('Message', field_names=('method', 'properties', 'body'))
class MQClient(object):
    """Small convenience wrapper around a blocking pika channel.

    The connection is opened lazily on first access to :attr:`channel` and is
    kept until :meth:`close` is called.  Instances can also be used as context
    managers, in which case the connection is closed when the block exits.
    """

    def __init__(self, host, user, password, vhost, port=5672, exchange='',
                 conn_arguments=None):
        """Create a new MQClient instance to talk to RabbitMQ.

        No network activity happens here; see :attr:`channel`.

        :param str host: Host Name or IP Address
        :param str user: User Name for authentication
        :param str password: User Password for authentication
        :param str vhost: Virtual Host Name to use
        :param int port: TCP port to connect
        :param str exchange: Default Exchange Name, used by :meth:`publish`
                             when it is called with ``exchange=None``
        :param dict conn_arguments: The rest of properties from
                                    pika.connection.ConnectionParameters
                                    to use for RabbitMQ connection; these
                                    override the values derived from the
                                    arguments above
        """
        self.host = host
        self.user = user
        self.password = password
        self.vhost = vhost
        self.port = port
        self.exchange = exchange
        self._conn_arguments = conn_arguments
        # Kept so that close() can release the underlying connection.
        self._connection = None
        self._channel = None

    @property
    def channel(self):
        """Return the cached channel, connecting to the broker on first use."""
        if not self._channel:
            conn_params = {
                'host': self.host,
                'port': self.port,
                'virtual_host': self.vhost,
                'credentials': pika.PlainCredentials(self.user, self.password),
            }
            # Caller-supplied arguments win over the defaults built above.
            conn_params.update(self._conn_arguments or {})
            connection = pika.BlockingConnection(pika.ConnectionParameters(**conn_params))
            try:
                channel = connection.channel()
            except Exception:
                # Do not leave a half-initialised connection open behind us.
                connection.close()
                raise
            self._connection = connection
            self._channel = channel
        return self._channel

    def close(self):
        """Close the broker connection, if any, and drop the cached channel.

        Safe to call before connecting and safe to call repeatedly.  The next
        access to :attr:`channel` opens a fresh connection.
        """
        connection, self._connection = self._connection, None
        self._channel = None
        if connection is not None and connection.is_open:
            connection.close()

    def __enter__(self):
        """Return the client itself; the connection is still opened lazily."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block.

        Returns ``None`` so exceptions raised inside the block propagate.
        """
        self.close()

    def publish(self, exchange, routing_key, msg_body, properties=None):
        """Publish ``msg_body`` through the channel's ``basic_publish``.

        :param exchange: Exchange Name; ``None`` means "use ``self.exchange``"
        :param str routing_key: Routing key for the message
        :param msg_body: Message payload, passed as ``body``
        :param properties: Passed through unchanged as ``properties``
        """
        if exchange is None:
            exchange = self.exchange
        self.channel.basic_publish(exchange=exchange, routing_key=routing_key,
                                   body=msg_body, properties=properties)

    def get_message(self, queue, consume=False):
        """Fetch a single message from ``queue`` with ``basic_get``.

        :param str queue: Queue Name to read from
        :param bool consume: Passed to ``basic_get`` as ``no_ack``
        :return: The ``basic_get`` result wrapped in a :class:`Message`
        """
        # NOTE(review): ``no_ack`` is the keyword accepted by pika < 1.0;
        # pika 1.0 renamed it to ``auto_ack`` -- confirm the installed
        # version before upgrading.
        return Message(*self.channel.basic_get(queue=queue, no_ack=consume))

    def get_all_messages(self, queue, consume=False):
        """Fetch messages from ``queue`` until one comes back without a method.

        :param str queue: Queue Name to read from
        :param bool consume: Forwarded to :meth:`get_message`
        :return: List of :class:`Message` in the order they were received
        """
        messages = []
        while True:
            message = self.get_message(queue, consume)
            # A falsy ``method`` is treated as "nothing more to fetch".
            if not message.method:
                break
            messages.append(message)
        return messages

    def ack_message(self, delivery_tag):
        """Acknowledge the message identified by ``delivery_tag`` via ``basic_ack``."""
        self.channel.basic_ack(delivery_tag)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement