Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # This code results in only one element being consumed from the queue. Do you know what's wrong?
- from kombu import Exchange, Queue, BrokerConnection, pools
# --- Broker topology -------------------------------------------------------
# One identifier names the exchange, the queue, and the routing key.
EXCHANGE_ID = 'queued-requests'
QUEUE_ID = ROUTING_KEY = EXCHANGE_ID
EXCHANGE_TYPE = 'direct'

# --- Connection settings ---------------------------------------------------
CONNECTION_URI = 'amqp://guest:guest@localhost//'
DEFAULT_CONNECTION_POOL_SIZE = 16

# Content type listed in the consumer's ``accept`` argument.
# NOTE(review): unpickling can execute arbitrary code -- confirm that every
# producer publishing to this queue is trusted.
SERIALIZED_FORMAT = 'pickle'

# Durable exchange with a queue bound to it under ROUTING_KEY.
exchange = Exchange(EXCHANGE_ID, type=EXCHANGE_TYPE, durable=True)
queue = Queue(QUEUE_ID, exchange, routing_key=ROUTING_KEY)

# Connections are drawn from a pool capped at DEFAULT_CONNECTION_POOL_SIZE.
broker_connection = BrokerConnection(CONNECTION_URI)
queue_connections = pools.Connections(limit=DEFAULT_CONNECTION_POOL_SIZE)
def on_error(m, exc):
    """Report a message whose body could not be decoded.

    Passed to the consumer as its ``on_decode_error`` callback.

    Args:
        m: The message that failed to decode (not used here).
        exc: The exception raised while decoding.
    """
    # Interpolate with ``%``: the previous comma form printed the literal
    # "%s" placeholder followed by the exception instead of formatting it.
    # The 1-tuple keeps the formatting correct even if ``exc`` is a tuple.
    print("Error decoding message: %s" % (exc,))
def on_message(m):
    """Acknowledge a delivered message, then log that it arrived.

    Args:
        m: The received message; only its ``ack()`` method is used.
    """
    acknowledge = m.ack
    acknowledge()
    print("Message received")
# Main consumer loop.
#
# The outer loop only re-establishes the connection and consumer after a
# failure. The inner loop does the consuming: ``drain_events()`` waits for a
# single event, so it must be called repeatedly while the consumer is still
# registered. Calling it once and then leaving the ``with`` blocks cancels the
# consumer after the first message, which is why only one element was consumed.
while True:
    try:
        print("Getting into consumer loop")
        with queue_connections[broker_connection].acquire(block=True) as consumer_connection:
            with consumer_connection.Consumer(queue, on_message=on_message,
                                              on_decode_error=on_error,
                                              accept=[SERIALIZED_FORMAT]) as consumer:
                print("Calling drain events, connection %s consumer %s"
                      % (consumer_connection, consumer))
                # Keep the same consumer alive and pump events until an
                # error breaks out to the reconnect path below.
                while True:
                    consumer_connection.drain_events()
    except Exception as e:
        # Top-level boundary: report the failure and fall through to the
        # outer loop, which acquires a connection and consumer again.
        print("Error while attempting to consume from queue %s" % (e,))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement