Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import pika
- import time
- LOG_FORMAT = ('%(levelname) -1s %(asctime)s %(name) -1s %(funcName) '
- '-1s %(lineno) -5d: %(message)s')
- LOGGER = logging.getLogger(__name__)
- user="user"
- password="password"
- host="localhost"
- queue="test"
- class amqpConsumer(object):
- def __init__(self):
- """Create a new instance of the consumer class, passing in the AMQP
- URL used to connect to RabbitMQ.
- :param str amqp_url: The AMQP url to connect with
- """
- self._connection = None
- self._channel = None
- self._closing = False
- self._consumer_tag = None
- def connect(self):
- """This method connects to RabbitMQ, returning the connection handle.
- When the connection is established, the on_connection_open method
- will be invoked by pika.
- :rtype: pika.SelectConnection
- """
- LOGGER.info('Connecting to %s', host)
- credentials=pika.PlainCredentials(user, password)
- return pika.SelectConnection(pika.ConnectionParameters(host=host,credentials=credentials,heartbeat_interval=0),
- self.on_connection_open,
- stop_ioloop_on_close=False)
- def on_connection_open(self, unused_connection):
- """This method is called by pika once the connection to RabbitMQ has
- been established. It passes the handle to the connection object in
- case we need it, but in this case, we'll just mark it unused.
- :type unused_connection: pika.SelectConnection
- """
- LOGGER.info('Connection opened')
- self.add_on_connection_close_callback()
- self.open_channel()
- def add_on_connection_close_callback(self):
- """This method adds an on close callback that will be invoked by pika
- when RabbitMQ closes the connection to the publisher unexpectedly.
- """
- LOGGER.info('Adding connection close callback')
- self._connection.add_on_close_callback(self.on_connection_closed)
- def on_connection_closed(self, connection, reply_code, reply_text):
- """This method is invoked by pika when the connection to RabbitMQ is
- closed unexpectedly. Since it is unexpected, we will reconnect to
- RabbitMQ if it disconnects.
- :param pika.connection.Connection connection: The closed connection obj
- :param int reply_code: The server provided reply_code if given
- :param str reply_text: The server provided reply_text if given
- """
- self._channel = None
- if self._closing:
- self._connection.ioloop.stop()
- else:
- LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
- reply_code, reply_text)
- self._connection.add_timeout(5, self.reconnect)
- def reconnect(self):
- """Will be invoked by the IOLoop timer if the connection is
- closed. See the on_connection_closed method.
- """
- # This is the old connection IOLoop instance, stop its ioloop
- self._connection.ioloop.stop()
- if not self._closing:
- # Create a new connection
- self._connection = self.connect()
- # There is now a new connection, needs a new ioloop to run
- self._connection.ioloop.start()
- def open_channel(self):
- """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
- command. When RabbitMQ responds that the channel is open, the
- on_channel_open callback will be invoked by pika.
- """
- LOGGER.info('Creating a new channel')
- self._connection.channel(on_open_callback=self.on_channel_open)
- def on_channel_open(self, channel):
- """This method is invoked by pika when the channel has been opened.
- The channel object is passed in so we can make use of it.
- Since the channel is now open, we'll declare the exchange to use.
- :param pika.channel.Channel channel: The channel object
- """
- LOGGER.info('Channel opened')
- self._channel = channel
- self.add_on_channel_close_callback()
- self.start_consuming()
- def add_on_channel_close_callback(self):
- """This method tells pika to call the on_channel_closed method if
- RabbitMQ unexpectedly closes the channel.
- """
- LOGGER.info('Adding channel close callback')
- self._channel.add_on_close_callback(self.on_channel_closed)
- def on_channel_closed(self, channel, reply_code, reply_text):
- """Invoked by pika when RabbitMQ unexpectedly closes the channel.
- Channels are usually closed if you attempt to do something that
- violates the protocol, such as re-declare an exchange or queue with
- different parameters. In this case, we'll close the connection
- to shutdown the object.
- :param pika.channel.Channel: The closed channel
- :param int reply_code: The numeric reason the channel was closed
- :param str reply_text: The text reason the channel was closed
- """
- LOGGER.warning('Channel %i was closed: (%s) %s',
- channel, reply_code, reply_text)
- self._connection.close()
- def start_consuming(self):
- """This method sets up the consumer by first calling
- add_on_cancel_callback so that the object is notified if RabbitMQ
- cancels the consumer. It then issues the Basic.Consume RPC command
- which returns the consumer tag that is used to uniquely identify the
- consumer with RabbitMQ. We keep the value to use it when we want to
- cancel consuming. The on_message method is passed in as a callback pika
- will invoke when a message is fully received.
- """
- LOGGER.info('Issuing consumer related RPC commands')
- self.add_on_cancel_callback()
- self._channel.basic_qos(prefetch_count=1)
- self._consumer_tag = self._channel.basic_consume(self.on_message,queue)
- def add_on_cancel_callback(self):
- """Add a callback that will be invoked if RabbitMQ cancels the consumer
- for some reason. If RabbitMQ does cancel the consumer,
- on_consumer_cancelled will be invoked by pika.
- """
- LOGGER.info('Adding consumer cancellation callback')
- self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
- def on_consumer_cancelled(self, method_frame):
- """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
- receiving messages.
- :param pika.frame.Method method_frame: The Basic.Cancel frame
- """
- LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
- method_frame)
- if self._channel:
- self._channel.close()
- def on_message(self, unused_channel, basic_deliver, properties, body):
- """Invoked by pika when a message is delivered from RabbitMQ. The
- channel is passed for your convenience. The basic_deliver object that
- is passed in carries the exchange, routing key, delivery tag and
- a redelivered flag for the message. The properties passed in is an
- instance of BasicProperties with the message properties and the body
- is the message that was sent.
- :param pika.channel.Channel unused_channel: The channel object
- :param pika.Spec.Basic.Deliver: basic_deliver method
- :param pika.Spec.BasicProperties: properties
- :param str|unicode body: The message body
- """
- LOGGER.info('Received message # %s from %s',
- basic_deliver.delivery_tag, properties.app_id)
- time.sleep(10*60)
- self.acknowledge_message(basic_deliver.delivery_tag)
- def not_acknowledge_message(self, delivery_tag):
- """Not Acknowledge the message delivery from RabbitMQ by sending a
- Basic.Nack RPC method for the delivery tag.
- :param int delivery_tag: The delivery tag from the Basic.Deliver frame
- """
- LOGGER.info('Not acknowledging message %s', delivery_tag)
- self._channel.basic_nack(delivery_tag)
- def acknowledge_message(self, delivery_tag):
- """Acknowledge the message delivery from RabbitMQ by sending a
- Basic.Ack RPC method for the delivery tag.
- :param int delivery_tag: The delivery tag from the Basic.Deliver frame
- """
- LOGGER.info('Acknowledging message %s', delivery_tag)
- self._channel.basic_ack(delivery_tag)
- def stop_consuming(self):
- """Tell RabbitMQ that you would like to stop consuming by sending the
- Basic.Cancel RPC command.
- """
- if self._channel:
- LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
- self._channel.basic_cancel(self.on_cancelok, self._consumer_tag,nowait=False)
- def on_cancelok(self, unused_frame):
- """This method is invoked by pika when RabbitMQ acknowledges the
- cancellation of a consumer. At this point we will close the channel.
- This will invoke the on_channel_closed method once the channel has been
- closed, which will in-turn close the connection.
- :param pika.frame.Method unused_frame: The Basic.CancelOk frame
- """
- LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
- self.close_channel()
- def close_channel(self):
- """Call to close the channel with RabbitMQ cleanly by issuing the
- Channel.Close RPC command.
- """
- LOGGER.info('Closing the channel')
- self._channel.close()
- def run(self):
- """Run the example consumer by connecting to RabbitMQ and then
- starting the IOLoop to block and allow the SelectConnection to operate.
- """
- self._connection = self.connect()
- self._connection.ioloop.start()
- def stop(self):
- """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
- with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
- will be invoked by pika, which will then closing the channel and
- connection. The IOLoop is started again because this method is invoked
- when CTRL-C is pressed raising a KeyboardInterrupt exception. This
- exception stops the IOLoop which needs to be running for pika to
- communicate with RabbitMQ. All of the commands issued prior to starting
- the IOLoop will be buffered but not processed.
- """
- LOGGER.info('Stopping')
- self._closing = True
- self.stop_consuming()
- self.close_connection()
- #self._connection.ioloop.start()
- LOGGER.info('Stopped')
- def close_connection(self):
- """This method closes the connection to RabbitMQ."""
- LOGGER.info('Closing connection')
- self._connection.close()
- def main():
- print ' [*] Waiting for messages. To exit press CTRL+C'
- logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
- consumer = amqpConsumer()
- try:
- consumer.run()
- except KeyboardInterrupt:
- consumer.stop()
- if __name__ == '__main__':
- main() # (this code was run as script)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement