Guest User

Untitled

a guest
Jul 16th, 2018
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 17.44 KB | None | 0 0
  1. # -*- encoding:utf-8 -*-
  2. from __future__ import unicode_literals
  3. import logging
  4. import uuid
  5. import re
  6. import datetime
  7. import functools
  8. import pika
  9. from tornado.ioloop import IOLoop
  10. from tornado.queues import Queue
  11. from tornado.gen import Return, coroutine
  12.  
  13. __author__ = ["feng.gao@aispeech.com"]
  14.  
  15.  
  16. class AMQPRpcObject(object):
  17. EXCHANGE_TYPE = "topic"
  18. LOCALHOST = "127.0.0.1"
  19.  
  20. @classmethod
  21. def _get_log(cls, *names):
  22. return logging.getLogger(".".join((cls.__module__, cls.__name__) + names))
  23.  
  24. def __init__(self, amqp_url):
  25. """
  26. initialize an AMQPRpcObject instance
  27. :param amqp_url: amqp url, it can be either 'amqp://dev:aispeech2018@10.12.7.22:5672/' or "127.0.0.1"
  28. """
  29. self._parameter = pika.ConnectionParameters(amqp_url) if amqp_url == self.LOCALHOST else \
  30. pika.URLParameters(amqp_url)
  31.  
  32.  
  33. class AsyncRabbitMQ(AMQPRpcObject):
  34. """
  35. It is an `Everything-in-One` RabbitMQ client, including features as follows:
  36. - producer
  37. - consumer
  38. - rpc client
  39. - rpc server
  40. All of above clients share the only one connection.
  41. """
  42. def __init__(self, amqp_url, io_loop=None):
  43. """
  44. Initialize a AsyncRabbitMQ instance
  45. :param amqp_url: amqp_url: amqp url, it can be either 'amqp://dev:aispeech2018@10.12.7.22:5672/' or "127.0.0.1"
  46. :param io_loop: io_loop, the default is tornado.ioloop.IOLoop.current()
  47. """
  48. super(AsyncRabbitMQ, self).__init__(amqp_url)
  49. if io_loop is None:
  50. io_loop = IOLoop.current()
  51. self._io_loop = io_loop
  52. self._connection = None
  53. self._channel = None
  54. self._channel_queue = Queue(maxsize=1)
  55. self._exchange_declare_dict = dict()
  56. self._queue_declare_dict = dict()
  57. self._queue_bind_dict = dict()
  58. self._consumer_routing_key_handlers_dict = dict()
  59. self._service_routing_key_handlers_dict = dict()
  60. self._reply_queue_dict = dict()
  61.  
  62. def _connect(self):
  63. pika.TornadoConnection(parameters=self._parameter,
  64. on_open_callback=self._on_open_connection,
  65. on_open_error_callback=self._on_open_connection_error,
  66. on_close_callback=self._on_close_connection,
  67. custom_ioloop=self._io_loop)
  68.  
  69. def _on_open_connection(self, conn):
  70. log = self._get_log("_on_open_connection")
  71. self._connection = conn
  72. log.info("initializing connection")
  73. self._connection.channel(self._on_open_channel)
  74.  
  75. def _on_open_channel(self, channel):
  76. log = self._get_log("_on_open_channel")
  77. self._channel = channel
  78. log.info("initializing channel")
  79. self._channel_queue.put(True)
  80.  
  81. def _on_close_connection(self, connection, reason_code, reason_tex):
  82. log = self._get_log("_on_close_connection")
  83. log.info("close connection. reason code %s, reason text %s" % (reason_code, reason_tex))
  84.  
  85. def _on_open_connection_error(self, error):
  86. log = self._get_log("_on_open_connection_error")
  87. if isinstance(error, str):
  88. log.error("error: %s" % (error,))
  89. else:
  90. log.error("exception: %s" % (error,))
  91.  
  92. def _on_exchange_declare(self, exchange_name, passive=True):
  93. log = self._get_log("_on_exchange_declare")
  94. try:
  95. self._channel.exchange_declare(callback=self._on_exchange_declare_ok,
  96. exchange=exchange_name,
  97. exchange_type=self.EXCHANGE_TYPE,
  98. passive=passive,
  99. auto_delete=True)
  100. self._exchange_declare_dict[exchange_name].put(True)
  101. log.info("exchange %s has been declared" % exchange_name)
  102. except Exception as e:
  103. log.error("error with exchange declaring %s" % e)
  104. raise e
  105.  
  106. def _on_exchange_declare_ok(self, unframe):
  107. log = self._get_log("_on_exchange_declare_ok")
  108. log.info("exchange declare ok")
  109. pass
  110.  
  111. @coroutine
  112. def publish(self, exchange_name, routing_key, message, properties=None):
  113. """
  114. publisher client for rabbitmq.
  115. :param exchange_name: exchange name
  116. :param routing_key: routing key
  117. :param message: message
  118. :param properties: properties for publish
  119. :return:
  120. """
  121. log = self._get_log("publish")
  122. if self._channel is None:
  123. log.info("publish start connect")
  124. self._connect()
  125. yield self._channel_queue.get()
  126. if exchange_name not in self._exchange_declare_dict:
  127. log.info("declaring exchange: %s" % exchange_name)
  128. self._exchange_declare_dict[exchange_name] = Queue(maxsize=1)
  129. self._on_exchange_declare(exchange_name, True)
  130. yield self._exchange_declare_dict[exchange_name].get()
  131. self._channel.basic_publish(exchange=exchange_name,
  132. routing_key=routing_key,
  133. body=message,
  134. properties=properties)
  135.  
  136. @coroutine
  137. def consume(self, exchange_name, queue_name, routing_key, handler):
  138. """
  139. consumer client rabbitmq
  140. :param exchange_name: exchange name
  141. :param queue_name: binding queue
  142. :param routing_key: routing key
  143. :param handler: handler for message
  144. :return: None
  145. """
  146. log = self._get_log("consume")
  147. if self._channel is None:
  148. log.info("consume connects")
  149. self._connect()
  150. yield self._channel_queue.get()
  151. if exchange_name not in self._exchange_declare_dict:
  152. log.info("consume declares exchange %s" % exchange_name)
  153. self._exchange_declare_dict[exchange_name] = Queue(maxsize=1)
  154. self._on_exchange_declare(exchange_name, False)
  155. yield self._exchange_declare_dict[exchange_name].get()
  156. if queue_name not in self._queue_declare_dict:
  157. log.info("consume declares queue %s" % queue_name)
  158. self._queue_declare_dict[queue_name] = Queue(maxsize=1)
  159. self._on_queue_declare(queue_name)
  160. yield self._queue_declare_dict[queue_name].get()
  161. if queue_name not in self._queue_bind_dict:
  162. log.info("consume binds queue %s" % queue_name)
  163. self._queue_bind_dict[queue_name] = Queue(maxsize=1)
  164. self._on_queue_bind(exchange_name, queue_name, routing_key)
  165. yield self._queue_bind_dict[queue_name].get()
  166. self._consumer_routing_key_handlers_dict[self._routing_key_pattern(routing_key)] = handler
  167. self._channel.basic_consume(self._consume_handler_delivery, queue=queue_name)
  168.  
  169. def _consume_handler_delivery(self, channel, method, header, body):
  170. log = self._get_log("_consume_handler_delivery")
  171. log.info("consume body %s" % (body,))
  172. self._io_loop.spawn_callback(self._consume_process_message, body=body, channel=channel, method=method,
  173. header=header)
  174.  
  175. @coroutine
  176. def _consume_process_message(self, body, channel, method, header):
  177. log = self._get_log("_consume_process_message")
  178. log.info("start processing")
  179. handler = self._lookup_handler(method.routing_key)
  180. if handler is None:
  181. log.info("routing_key %s handler not found" % method.routing_key)
  182. return
  183. result = yield handler(body)
  184. if result:
  185. log.info("message process success")
  186. channel.basic_ack(delivery_tag=method.delivery_tag)
  187. else:
  188. log.error("message process failed")
  189. pass
  190.  
  191. def _on_queue_declare(self, queue_name):
  192. self._channel.queue_declare(callback=self._on_queue_declare_ok,
  193. queue=queue_name,
  194. durable=True,
  195. exclusive=False,
  196. auto_delete=True)
  197. self._queue_declare_dict[queue_name].put(True)
  198.  
  199. def _on_queue_declare_ok(self, method_frame):
  200. log = self._get_log("_on_queue_declare_ok")
  201. log.info("queue declare ok")
  202.  
  203. def _on_queue_bind(self, exchange_name, queue_name, routing_key):
  204. log = self._get_log("_on_queue_bind")
  205. log.info("exchange: %s; queue %s; routing_key %s", exchange_name, queue_name, routing_key)
  206. self._channel.queue_bind(callback=self._on_queue_bind_ok,
  207. queue=queue_name,
  208. exchange=exchange_name,
  209. routing_key=routing_key)
  210. self._queue_bind_dict[queue_name].put(True)
  211.  
  212. def _on_queue_bind_ok(self, method_frame):
  213. log = self._get_log("_on_queue_bind_ok")
  214. log.info("queue binds ok")
  215.  
  216. @staticmethod
  217. def _routing_key_pattern(routing_key):
  218. """
  219. as topic exchange routing_key pattern. only supports `topic.*` form.
  220. rules:
  221. - 'aispeech.*" => "aispeech\.\w+?"
  222. - "aispeech.aihome.*" => "aipseech\.aihome\.\w+?"
  223. :param routing_key: consumer routing key
  224. :return: stored handler
  225. """
  226. pattern = routing_key.replace("*", "\w+?").replace(".", "\.")
  227. return re.compile(pattern)
  228.  
  229. def _lookup_handler(self, routing_key, is_consume=True):
  230. """
  231. according to routing key, lookup the best match handler.
  232. e.g.
  233. routing_key: dog.black
  234. handlers: dog\.\w+? matches
  235. :param routing_key: routing key
  236. :param is_consume: True: lookup handler in consume; False: lookup handler in service
  237. :return: if matching, return handler otherwise return None
  238. """
  239. if is_consume:
  240. for pattern, handler in self._consumer_routing_key_handlers_dict.items():
  241. if pattern.match(routing_key):
  242. return handler
  243. return None
  244. else:
  245. for pattern, handler in self._service_routing_key_handlers_dict.items():
  246. if pattern.match(routing_key):
  247. return handler
  248. return None
  249.  
  250. @coroutine
  251. def service(self, exchange_name, queue_name, routing_key, handler):
  252. """
  253. start service for rpc
  254. :param exchange_name: exchange name
  255. :param queue_name: queue name
  256. :param routing_key: routing key. e.g. dog.*
  257. :param handler: handler for this routing key
  258. :return: None
  259. """
  260. log = self._get_log("service")
  261. if self._channel is None:
  262. log.info("service connects")
  263. self._connect()
  264. yield self._channel_queue.get()
  265. if exchange_name not in self._exchange_declare_dict:
  266. log.info("service declares exchange %s " % exchange_name)
  267. self._exchange_declare_dict[exchange_name] = Queue(maxsize=1)
  268. self._on_exchange_declare(exchange_name, False)
  269. yield self._exchange_declare_dict[exchange_name].get()
  270. if queue_name not in self._queue_declare_dict:
  271. log.info("service declare queue %s" % queue_name)
  272. self._queue_declare_dict[queue_name] = Queue(maxsize=1)
  273. self._on_queue_declare(queue_name)
  274. yield self._queue_declare_dict[queue_name].get()
  275. if queue_name not in self._queue_bind_dict:
  276. log.info("service bind queue")
  277. self._queue_bind_dict[queue_name] = Queue(maxsize=1)
  278. self._on_queue_bind(exchange_name, queue_name, routing_key)
  279. yield self._queue_bind_dict[queue_name].get()
  280. self._service_routing_key_handlers_dict[self._routing_key_pattern(routing_key)] = handler
  281. self._channel.basic_consume(self._service_handler_delivery, queue=queue_name)
  282.  
  283. def _service_handler_delivery(self, channel, method, props, body):
  284. log = self._get_log("_service_handler_delivery")
  285. log.info("service body %s " % body)
  286. self._io_loop.spawn_callback(self._service_process_message,
  287. channel=channel,
  288. method=method,
  289. props=props,
  290. body=body)
  291.  
  292. @coroutine
  293. def _service_process_message(self, channel, method, props, body):
  294. log = self._get_log("_service_process_message")
  295. log.info("start process")
  296. handler = self._lookup_handler(method.routing_key, is_consume=False)
  297. if handler is None:
  298. log.info("handler not found")
  299. return
  300. response = yield handler(body)
  301. if response is not None:
  302. log.info('service response routing key: %s' % props.reply_to)
  303. log.info('service correlation id: %s' % props.correlation_id)
  304. log.info("service sends body %s" % response)
  305. self._channel.basic_publish(exchange=method.exchange,
  306. routing_key=props.reply_to,
  307. properties=pika.BasicProperties(correlation_id=props.correlation_id),
  308. body=str(response))
  309. self._channel.basic_ack(delivery_tag=method.delivery_tag)
  310. else:
  311. log.info("response is None")
  312.  
  313. @coroutine
  314. def call(self, exchange_name, queue_name, routing_key, body, timeout=None):
  315. """
  316. call client for rpc.
  317. :param exchange_name: exchange name
  318. :param queue_name: queue name
  319. :param routing_key: routing key
  320. :param body: send body
  321. :param timeout: timeout after rpc call
  322. :return: result
  323. """
  324. log = self._get_log("call")
  325. callback_queue = "rpc_answer_%s" % str(uuid.uuid4())
  326. corr_id = str(uuid.uuid4())
  327. log.info("generating correlation id %s" % corr_id)
  328. log.info("to send body %s" % body)
  329. queue = Queue(maxsize=1)
  330. self._reply_queue_dict[corr_id] = queue
  331. # open connection and send message
  332. if self._channel is None:
  333. log.info("client connect")
  334. self._connect()
  335. yield self._channel_queue.get()
  336. if exchange_name not in self._exchange_declare_dict:
  337. log.info("client declares exchange %s. " % exchange_name)
  338. self._exchange_declare_dict[exchange_name] = Queue(maxsize=1)
  339. self._on_exchange_declare(exchange_name, True)
  340. yield self._exchange_declare_dict[exchange_name].get()
  341. if queue_name not in self._queue_declare_dict:
  342. log.info("client declares queue: %s." % queue_name)
  343. self._queue_declare_dict[queue_name] = Queue(maxsize=1)
  344. self._on_queue_declare(queue_name)
  345. yield self._queue_declare_dict[queue_name].get()
  346. if callback_queue not in self._queue_declare_dict:
  347. log.info("client declares queue: %s." % callback_queue)
  348. self._queue_declare_dict[callback_queue] = Queue(maxsize=1)
  349. self._on_queue_declare(callback_queue)
  350. yield self._queue_declare_dict[callback_queue].get()
  351. if queue_name not in self._queue_bind_dict:
  352. log.info("client binds queue %s" % queue_name)
  353. self._queue_bind_dict[queue_name] = Queue(maxsize=1)
  354. self._on_queue_bind(exchange_name, queue_name, routing_key)
  355. yield self._queue_bind_dict[queue_name].get()
  356. if callback_queue not in self._queue_bind_dict:
  357. log.info("client binds queue %s" % callback_queue)
  358. self._queue_bind_dict[callback_queue] = Queue(maxsize=1)
  359. self._on_queue_bind(exchange_name, queue_name=callback_queue, routing_key=callback_queue)
  360. self._channel.basic_consume(self._client_on_message, queue=callback_queue)
  361. log.info("routing_key: %s" % routing_key)
  362. log.info("correlation_id: %s " % corr_id)
  363. log.info("reply to: %s " % callback_queue)
  364. log.info("send body: %s" % body)
  365. self._channel.basic_publish(exchange=exchange_name,
  366. routing_key=routing_key,
  367. properties=pika.BasicProperties(correlation_id=corr_id,
  368. reply_to=callback_queue),
  369. body=body)
  370. # end up with push and wait request
  371. if timeout is not None:
  372. log.info("add timeout %s" % timeout)
  373. self._io_loop.add_timeout(datetime.timedelta(days=0, seconds=timeout),
  374. functools.partial(self._on_timeout,correlation_id=corr_id))
  375. result = yield queue.get()
  376. raise Return(result)
  377.  
  378. def _client_on_message(self,ch, method, props, body):
  379. log = self._get_log("_client_on_message")
  380. log.info("receive body: %s" % body)
  381. corr_id = props.correlation_id
  382. if corr_id in self._reply_queue_dict:
  383. log.info("get response")
  384. self._reply_queue_dict[corr_id].put(body)
  385. log.info("delete corr_id %s in _reply_queue." % corr_id)
  386. del self._reply_queue_dict[corr_id]
  387. else:
  388. log.info("valid response")
  389. pass
  390.  
  391. @coroutine
  392. def _on_timeout(self, correlation_id):
  393. log = self._get_log("_on_timeout")
  394. log.info("timeout")
  395. if correlation_id in self._reply_queue_dict:
  396. self._reply_queue_dict[correlation_id].put(None)
  397. log.info("delete correlation_id %s in _reply_queue_dict" % correlation_id)
  398. del self._reply_queue_dict[correlation_id]
  399. else:
  400. log.info("correlation_id %s doest not exist. " % correlation_id)
Add Comment
Please, Sign In to add comment