Advertisement
Guest User

Consumer using connection pools which only pops one element

a guest
Feb 13th, 2014
48
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. # This code results in only one element being consumed from the queue. Do you know what's wrong?
  2.  
  3. from kombu import  Exchange, Queue, BrokerConnection, pools
  4.  
  5. EXCHANGE_ID = 'queued-requests'
  6. EXCHANGE_TYPE = 'direct'
  7. QUEUE_ID = EXCHANGE_ID
  8. ROUTING_KEY = EXCHANGE_ID
  9. CONNECTION_URI = 'amqp://guest:guest@localhost//'
  10. SERIALIZED_FORMAT='pickle'
  11.  
  12. DEFAULT_CONNECTION_POOL_SIZE = 16
  13.  
  14. exchange = Exchange(EXCHANGE_ID, EXCHANGE_TYPE, durable=True)
  15. queue = Queue(QUEUE_ID, exchange=exchange, routing_key=ROUTING_KEY)
  16. broker_connection = BrokerConnection(CONNECTION_URI)
  17. queue_connections = pools.Connections(limit=DEFAULT_CONNECTION_POOL_SIZE)
  18.  
  19.  
  20. def on_error(m, exc):
  21.     print "Error decoding message: %s", exc
  22.  
  23. def on_message(m):
  24.     m.ack()
  25.     print "Message received"
  26.  
  27. while True:
  28.     try:
  29.         print "Getting into consumer loop"
  30.         with queue_connections[broker_connection].acquire(block=True) as consumer_connection:
  31.             with consumer_connection.Consumer(queue, on_message=on_message,
  32.                                               on_decode_error=on_error, accept=[SERIALIZED_FORMAT]) as consumer:
  33.                 print "Calling drain events, connection", consumer_connection,  "consumer",  consumer
  34.                 consumer_connection.drain_events()
  35.     except Exception, e:
  36.             print "Error while attempting to consume from queue", e
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement