Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import txamqp
- from twisted.internet.protocol import ClientCreator
- from txamqp.protocol import AMQClient
- from twisted.internet import defer, reactor
- from txamqp.content import Content
- from txamqp.client import TwistedDelegate
- import time
def receive(msg, cb, queue, channel):
    """Hand one delivered message to ``cb`` and wait for the next one.

    Used as the callback of ``queue.get()`` (see ``consume_queue``).

    :param msg: the message taken from ``queue``.
    :param cb: handler invoked as ``cb(msg, channel)`` in a reactor
        thread-pool thread.
    :param queue: the queue the message came from; polled again afterwards.
    :param channel: channel passed through to the handler.
    """
    handler_args = (msg, channel)
    # The handler runs off the reactor thread, so it is allowed to block.
    reactor.callInThread(cb, *handler_args)
    # Ask for the next message right away; the handler is not awaited.
    consume_queue(cb, queue, channel)
def fast_process(msg, channel):
    """Print the message body and acknowledge the message immediately.

    Called from a worker thread (see ``receive``), so the acknowledgement
    is scheduled back onto the reactor thread with ``callFromThread``.

    :param msg: delivered message; ``msg.content.body`` and
        ``msg.delivery_tag`` are read.
    :param channel: channel whose ``basic_ack`` is used to acknowledge.
    """
    body = msg.content.body
    print('Fast consumer received message: %s' % (body, ))
    tag = msg.delivery_tag
    reactor.callFromThread(channel.basic_ack, delivery_tag=tag)
def slow_process(msg, channel):
    """Print the message body, block for five seconds, then acknowledge.

    Called from a worker thread (see ``receive``), so the blocking sleep
    does not run on the reactor thread; the acknowledgement is scheduled
    back onto the reactor thread with ``callFromThread``.

    :param msg: delivered message; ``msg.content.body`` and
        ``msg.delivery_tag`` are read.
    :param channel: channel whose ``basic_ack`` is used to acknowledge.
    """
    body = msg.content.body
    print('Slow consumer received message: %s' % (body, ))
    # Simulated slow work.
    time.sleep(5)
    tag = msg.delivery_tag
    reactor.callFromThread(channel.basic_ack, delivery_tag=tag)
def consume_queue(cb, queue, channel):
    """Arrange for the next message on ``queue`` to be passed to ``receive``.

    :param cb: handler that ``receive`` will invoke with the message.
    :param queue: object whose ``get()`` returns a Deferred for the next
        message.
    :param channel: channel forwarded to ``receive`` (and on to ``cb``).
    """
    next_message = queue.get()
    # ``receive`` gets the message first, followed by the extra arguments.
    next_message.addCallback(receive, cb, queue, channel)
def publish(channel):
    """Publish ten messages, "Message 0" .. "Message 9", on ``channel``.

    Each ``basic_publish`` call is scheduled with ``callFromThread``; the
    caller in this file starts this function via ``callInThread``.

    :param channel: channel whose ``basic_publish`` is used; messages are
        sent with routing key ``"queue"``.
    """
    for seq in range(10):
        reactor.callFromThread(
            channel.basic_publish,
            routing_key="queue",
            content=Content("Message %d" % seq),
        )
@defer.inlineCallbacks
def _start_consumer(client, channel_id, cb):
    """Open channel ``channel_id`` and start feeding its messages to ``cb``.

    Opens the channel, sets ``prefetch_count=1``, declares the auto-delete
    queue ``'queue'``, subscribes to it and hooks the resulting message
    queue up to ``cb`` through ``consume_queue``.

    :param client: connected, authenticated AMQP client.
    :param channel_id: channel number to open.
    :param cb: handler invoked as ``cb(msg, channel)`` for each message.
    :returns: Deferred firing with the opened channel.
    """
    channel = yield client.channel(channel_id)
    yield channel.channel_open()
    # set prefetch
    yield channel.basic_qos(prefetch_count=1)
    # subscribe
    yield channel.queue_declare(queue='queue', auto_delete=True)
    reply = yield channel.basic_consume(queue='queue')
    queue = yield client.queue(reply.consumer_tag)
    consume_queue(cb, queue, channel)
    # returnValue (not ``return``) keeps this generator valid on Python 2.
    defer.returnValue(channel)


@defer.inlineCallbacks
def authenticate(client, username, password):
    """Log in, start the fast and slow consumers, then publish test messages.

    Used as the callback of the connection Deferred, which supplies
    ``client``.

    :param client: connected AMQP client.
    :param username: login name passed to ``client.authenticate``.
    :param password: password passed to ``client.authenticate``.
    :returns: Deferred that fires once both consumers are set up and the
        publisher thread has been started.
    """
    yield client.authenticate(username, password)
    # Channel 1 acknowledges immediately, channel 2 after a delay; both
    # consume from the same queue.
    yield _start_consumer(client, 1, fast_process)
    slow_channel = yield _start_consumer(client, 2, slow_process)
    # publish (from a worker thread, on the most recently opened channel)
    reactor.callInThread(publish, slow_channel)
def main(argv=None):
    """Connect to the broker named on the command line and run the demo.

    Expects six arguments after the program name:
    ``host port vhost username password path_to_spec``. With any other
    argument count a usage line is printed and the process exits with
    status 1.

    :param argv: full argument vector including the program name;
        defaults to ``sys.argv``.
    """
    import sys

    if argv is None:
        argv = sys.argv
    if len(argv) != 7:
        prog = argv[0] if argv else sys.argv[0]
        # Parenthesised form works as a statement on Python 2 and as a
        # function call on Python 3.
        print("%s host port vhost username password path_to_spec" % prog)
        sys.exit(1)

    # Import the submodule explicitly instead of relying on another
    # txamqp module having imported it as a side effect.
    import txamqp.spec

    host = argv[1]
    port = int(argv[2])
    vhost = argv[3]
    username = argv[4]
    password = argv[5]
    spec_file = argv[6]

    spec = txamqp.spec.load(spec_file)
    delegate = TwistedDelegate()

    def _abort(failure):
        # Without an errback a failed connect/login is dropped silently
        # and the reactor keeps running forever.
        failure.printTraceback()
        if reactor.running:
            reactor.stop()

    d = ClientCreator(reactor, AMQClient, delegate, vhost,
                      spec).connectTCP(host, port)
    d.addCallback(authenticate, username, password)
    d.addErrback(_abort)
    reactor.run()


if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment