Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # vim: set fileencoding=utf-8 sw=4 ts=4 et :
- import sys
- from twisted.internet.defer import inlineCallbacks, returnValue
- from twisted.internet import reactor, task
- from twisted.internet.protocol import ClientCreator
- from txamqp.protocol import AMQClient
- from txamqp.client import TwistedDelegate
- from txamqp.content import Content
- import txamqp.spec
- # Set this variable to 0 or None to make the problem disappear
- PREFETCH_COUNT = 2
- # Or set this one to False to only receive messages
- SEND_MESSAGES = True
- RABBIT_MQ_HOST = "localhost"
- RABBIT_MQ_PORT = 5672
- VHOST = "/"
- QUEUE_NAME = "connector-metro-localhost"
- CONSUMER_TAG = "test_consumer_tag"
- credentials = {"LOGIN": "guest", "PASSWORD": "guest"}
- @inlineCallbacks
- def getConnection(client):
- conn = yield client.connectTCP(
- RABBIT_MQ_HOST, RABBIT_MQ_PORT)
- yield conn.start(credentials)
- returnValue(conn)
- @inlineCallbacks
- def getChannel(conn):
- chan = yield conn.channel(1)
- yield chan.channel_open()
- returnValue(chan)
- @inlineCallbacks
- def getQueue(conn, chan):
- # get the message queue on the message server
- yield chan.queue_declare(queue=QUEUE_NAME, durable=True,
- exclusive=False, auto_delete=False)
- if PREFETCH_COUNT is not None:
- reply = yield chan.basic_qos(prefetch_count=PREFETCH_COUNT)
- reply = yield chan.basic_consume(queue=QUEUE_NAME, consumer_tag=QUEUE_NAME)
- # get the queue that's associated with our consumer
- queue = yield conn.queue(QUEUE_NAME)
- returnValue(queue)
- @inlineCallbacks
- def processMessage(chan, queue):
- print "Queue size: %d" % len(queue.pending)
- msg = yield queue.get()
- print "Received: %s from channel #%s (n°%s)" % (
- msg.content.body, chan.id, msg.delivery_tag)
- yield chan.basic_ack(msg.delivery_tag)
- print "ACKed n°%s" % msg.delivery_tag
- @inlineCallbacks
- def sendMessage(chan):
- msg = Content("blabla")
- yield chan.basic_publish(exchange="perf", routing_key="perf", content=msg)
- print "Published on chan #%s" % chan.id
- @inlineCallbacks
- def main(spec):
- delegate = TwistedDelegate()
- # create the Twisted consumer client
- consumer = ClientCreator(
- reactor, AMQClient, delegate=delegate,
- vhost=VHOST, spec=spec)
- # connect to the RabbitMQ server
- conn = yield getConnection(consumer)
- # get the channel
- chan = yield getChannel(conn)
- # get the message queue
- queue = yield getQueue(conn, chan)
- # start the workers
- recv = task.LoopingCall(processMessage, chan, queue)
- recv.start(0.3)
- if SEND_MESSAGES:
- reactor.callLater(2, sendMessage, chan)
- def stop():
- print "Stopping"
- recv.stop()
- reactor.stop()
- reactor.callLater(15, stop)
- if __name__ == "__main__":
- if len(sys.argv) != 2:
- print "%s path_to_spec" % sys.argv[0]
- sys.exit(1)
- spec = txamqp.spec.load(sys.argv[1])
- main(spec)
- reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement