SHOW:
|
|
- or go back to the newest paste.
1 | # This code results in only one element being consumed from the queue, do you know what's wrong? | |
2 | ||
3 | from kombu import Exchange, Queue, BrokerConnection, pools | |
4 | ||
5 | EXCHANGE_ID = 'queued-requests' | |
6 | EXCHANGE_TYPE = 'direct' | |
7 | QUEUE_ID = EXCHANGE_ID | |
8 | ROUTING_KEY = EXCHANGE_ID | |
9 | CONNECTION_URI = 'amqp://guest:guest@localhost//' | |
10 | SERIALIZED_FORMAT='pickle' | |
11 | ||
12 | DEFAULT_CONNECTION_POOL_SIZE = 16 | |
13 | ||
14 | exchange = Exchange(EXCHANGE_ID, EXCHANGE_TYPE, durable=True) | |
15 | queue = Queue(QUEUE_ID, exchange=exchange, routing_key=ROUTING_KEY) | |
16 | broker_connection = BrokerConnection(CONNECTION_URI) | |
17 | queue_connections = pools.Connections(limit=DEFAULT_CONNECTION_POOL_SIZE) | |
18 | ||
19 | ||
20 | def on_error(m, exc): | |
21 | print "Error decoding message: %s", exc | |
22 | ||
23 | def on_message(m): | |
24 | m.ack() | |
25 | print "Message received" | |
26 | ||
27 | while True: | |
28 | try: | |
29 | print "Getting into consumer loop" | |
30 | with queue_connections[broker_connection].acquire(block=True) as consumer_connection: | |
31 | with consumer_connection.Consumer(queue, on_message=on_message, | |
32 | on_decode_error=on_error, accept=[SERIALIZED_FORMAT]) as consumer: | |
33 | print "Calling drain events, connection", consumer_connection, "consumer", consumer | |
34 | consumer_connection.drain_events() | |
35 | except Exception, e: | |
36 | print "Error while attempting to consume from queue", e |