View difference between Paste ID: 9hbAxk9r and 2XdBeapT
SHOW: | | - or go back to the newest paste.
1
# This code results in only one element being consumed from the queue, do you know what's wrong?
2
3
from kombu import  Exchange, Queue, BrokerConnection, pools
4
5
EXCHANGE_ID = 'queued-requests'
6
EXCHANGE_TYPE = 'direct'
7
QUEUE_ID = EXCHANGE_ID
8
ROUTING_KEY = EXCHANGE_ID
9
CONNECTION_URI = 'amqp://guest:guest@localhost//'
10
SERIALIZED_FORMAT='pickle'
11
12
DEFAULT_CONNECTION_POOL_SIZE = 16
13
14
exchange = Exchange(EXCHANGE_ID, EXCHANGE_TYPE, durable=True)
15
queue = Queue(QUEUE_ID, exchange=exchange, routing_key=ROUTING_KEY)
16
broker_connection = BrokerConnection(CONNECTION_URI)
17
queue_connections = pools.Connections(limit=DEFAULT_CONNECTION_POOL_SIZE)
18
19
20
def on_error(m, exc):
21
    print "Error decoding message: %s", exc
22
23
def on_message(m):
24
    m.ack()
25
    print "Message received"
26
27
while True:
28
    try:
29
        print "Getting into consumer loop"
30
        with queue_connections[broker_connection].acquire(block=True) as consumer_connection:
31
            with consumer_connection.Consumer(queue, on_message=on_message,
32
                                              on_decode_error=on_error, accept=[SERIALIZED_FORMAT]) as consumer:
33
                print "Calling drain events, connection", consumer_connection,  "consumer",  consumer
34
                consumer_connection.drain_events()
35
    except Exception, e:
36
            print "Error while attempting to consume from queue", e