Guest User

Untitled

a guest
Jul 20th, 2018
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.75 KB | None | 0 0
  1. # -*- encoding:utf-8 -*-
  2. from __future__ import unicode_literals
  3. import unittest
  4. import uuid
  5. import time
  6. from rabbitmq.rabbitmq_rpc import AsyncRabbitMQ
  7. from rabbitmq.rabbitmq_util import make_properties
  8. from tornado.gen import coroutine,Return, sleep
  9. from tornado.testing import AsyncTestCase, gen_test
  10. from tornado.queues import Queue
  11. from concurrent.futures import ThreadPoolExecutor
  12.  
  13. EXECUTOR = ThreadPoolExecutor(max_workers=4)
  14.  
  15.  
  16. class TestAsyncRabbitMQPublish(AsyncTestCase):
  17. def setUp(self):
  18. super(TestAsyncRabbitMQPublish, self).setUp()
  19. self._url = '127.0.0.1'
  20. self._client = AsyncRabbitMQ(self._url, io_loop=self.io_loop)
  21. self._exchange = "test_asyncrabbitmq_exchange"
  22. self._queue_name = "test_asyncrabbitmq_queue"
  23. self._result_queue = Queue(maxsize=10)
  24. self._fib_queue = Queue(maxsize=1)
  25.  
  26. def _processMessage(self, channel, method, props, body):
  27. self._result_queue.put(body)
  28. channel.basic_ack(delivery_tag=method.delivery_tag)
  29. raise Return(True)
  30.  
  31. @gen_test(timeout=10)
  32. def test_publish(self):
  33. yield self._client.wait_connected()
  34. yield self._client.consume(self._exchange, self._queue_name,"dog.*", self._processMessage)
  35. yield self._client.publish(self._exchange, "dog.yellow", "A yellow dog")
  36. actual = yield self._result_queue.get()
  37. self.assertEqual(actual, "A yellow dog")
  38.  
  39. @gen_test(timeout=10)
  40. def test_concurrent_publish(self):
  41. yield self._client.wait_connected()
  42. yield self._client.consume(self._exchange, self._queue_name, "dog.*", self._processMessage)
  43. yield [
  44. self._client.publish(self._exchange, "dog.yellow", "a yellow dog"),
  45. self._client.publish(self._exchange, "dog.red", "a red dog"),
  46. self._client.publish(self._exchange, "dog.blue", "a blue dog"),
  47. self._client.publish(self._exchange, "dog.green", "a green dog"),
  48. self._client.publish(self._exchange, "cat.yellow", "a yellow cat"), # this message will be discarded
  49. self._client.publish(self._exchange, "dog.colorful", "a colorful dog"),
  50. ]
  51. result_set = set(["a yellow dog", "a red dog", "a blue dog",
  52. "a green dog", "a colorful dog"])
  53. for _ in range(5):
  54. actual = yield self._result_queue.get()
  55. self.assertTrue(actual in result_set)
  56.  
  57. @coroutine
  58. def _process(self, channel, method, props, body):
  59. n = int(body)
  60. result = self._fib(n)
  61. if props is not None:
  62. channel.basic_publish(exchange=self._exchange,
  63. routing_key=props.reply_to,
  64. properties=make_properties(correlation_id=props.correlation_id),
  65. body=str(result))
  66. channel.basic_ack(delivery_tag=method.delivery_tag)
  67. raise Return(True)
  68.  
  69. def _fib(self, n):
  70. if n < 2:
  71. return n
  72. else:
  73. return self._fib(n - 1) + self._fib(n - 2)
  74.  
  75. @coroutine
  76. def _fib_back(self, channel, method, props, body):
  77. self._fib_queue.put(body)
  78. channel.basic_ack(delivery_tag=method.delivery_tag)
  79. raise Return(True)
  80.  
  81. @gen_test(timeout=10)
  82. def test_publish_with_reply(self):
  83. fib_back_queue = "fibnacci_call_back"
  84. corr_id = str(uuid.uuid4())
  85. yield self._client.wait_connected()
  86. yield self._client.consume(self._exchange, self._queue_name, "fib.*", self._process)
  87. yield self._client.consume(self._exchange, fib_back_queue, fib_back_queue, self._fib_back)
  88. yield self._client.publish(self._exchange, "fib.call", "10", properties=make_properties(
  89. correlation_id=corr_id, reply_to=fib_back_queue))
  90. actual = yield self._fib_queue.get()
  91. expect = str(self._fib(10))
  92. self.assertEqual(actual, expect)
  93.  
  94.  
  95. class TestAsyncRabbitMQCall(AsyncTestCase):
  96. def setUp(self):
  97. super(TestAsyncRabbitMQCall, self).setUp()
  98. self._url = '127.0.0.1'
  99. self._client = AsyncRabbitMQ(self._url, io_loop=self.io_loop)
  100. self._client = AsyncRabbitMQ(self._url, io_loop=self.io_loop)
  101. self._exchange = "test_asyncrabbitmq_exchange"
  102. self._queue_name = "test_asyncrabbitmq_queue"
  103. self._client = AsyncRabbitMQ(self._url, io_loop=self.io_loop)
  104.  
  105. @coroutine
  106. def fib(self, body):
  107. n = int(body)
  108. result = yield EXECUTOR.submit(self._fib, *(n,))
  109. raise Return(str(result))
  110.  
  111. def _fib(self, n):
  112. if n < 2:
  113. return n
  114. else:
  115. return self._fib(n - 1) + self._fib(n - 2)
  116.  
  117. @gen_test(timeout=10)
  118. def test_call(self):
  119. yield self._client.wait_connected()
  120. yield self._client.service(self._exchange, self._queue_name, "fib.*", self.fib)
  121. values = [5, 10, 8, 9, 10, 23, 12]
  122. got_values = yield [self._client.call(self._exchange, "fib.call", str(value), "fib_call_back_queue")
  123. for value in values]
  124. for expect, actual in zip(values, got_values):
  125. self.assertEqual(str(self._fib(expect)), actual)
  126.  
  127. @coroutine
  128. def fib_timeout(self, body):
  129. result = yield EXECUTOR.submit(self._fib_timeout)
  130. raise Return(str(result))
  131.  
  132. @staticmethod
  133. def _fib_timeout():
  134. time.sleep(2)
  135. return "Task done"
  136.  
  137. @gen_test(timeout=10)
  138. def test_call_timeout(self):
  139. yield self._client.wait_connected()
  140. yield self._client.service(self._exchange, self._queue_name, "fibtimtout.*", self.fib_timeout)
  141. value = yield self._client.call(self._exchange, "fibtimtout.call", "message", "fib_call_back_queue_timeout", timeout=1)
  142. self.assertIsNone(value)
  143. EXECUTOR.shutdown(True)
  144.  
  145.  
  146. if __name__ == "__main__":
  147. unittest.main()
Add Comment
Please, Sign In to add comment