Advertisement
Guest User

Untitled

a guest
May 22nd, 2018
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.29 KB | None | 0 0
  1. from collections import namedtuple
  2.  
  3. import pika
  4.  
  5.  
  6. Message = namedtuple('Message', ['method', 'properties', 'body'])
  7.  
  8.  
  9. class MQClient(object):
  10.  
  11.     def __init__(self, host, user, password, vhost, port=5672, exchange='', conn_arguments=None):
  12.         """Create a new MQSender instance to push events into RabbitMQ
  13.  
  14.        :param str host: Host Name or IP Address
  15.        :param str user: User Name for authentication
  16.        :param str password: User Password for authentication
  17.        :param str vhost: Virtual Host Name to use
  18.        :param int port: TCP port to connect
  19.        :param str exchange: Exchange Name for declaration or message publishing
  20.        :param dict conn_arguments: The rest of properties from pika.connection.ConnectionParameters
  21.                                    to use for RabbitMQ connection
  22.        """
  23.         self.host = host
  24.         self.user = user
  25.         self.password = password
  26.         self.vhost = vhost
  27.         self.port = port
  28.         self.exchange = exchange
  29.         self._conn_arguments = conn_arguments
  30.         self._channel = None
  31.  
  32.     @property
  33.     def channel(self):
  34.         if not self._channel:
  35.             conn_params = {
  36.                 'host': self.host,
  37.                 'port': self.port,
  38.                 'virtual_host': self.vhost,
  39.                 'credentials': pika.PlainCredentials(self.user, self.password)
  40.             }
  41.             conn_params.update(self._conn_arguments or {})
  42.             conn = pika.BlockingConnection(pika.ConnectionParameters(**conn_params))
  43.             self._channel = conn.channel()
  44.         return self._channel
  45.  
  46.     def publish(self, exchange, routing_key, msg_body, properties=None):
  47.         self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg_body,
  48.                                    properties=properties)
  49.  
  50.     def get_message(self, queue, consume=False):
  51.         return Message(*self.channel.basic_get(queue=queue, no_ack=consume))
  52.  
  53.     def get_all_messages(self, queue, consume=False):
  54.         messages = []
  55.         while True:
  56.             m = self.get_message(queue, consume)
  57.             if not m.method:
  58.                 break
  59.             messages.append(m)
  60.         return messages
  61.  
  62.     def ack_message(self, delivery_tag):
  63.         self.channel.basic_ack(delivery_tag)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement