Guest User

Untitled

a guest
Sep 23rd, 2018
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.83 KB | None | 0 0
  1. from abc import ABCMeta, abstractmethod
  2. from google.protobuf.message import Message
  3. from twisted.application.internet import _AbstractClient, _maybeGlobalReactor
  4. from twisted.internet import protocol
  5. from twisted.internet.defer import inlineCallbacks, Deferred, returnValue, \
  6. maybeDeferred, DeferredLock
  7. from twisted.python import log
  8. from txamqp.client import Closed as ClientClosed, TwistedDelegate
  9. from txamqp.content import Content
  10. from txamqp.protocol import AMQClient
  11. from txamqp.queue import Closed as QueueClosed
  12. from txamqp.spec import load
  13. from uuid import uuid4
  14. import os
  15.  
  16.  
  17. class ProtobufContent(Content):
  18. def __init__(self, body="", children=None,
  19. properties=None, message=None):
  20. # Set defaults value
  21. self._body = self._message = None
  22. # Set current message
  23. self.message = message
  24. # Initialize class
  25. super(ProtobufContent, self).__init__(body, children, properties)
  26.  
  27. def getMessage(self):
  28. return self._message
  29.  
  30. def setMessage(self, message):
  31. assert(isinstance(message, Message))
  32. self._message = message
  33.  
  34. message = property(getMessage, setMessage)
  35.  
  36. def getBody(self):
  37. return self.message.SerializeToString()
  38.  
  39. def setBody(self, body):
  40. self._body = body
  41. if body:
  42. self._message.ParseFromString(body)
  43.  
  44. body = property(getBody, setBody)
  45.  
  46. @classmethod
  47. def create(cls, content, message):
  48. return cls(body=content.body,
  49. children=content.children,
  50. properties=content.properties,
  51. message=message)
  52.  
  53.  
  54. class AmqpProtocol(AMQClient):
  55.  
  56. def __init__(self, reactor, *args, **kwargs):
  57. self.factory = None
  58. self.connected = False
  59. reactor.addSystemEventTrigger("before", "shutdown", self.disconnect)
  60. AMQClient.__init__(self, *args, **kwargs)
  61.  
  62. @inlineCallbacks
  63. def connectionMade(self):
  64. AMQClient.connectionMade(self)
  65.  
  66. yield self.start({"LOGIN": self.factory.user,
  67. "PASSWORD": self.factory.password})
  68.  
  69. self.connected = True
  70. self.factory.deferred.callback(self)
  71.  
  72. def connectionLost(self, reason):
  73. self.connected = False
  74. AMQClient.connectionLost(self, reason)
  75. del self.factory
  76.  
  77. @inlineCallbacks
  78. def disconnect(self):
  79. if self.connected:
  80. chan0 = yield self.channel(0)
  81. try:
  82. yield chan0.connection_close()
  83. except ClientClosed, e:
  84. pass
  85. self.connected = False
  86.  
  87.  
  88. class Client(object):
  89. __metaclass__ = ABCMeta
  90.  
  91. def __init__(self, client):
  92. # Wait for initialization
  93. self.deferred = Deferred()
  94. # Our client
  95. self.client = client
  96. # Our channel
  97. self._channel = None
  98. # Setup client
  99. self.setup()
  100.  
  101. @inlineCallbacks
  102. def createChannel(self):
  103. channel = yield self.client.channel(len(self.client.channels) + 1)
  104. yield channel.channel_open()
  105. returnValue(channel)
  106.  
  107. @inlineCallbacks
  108. def createExchange(self, name, options=None):
  109. ch = yield self.channel()
  110. if name != '':
  111. _options = {"type": "direct",
  112. "durable": True,
  113. "auto_delete": False}
  114. _options.update(options or {})
  115. yield ch.exchange_declare(exchange=name, **_options)
  116.  
  117. @inlineCallbacks
  118. def createQueue(self, name, exchange_name, routing_key, options=None):
  119. ch = yield self.channel()
  120.  
  121. _options = {"durable": True,
  122. "exclusive": False,
  123. "auto_delete": False}
  124. _options.update(options or {})
  125.  
  126. method = yield ch.queue_declare(queue=name, **_options)
  127. name = method[0]
  128.  
  129. if exchange_name != "":
  130. yield ch.queue_bind(queue=name,
  131. exchange=exchange_name,
  132. routing_key=routing_key)
  133.  
  134. returnValue(name)
  135.  
  136. @inlineCallbacks
  137. def channel(self):
  138. if self._channel is None:
  139. _channel = self._channel = yield self.createChannel()
  140. else:
  141. _channel = yield self._channel
  142. returnValue(_channel)
  143.  
  144. @inlineCallbacks
  145. def queue(self, name):
  146. queue = yield self.client.queue(name)
  147. returnValue(queue)
  148.  
  149. def error(self, failure):
  150. if failure.check(QueueClosed) is not None:
  151. log.msg("Queue closed")
  152. else:
  153. log.msg("Error reading item: ", failure)
  154.  
  155. def setup(self):
  156. log.msg('Register %r' % self)
  157. maybeDeferred(self.initialize).chainDeferred(self.deferred)
  158.  
  159. @abstractmethod
  160. def initialize(self):
  161. raise NotImplementedError()
  162.  
  163. @inlineCallbacks
  164. def wait(self):
  165. if self.deferred.called:
  166. yield
  167. else:
  168. yield self.deferred
  169.  
  170.  
  171. class Consumer(Client):
  172. def __init__(self, client, exchange, routing_key, callback,
  173. queue=None, consumer_tag=None,
  174. exchange_options=None, queue_options=None):
  175. # Set exchange name
  176. self.exchange_name = exchange
  177. self.exchange_options = exchange_options
  178. # For now use the exchange name as the queue name.
  179. self.queue_name = queue or exchange
  180. self.queue_options = queue_options
  181. # Use the exchange name for the consumer tag for now.
  182. self.consumer_tag = consumer_tag or exchange
  183. self.routing_key = routing_key
  184. # Set callback
  185. self.callback = callback
  186.  
  187. # Start consumer
  188. super(Consumer, self).__init__(client)
  189.  
  190. @inlineCallbacks
  191. def initialize(self):
  192. # Declare the exchange in case it doesn't exist.
  193. yield self.createExchange(self.exchange_name, self.exchange_options)
  194.  
  195. # Declare the queue and bind to it.
  196. self.queue_name = yield self.createQueue(self.queue_name,
  197. self.exchange_name,
  198. self.routing_key,
  199. self.queue_options)
  200.  
  201. # Update consumer tag
  202. self.consumer_tag = (self.consumer_tag
  203. if self.consumer_tag
  204. else self.queue_name)
  205.  
  206. # Get channel
  207. ch = yield self.channel()
  208.  
  209. # Consume.
  210. yield ch.basic_consume(queue=self.queue_name,
  211. no_ack=True,
  212. consumer_tag=self.consumer_tag)
  213.  
  214. # Get queue
  215. queue = yield self.queue(self.consumer_tag)
  216.  
  217. # Start receive events
  218. self.receive(queue)
  219.  
  220. @inlineCallbacks
  221. def receive(self, queue):
  222. yield self.wait()
  223.  
  224. def _process(item):
  225. if item is not None:
  226. return self.process(item.content)
  227.  
  228. def _get(item=None):
  229. d = queue.get()
  230. d.addCallback(_get)
  231. d.addErrback(self.error)
  232. d.addCallback(_process)
  233. return item
  234.  
  235. _get()
  236.  
  237. @inlineCallbacks
  238. def process(self, item):
  239. assert(item is not None)
  240. try:
  241. yield maybeDeferred(self.callback, item)
  242. except:
  243. log.err()
  244.  
  245. def __repr__(self):
  246. return ('<%s: exchange "%s", routing_key "%s">' %
  247. (self.__class__.__name__,
  248. self.exchange_name,
  249. self.routing_key))
  250.  
  251.  
  252. class ProtobufConsumer(Consumer):
  253. def __init__(self, *args, **kwargs):
  254. assert('message_cls' in kwargs)
  255. self.message_cls = kwargs.pop('message_cls')
  256. super(ProtobufConsumer, self).__init__(*args, **kwargs)
  257.  
  258. def process(self, item):
  259. item = ProtobufContent.create(item, self.message_cls())
  260. return super(ProtobufConsumer, self).process(item)
  261.  
  262.  
  263. class Producer(Client):
  264. def __init__(self, client, exchange, exchange_options=None):
  265. # Set exchange name
  266. self.exchange_name = exchange
  267. self.exchange_options = exchange_options
  268.  
  269. # Register producer
  270. super(Producer, self).__init__(client)
  271.  
  272. @inlineCallbacks
  273. def initialize(self):
  274. # First declare the exchange just in case it doesn't exist.
  275. yield self.createExchange(self.exchange_name, self.exchange_options)
  276.  
  277. @inlineCallbacks
  278. def send(self, content, routing_key=''):
  279. assert(isinstance(content, Content))
  280. yield self.wait()
  281. ch = yield self.channel()
  282. yield ch.basic_publish(exchange=self.exchange_name,
  283. routing_key=routing_key,
  284. content=content)
  285.  
  286. def __repr__(self):
  287. return ('<%s: exchange "%s">' %
  288. (self.__class__.__name__,
  289. self.exchange_name))
  290.  
  291.  
  292. class ProtobufProducer(Producer):
  293. def send(self, message, routing_key='', **kwargs):
  294. assert(isinstance(message, Message))
  295. content = ProtobufContent(message=message, **kwargs)
  296. return super(ProtobufProducer, self).send(content, routing_key)
  297.  
  298.  
  299. class RPCServer(object):
  300. def __init__(self, factory):
  301. self.factory = factory
  302.  
  303. @inlineCallbacks
  304. def register(self, exchange, name, callback, request_cls, response_cls):
  305. log.msg("Register method '%s' in exchange '%s'" % (name, exchange))
  306.  
  307. # Create producer
  308. producer = yield self.factory.producer(exchange='')
  309.  
  310. # Callback for consumer
  311. def _cb(content):
  312. return self.process(content=content,
  313. name=name,
  314. producer=producer,
  315. callback=callback,
  316. response=response_cls())
  317.  
  318. # Create consumer
  319. consumer = yield self.factory.consumer(exchange=exchange,
  320. routing_key=name,
  321. callback=_cb,
  322. message_cls=request_cls)
  323.  
  324. # Wait for callback registred
  325. yield consumer.wait()
  326.  
  327. @inlineCallbacks
  328. def process(self, content, name, producer, callback, response):
  329. # check input message
  330. properties = content.properties
  331. assert("correlation id" in properties and "reply to" in properties)
  332.  
  333. # log input request
  334. log.msg("Handle request '%s' with id '%s'" %
  335. (name, properties['correlation id']))
  336.  
  337. # handle request
  338. try:
  339. yield maybeDeferred(callback,
  340. content.message,
  341. response)
  342. except:
  343. log.err()
  344.  
  345. # log response
  346. log.msg("Send response with id '%s' to '%s'" %
  347. (properties['correlation id'], properties['reply to']))
  348.  
  349. # send reply
  350. yield producer.send(response,
  351. routing_key=properties['reply to'],
  352. properties={'correlation id':
  353. properties['correlation id']})
  354.  
  355.  
  356. class RPCClient(object):
  357. def __init__(self, exchange, factory):
  358. self.exchange = exchange
  359. self.factory = factory
  360. self.producer = self.consumer = None
  361. self.requests = {}
  362.  
  363. @inlineCallbacks
  364. def initialize(self):
  365. self.producer = yield self.factory.producer(exchange=self.exchange)
  366. yield self.producer.wait()
  367.  
  368. self.consumer = yield self.factory.consumer(exchange='', queue='',
  369. routing_key='',
  370. queue_options={'exclusive': True},
  371. callback=self.receive,
  372. kls=Consumer)
  373. yield self.consumer.wait()
  374.  
  375. @inlineCallbacks
  376. def invoke(self, name, request, response):
  377. uuid = str(uuid4())
  378. yield self.producer.send(request, routing_key=name,
  379. properties={'correlation id': uuid,
  380. 'reply to': self.consumer.queue_name})
  381. body = yield self.register(uuid)
  382. response.ParseFromString(body)
  383. returnValue(response)
  384.  
  385. def register(self, uuid):
  386. d = Deferred()
  387. self.requests[uuid] = d
  388. return d
  389.  
  390. def receive(self, content):
  391. properties = content.properties
  392. assert("correlation id" in properties)
  393. d = self.requests.get(properties["correlation id"], None)
  394. if d is None:
  395. log.msg("Stale response with id '%s'" %
  396. properties["correlation id"])
  397. return
  398. d.callback(content.body)
  399. del self.requests[properties["correlation id"]]
  400.  
  401.  
  402. class AmqpFactory(protocol.ReconnectingClientFactory):
  403. protocol = AmqpProtocol
  404.  
  405. def __init__(self, reactor=None, vhost=None, host=None, port=None,
  406. user=None, password=None):
  407. self.spec = load(os.path.join(os.path.dirname(__file__),
  408. '..', 'contrib', 'amqp0-8.stripped.rabbitmq.xml'))
  409. self.user = user or 'guest'
  410. self.password = password or 'guest'
  411. self.vhost = vhost or '/'
  412. self.host = host or 'localhost'
  413. self.port = port or 5672
  414. self.delegate = TwistedDelegate()
  415. self.deferred = Deferred()
  416. self.reactor = _maybeGlobalReactor(reactor)
  417. self._client = None
  418.  
  419. def buildProtocol(self, addr):
  420. self._client = p = self.protocol(self.reactor, self.delegate,
  421. self.vhost, self.spec)
  422. p.factory = self
  423. self.resetDelay()
  424. return p
  425.  
  426. def doStop(self):
  427. self._client = None
  428. self.deferred = Deferred()
  429. protocol.ReconnectingClientFactory.doStop(self)
  430.  
  431. @inlineCallbacks
  432. def client(self):
  433. if self.deferred.called:
  434. client = yield self._client
  435. else:
  436. client = yield self.deferred
  437. returnValue(client)
  438.  
  439. @inlineCallbacks
  440. def consumer(self, kls=None, **kwargs):
  441. kls = kls or Consumer
  442. returnValue(kls((yield self.client()), **kwargs))
  443.  
  444. @inlineCallbacks
  445. def producer(self, kls=None, **kwargs):
  446. kls = kls or Producer
  447. returnValue(kls((yield self.client()), **kwargs))
  448.  
  449.  
  450. class ProtobufAmqpFactory(AmqpFactory):
  451. def __init__(self, *args, **kwargs):
  452. AmqpFactory.__init__(self, *args, **kwargs)
  453. self._rpc_server = None
  454.  
  455. def consumer(self, kls=None, **kwargs):
  456. kls = kls or ProtobufConsumer
  457. return AmqpFactory.consumer(self, kls=kls, **kwargs)
  458.  
  459. def producer(self, kls=None, **kwargs):
  460. kls = kls or ProtobufProducer
  461. return AmqpFactory.consumer(self, kls=kls, **kwargs)
  462.  
  463. @property
  464. def rpc_server(self):
  465. if self._rpc_server is None:
  466. self._rpc_server = RPCServer(self)
  467. return self._rpc_server
  468.  
  469. @inlineCallbacks
  470. def rpc_client(self, exchange):
  471. client = RPCClient(exchange=exchange,
  472. factory=self)
  473. yield client.initialize()
  474. returnValue(client)
  475.  
  476.  
  477. class AmqpService(_AbstractClient):
  478. method = 'TCP'
  479.  
  480. def __init__(self, factory):
  481. self.factory = factory
  482. _AbstractClient.__init__(self, factory.host, factory.port, factory)
  483.  
  484. def stopService(self):
  485. self.factory.stopTrying()
  486. self.factory.doStop()
  487. _AbstractClient.stopService(self)
  488.  
  489.  
  490. class ProducerMixin(object):
  491. def __init__(self, *args, **kwargs):
  492. self.__producers = {}
  493. self.__lock = DeferredLock()
  494. super(ProducerMixin, self).__init__(*args, **kwargs)
  495.  
  496. @inlineCallbacks
  497. def producer(self, exchange_name):
  498. self.__lock.acquire()
  499. try:
  500. producer = self.__producers.get(exchange_name, None)
  501. if producer is None:
  502. producer = self.amqp.producer(exchange=exchange_name)
  503. self.__producers[exchange_name] = producer = yield producer
  504. returnValue(producer)
  505. finally:
  506. self.__lock.release()
  507.  
  508.  
  509. def rpc_handler(amqp, exchange, name,
  510. request_cls, response_cls,
  511. deferred=None):
  512. def decorator(f):
  513. d = amqp.rpc_server.register(exchange=exchange,
  514. name=name,
  515. callback=f,
  516. request_cls=request_cls,
  517. response_cls=response_cls)
  518. if deferred is not None:
  519. d.chainDeferred(deferred)
  520. return f
  521. return decorator
  522.  
  523.  
  524. def consume(amqp, exchange, routing_key,
  525. message_cls, deferred=None,
  526. **kwargs):
  527. def decorator(f):
  528. d = amqp.consumer(exchange=exchange,
  529. routing_key=routing_key,
  530. callback=f,
  531. message_cls=message_cls,
  532. **kwargs)
  533. if deferred is not None:
  534. d.chainDeferred(deferred)
  535. return f
  536. return decorator
Add Comment
Please, Sign In to add comment