View difference between Paste ID: eyWSwqkh and DX5KZNb4
SHOW: | | - or go back to the newest paste.
1
# vim: set fileencoding=utf-8 sw=4 ts=4 et :
2
3
import sys
4
5
from twisted.internet.defer import inlineCallbacks, returnValue
6
from twisted.internet import reactor, task
7
from twisted.internet.protocol import ClientCreator
8
9
from txamqp.protocol import AMQClient
10
from txamqp.client import TwistedDelegate
11
from txamqp.content import Content
12
import txamqp.spec
13
14
# Set this variable to 0 or None to make the problem disappear
15
PREFETCH_COUNT = 2
16
# Or set this one to False to only receive messages
17
SEND_MESSAGES = True
18
19
20-
# Set it to 0 or None to make the concurrency problem disappear
20+
21-
PREFETCH_COUNT = 1
21+
22
23
VHOST = "/"
24
QUEUE_NAME = "connector-metro-localhost"
25
CONSUMER_TAG = "test_consumer_tag"
26
27
28
credentials = {"LOGIN": "guest", "PASSWORD": "guest"}
29
30
31
@inlineCallbacks
32
def getConnection(client):
33
    conn = yield client.connectTCP(
34
        RABBIT_MQ_HOST, RABBIT_MQ_PORT)
35
    yield conn.start(credentials)
36
    returnValue(conn)
37
38
39
@inlineCallbacks
40
def getChannel(conn):
41
    chan = yield conn.channel(1)
42
    yield chan.channel_open()
43
    returnValue(chan)
44
45
46
@inlineCallbacks
47
def getQueue(conn, chan):
48
    # get the message queue on the message server
49
    yield chan.queue_declare(queue=QUEUE_NAME, durable=True,
50
                             exclusive=False, auto_delete=False)
51
    if PREFETCH_COUNT is not None:
52
        reply = yield chan.basic_qos(prefetch_count=PREFETCH_COUNT)
53
    reply = yield chan.basic_consume(queue=QUEUE_NAME, consumer_tag=QUEUE_NAME)
54
    # get the queue that's associated with our consumer
55
    queue = yield conn.queue(QUEUE_NAME)
56
    returnValue(queue)
57
58
59
@inlineCallbacks
60
def processMessage(chan, queue):
61
    print "Queue size: %d" % len(queue.pending)
62
    msg = yield queue.get()
63
    print "Received: %s from channel #%s (n°%s)" % (
64
        msg.content.body, chan.id, msg.delivery_tag)
65
    yield chan.basic_ack(msg.delivery_tag)
66
    print "ACKed n°%s" % msg.delivery_tag
67
68
@inlineCallbacks
69
def sendMessage(chan):
70
    msg = Content("blabla")
71
    yield chan.basic_publish(exchange="perf", routing_key="perf", content=msg)
72
    print "Published on chan #%s" % chan.id
73
74
75
@inlineCallbacks
76
def main(spec):
77
    delegate = TwistedDelegate()
78
    # create the Twisted consumer client
79
    consumer = ClientCreator(
80
        reactor, AMQClient, delegate=delegate,
81
        vhost=VHOST, spec=spec)
82
    # connect to the RabbitMQ server
83
    conn = yield getConnection(consumer)
84
    # get the channel
85-
    send = task.LoopingCall(sendMessage, chan)
85+
86
    # get the message queue
87-
    reactor.callLater(2, send.start, 0.5)
87+
88
    # start the workers
89
    recv = task.LoopingCall(processMessage, chan, queue)
90
    recv.start(0.3)
91-
        send.stop()
91+
    if SEND_MESSAGES:
92
        reactor.callLater(2, sendMessage, chan)
93
    def stop():
94
        print "Stopping"
95
        recv.stop()
96
        reactor.stop()
97
    reactor.callLater(15, stop)
98
99
100
if __name__ == "__main__":
101
    if len(sys.argv) != 2:
102
        print "%s path_to_spec" % sys.argv[0]
103
        sys.exit(1)
104
    spec = txamqp.spec.load(sys.argv[1])
105
    main(spec)
106
    reactor.run()