Advertisement
Guest User

Untitled

a guest
Feb 6th, 2012
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. # vim: set fileencoding=utf-8 sw=4 ts=4 et :
  2.  
  3. import sys
  4.  
  5. from twisted.internet.defer import inlineCallbacks, returnValue
  6. from twisted.internet import reactor, task
  7. from twisted.internet.protocol import ClientCreator
  8.  
  9. from txamqp.protocol import AMQClient
  10. from txamqp.client import TwistedDelegate
  11. from txamqp.content import Content
  12. import txamqp.spec
  13.  
  # --- Knobs for reproducing the issue -------------------------------------
  # Per the original author's note: set PREFETCH_COUNT to 0 or None to make
  # the problem disappear. Only None skips the basic_qos call in getQueue;
  # 0 is still sent to the broker.
  PREFETCH_COUNT = 2
  # Set to False to only receive messages (no test message is published).
  SEND_MESSAGES = True

  # --- Broker location ------------------------------------------------------
  RABBIT_MQ_HOST = "localhost"
  RABBIT_MQ_PORT = 5672

  # --- AMQP names -----------------------------------------------------------
  VHOST = "/"
  QUEUE_NAME = "connector-metro-localhost"
  CONSUMER_TAG = "test_consumer_tag"

  # Login data handed to the connection's start() call in getConnection.
  credentials = dict(LOGIN="guest", PASSWORD="guest")
  29.  
  30.  
  @inlineCallbacks
  def getConnection(client):
      """Open a TCP connection to the broker and start the AMQP session.

      ``client`` must provide ``connectTCP(host, port)`` returning a Deferred
      (``main`` passes a ``ClientCreator``). The connected object is started
      with the module-level ``credentials`` and then returned through the
      Deferred produced by ``inlineCallbacks``.
      """
      connecting = client.connectTCP(RABBIT_MQ_HOST, RABBIT_MQ_PORT)
      protocol = yield connecting
      # Authenticate before handing the connection back to the caller.
      yield protocol.start(credentials)
      returnValue(protocol)
  37.  
  38.  
  @inlineCallbacks
  def getChannel(conn):
      """Fetch channel number 1 from ``conn``, open it, and return it.

      The result is delivered through the Deferred produced by
      ``inlineCallbacks``.
      """
      channel = yield conn.channel(1)
      opening = channel.channel_open()
      yield opening
      returnValue(channel)
  44.  
  45.  
  @inlineCallbacks
  def getQueue(conn, chan):
      """Declare the server queue, start consuming, and return the local queue.

      Steps, in order:
        1. declare ``QUEUE_NAME`` on the server (durable, non-exclusive,
           not auto-deleted);
        2. if ``PREFETCH_COUNT`` is not None, send ``basic_qos`` with it
           (a value of 0 is still sent);
        3. start a consumer on ``QUEUE_NAME`` tagged with ``CONSUMER_TAG``;
        4. return the client-side queue obtained from ``conn.queue``.

      The consumer tag and the server queue name are distinct identifiers.
      The client-side queue is looked up by the consumer tag, so the same
      ``CONSUMER_TAG`` is used for ``basic_consume`` and ``conn.queue``.
      """
      # get the message queue on the message server
      yield chan.queue_declare(queue=QUEUE_NAME, durable=True,
                               exclusive=False, auto_delete=False)
      if PREFETCH_COUNT is not None:
          yield chan.basic_qos(prefetch_count=PREFETCH_COUNT)
      yield chan.basic_consume(queue=QUEUE_NAME, consumer_tag=CONSUMER_TAG)
      # get the client-side queue that receives this consumer's deliveries
      queue = yield conn.queue(CONSUMER_TAG)
      returnValue(queue)
  57.  
  58.  
  @inlineCallbacks
  def processMessage(chan, queue):
      """Take one message from ``queue``, print it, and acknowledge it.

      Prints the current length of ``queue.pending`` first, then waits on
      ``queue.get()``. Once a message is available, its body, the channel id
      and the delivery tag are printed, the message is acknowledged on
      ``chan``, and a confirmation line is printed.
      """
      backlog = len(queue.pending)
      print("Queue size: %d" % backlog)
      delivery = yield queue.get()
      tag = delivery.delivery_tag
      print("Received: %s from channel #%s (n°%s)" % (
          delivery.content.body, chan.id, tag))
      yield chan.basic_ack(tag)
      print("ACKed n°%s" % tag)
  67.  
  @inlineCallbacks
  def sendMessage(chan):
      """Publish one fixed test message on ``chan`` and report it.

      The message body is "blabla"; it is published to exchange "perf" with
      routing key "perf". A confirmation line with the channel id is printed
      after the publish call has completed.
      """
      payload = Content("blabla")
      publishing = chan.basic_publish(
          exchange="perf", routing_key="perf", content=payload)
      yield publishing
      print("Published on chan #%s" % chan.id)
  73.  
  74.  
  @inlineCallbacks
  def main(spec):
      """Connect to the broker, start the receive loop, and schedule shutdown.

      ``spec`` is the loaded AMQP specification passed on to ``AMQClient``.

      After connection, channel and queue setup:
        * ``processMessage`` is run in a ``LoopingCall`` with a 0.3 s interval;
        * if ``SEND_MESSAGES`` is true, ``sendMessage`` is scheduled after 2 s;
        * after 15 s the loop is stopped and the reactor is shut down.

      Setup errors propagate through the returned Deferred. Errors raised by
      the receive loop are printed to stderr instead of being dropped.
      """
      delegate = TwistedDelegate()
      # create the Twisted consumer client
      consumer = ClientCreator(
          reactor, AMQClient, delegate=delegate,
          vhost=VHOST, spec=spec)
      # connect to the RabbitMQ server
      conn = yield getConnection(consumer)
      # get the channel
      chan = yield getChannel(conn)
      # get the message queue
      queue = yield getQueue(conn, chan)
      # start the workers
      recv = task.LoopingCall(processMessage, chan, queue)
      loop_done = recv.start(0.3)
      # A failure inside processMessage ends the loop; report it rather than
      # letting the Deferred be discarded silently.
      loop_done.addErrback(
          lambda failure: failure.printTraceback(file=sys.stderr))
      if SEND_MESSAGES:
          reactor.callLater(2, sendMessage, chan)

      def stop():
          print("Stopping")
          # The loop may already have ended because of an error; stopping a
          # loop that is not running raises, which would skip reactor.stop()
          # and leave the process hanging.
          if recv.running:
              recv.stop()
          reactor.stop()
      reactor.callLater(15, stop)
  98.  
  99.  
  if __name__ == "__main__":
      # Exactly one argument is expected: the path to the AMQP spec file.
      if len(sys.argv) != 2:
          print("%s path_to_spec" % sys.argv[0])
          sys.exit(1)
      spec = txamqp.spec.load(sys.argv[1])

      def abort(failure):
          # Setup failed (connection, login, channel or queue): main() never
          # got to schedule its own shutdown, so report the error and stop
          # the reactor here instead of blocking forever.
          failure.printTraceback(file=sys.stderr)
          reactor.stop()

      # Start main() from inside the running reactor so that abort() can
      # always call reactor.stop(), even if main() fails immediately.
      reactor.callWhenRunning(lambda: main(spec).addErrback(abort))
      reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement