SHOW:
|
|
- or go back to the newest paste.
1 | # vim: set fileencoding=utf-8 sw=4 ts=4 et : | |
2 | ||
3 | import sys | |
4 | ||
5 | from twisted.internet.defer import inlineCallbacks, returnValue | |
6 | from twisted.internet import reactor, task | |
7 | from twisted.internet.protocol import ClientCreator | |
8 | ||
9 | from txamqp.protocol import AMQClient | |
10 | from txamqp.client import TwistedDelegate | |
11 | from txamqp.content import Content | |
12 | import txamqp.spec | |
13 | ||
14 | # Set this variable to 0 or None to make the problem disappear | |
15 | PREFETCH_COUNT = 2 | |
16 | # Or set this one to False to only receive messages | |
17 | SEND_MESSAGES = True | |
18 | ||
19 | ||
20 | - | # Set it to 0 or None to make the concurrency problem disappear |
20 | + | |
21 | - | PREFETCH_COUNT = 1 |
21 | + | |
22 | ||
23 | VHOST = "/" | |
24 | QUEUE_NAME = "connector-metro-localhost" | |
25 | CONSUMER_TAG = "test_consumer_tag" | |
26 | ||
27 | ||
28 | credentials = {"LOGIN": "guest", "PASSWORD": "guest"} | |
29 | ||
30 | ||
31 | @inlineCallbacks | |
32 | def getConnection(client): | |
33 | conn = yield client.connectTCP( | |
34 | RABBIT_MQ_HOST, RABBIT_MQ_PORT) | |
35 | yield conn.start(credentials) | |
36 | returnValue(conn) | |
37 | ||
38 | ||
39 | @inlineCallbacks | |
40 | def getChannel(conn): | |
41 | chan = yield conn.channel(1) | |
42 | yield chan.channel_open() | |
43 | returnValue(chan) | |
44 | ||
45 | ||
46 | @inlineCallbacks | |
47 | def getQueue(conn, chan): | |
48 | # get the message queue on the message server | |
49 | yield chan.queue_declare(queue=QUEUE_NAME, durable=True, | |
50 | exclusive=False, auto_delete=False) | |
51 | if PREFETCH_COUNT is not None: | |
52 | reply = yield chan.basic_qos(prefetch_count=PREFETCH_COUNT) | |
53 | reply = yield chan.basic_consume(queue=QUEUE_NAME, consumer_tag=QUEUE_NAME) | |
54 | # get the queue that's associated with our consumer | |
55 | queue = yield conn.queue(QUEUE_NAME) | |
56 | returnValue(queue) | |
57 | ||
58 | ||
59 | @inlineCallbacks | |
60 | def processMessage(chan, queue): | |
61 | print "Queue size: %d" % len(queue.pending) | |
62 | msg = yield queue.get() | |
63 | print "Received: %s from channel #%s (n°%s)" % ( | |
64 | msg.content.body, chan.id, msg.delivery_tag) | |
65 | yield chan.basic_ack(msg.delivery_tag) | |
66 | print "ACKed n°%s" % msg.delivery_tag | |
67 | ||
68 | @inlineCallbacks | |
69 | def sendMessage(chan): | |
70 | msg = Content("blabla") | |
71 | yield chan.basic_publish(exchange="perf", routing_key="perf", content=msg) | |
72 | print "Published on chan #%s" % chan.id | |
73 | ||
74 | ||
75 | @inlineCallbacks | |
76 | def main(spec): | |
77 | delegate = TwistedDelegate() | |
78 | # create the Twisted consumer client | |
79 | consumer = ClientCreator( | |
80 | reactor, AMQClient, delegate=delegate, | |
81 | vhost=VHOST, spec=spec) | |
82 | # connect to the RabbitMQ server | |
83 | conn = yield getConnection(consumer) | |
84 | # get the channel | |
85 | - | send = task.LoopingCall(sendMessage, chan) |
85 | + | |
86 | # get the message queue | |
87 | - | reactor.callLater(2, send.start, 0.5) |
87 | + | |
88 | # start the workers | |
89 | recv = task.LoopingCall(processMessage, chan, queue) | |
90 | recv.start(0.3) | |
91 | - | send.stop() |
91 | + | if SEND_MESSAGES: |
92 | reactor.callLater(2, sendMessage, chan) | |
93 | def stop(): | |
94 | print "Stopping" | |
95 | recv.stop() | |
96 | reactor.stop() | |
97 | reactor.callLater(15, stop) | |
98 | ||
99 | ||
100 | if __name__ == "__main__": | |
101 | if len(sys.argv) != 2: | |
102 | print "%s path_to_spec" % sys.argv[0] | |
103 | sys.exit(1) | |
104 | spec = txamqp.spec.load(sys.argv[1]) | |
105 | main(spec) | |
106 | reactor.run() |