Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from abc import ABCMeta, abstractmethod
- from google.protobuf.message import Message
- from twisted.application.internet import _AbstractClient, _maybeGlobalReactor
- from twisted.internet import protocol
- from twisted.internet.defer import inlineCallbacks, Deferred, returnValue, \
- maybeDeferred, DeferredLock
- from twisted.python import log
- from txamqp.client import Closed as ClientClosed, TwistedDelegate
- from txamqp.content import Content
- from txamqp.protocol import AMQClient
- from txamqp.queue import Closed as QueueClosed
- from txamqp.spec import load
- from uuid import uuid4
- import os
class ProtobufContent(Content):
    """AMQP content whose body is backed by a protobuf message.

    Reading ``body`` serializes the wrapped message; writing a non-empty
    ``body`` parses the bytes into the wrapped message.
    """

    def __init__(self, body="", children=None,
                 properties=None, message=None):
        """Wrap ``message`` and forward the rest to the base initializer.

        ``message`` must be a protobuf ``Message`` instance: the setter
        asserts on it, so the ``None`` default is rejected.
        """
        # Both private slots have to exist before any property setter runs.
        self._body = None
        self._message = None
        # Goes through setMessage (type-checked).
        self.message = message
        # NOTE(review): the base initializer presumably assigns ``body``,
        # which would route through setBody -- confirm against txamqp.
        super(ProtobufContent, self).__init__(body, children, properties)

    def getMessage(self):
        """Return the wrapped protobuf message."""
        return self._message

    def setMessage(self, message):
        """Replace the wrapped message; it must be a ``Message`` instance."""
        assert isinstance(message, Message)
        self._message = message

    message = property(fget=getMessage, fset=setMessage)

    def getBody(self):
        """Return the wrapped message serialized to a string."""
        wrapped = self.message
        return wrapped.SerializeToString()

    def setBody(self, body):
        """Remember the raw body and, if non-empty, parse it into the message."""
        self._body = body
        if not body:
            # Nothing to parse for an empty/None body.
            return
        self._message.ParseFromString(body)

    body = property(fget=getBody, fset=setBody)

    @classmethod
    def create(cls, content, message):
        """Build an instance from an existing content object.

        Copies ``body``, ``children`` and ``properties`` from ``content``
        and uses ``message`` as the protobuf instance to parse into.
        """
        source = content
        return cls(body=source.body,
                   children=source.children,
                   properties=source.properties,
                   message=message)
class AmqpProtocol(AMQClient):
    """AMQP client protocol that logs in on connect and reports to its factory."""

    def __init__(self, reactor, *args, **kwargs):
        """Register a shutdown hook and initialize the underlying AMQClient.

        ``reactor`` is used only to schedule ``disconnect`` before shutdown;
        the remaining arguments are passed to ``AMQClient.__init__``.
        """
        self.factory = None
        self.connected = False
        # Close the AMQP connection cleanly when the reactor shuts down.
        reactor.addSystemEventTrigger("before", "shutdown", self.disconnect)
        AMQClient.__init__(self, *args, **kwargs)

    @inlineCallbacks
    def connectionMade(self):
        """Authenticate with the factory's credentials, then fire its deferred."""
        AMQClient.connectionMade(self)
        yield self.start({"LOGIN": self.factory.user,
                          "PASSWORD": self.factory.password})
        self.connected = True
        # Hand this protocol instance to whoever waits on the factory.
        self.factory.deferred.callback(self)

    def connectionLost(self, reason):
        """Mark the protocol as disconnected and drop the factory reference."""
        self.connected = False
        AMQClient.connectionLost(self, reason)
        del self.factory

    @inlineCallbacks
    def disconnect(self):
        """Close the AMQP connection on channel 0 if currently connected."""
        if self.connected:
            chan0 = yield self.channel(0)
            try:
                yield chan0.connection_close()
            except ClientClosed:
                # The connection is already closed; nothing left to do.
                pass
            self.connected = False
class Client(object):
    """Abstract base for helpers that work on one lazily opened AMQP channel.

    Construction immediately starts ``initialize()``; its outcome is chained
    into ``self.deferred``, which ``wait()`` lets callers wait on.
    """

    __metaclass__ = ABCMeta

    def __init__(self, client):
        """Store the AMQP ``client`` and start the subclass initialization."""
        # Fires once initialize() has completed.
        self.deferred = Deferred()
        # Underlying AMQP client (protocol) object.
        self.client = client
        # Channel is opened on first use by channel().
        self._channel = None
        self.setup()

    @inlineCallbacks
    def createChannel(self):
        """Open a new channel numbered ``len(self.client.channels) + 1``."""
        channel_id = len(self.client.channels) + 1
        opened = yield self.client.channel(channel_id)
        yield opened.channel_open()
        returnValue(opened)

    @inlineCallbacks
    def createExchange(self, name, options=None):
        """Declare exchange ``name`` unless it is the empty string.

        Defaults to a durable, non-auto-deleted ``direct`` exchange;
        ``options`` overrides individual declaration arguments.
        """
        ch = yield self.channel()
        if name == '':
            # The empty name is never declared.
            return
        declare_args = dict(type="direct", durable=True, auto_delete=False)
        declare_args.update(options or {})
        yield ch.exchange_declare(exchange=name, **declare_args)

    @inlineCallbacks
    def createQueue(self, name, exchange_name, routing_key, options=None):
        """Declare a queue, bind it if an exchange is named, return its name.

        The returned name is the first element of the ``queue_declare``
        reply. Defaults to durable, non-exclusive, non-auto-deleted;
        ``options`` overrides individual declaration arguments.
        """
        ch = yield self.channel()
        declare_args = dict(durable=True, exclusive=False, auto_delete=False)
        declare_args.update(options or {})
        reply = yield ch.queue_declare(queue=name, **declare_args)
        name = reply[0]
        if exchange_name != "":
            yield ch.queue_bind(queue=name,
                                exchange=exchange_name,
                                routing_key=routing_key)
        returnValue(name)

    @inlineCallbacks
    def channel(self):
        """Return this helper's channel, opening it on the first call."""
        if self._channel is not None:
            opened = yield self._channel
        else:
            opened = yield self.createChannel()
            self._channel = opened
        returnValue(opened)

    @inlineCallbacks
    def queue(self, name):
        """Return the client-side queue object registered under ``name``."""
        found = yield self.client.queue(name)
        returnValue(found)

    def error(self, failure):
        """Log a failed queue read, naming a closed queue explicitly."""
        if failure.check(QueueClosed) is None:
            log.msg("Error reading item: ", failure)
        else:
            log.msg("Queue closed")

    def setup(self):
        """Log the registration and chain ``initialize()`` into ``self.deferred``."""
        log.msg('Register %r' % self)
        started = maybeDeferred(self.initialize)
        started.chainDeferred(self.deferred)

    @abstractmethod
    def initialize(self):
        """Perform subclass-specific setup; may return a Deferred."""
        raise NotImplementedError()

    @inlineCallbacks
    def wait(self):
        """Wait until ``self.deferred`` has fired (no wait if it already has)."""
        if not self.deferred.called:
            yield self.deferred
        else:
            yield
class Consumer(Client):
    """Consume messages from a queue bound to an exchange and run a callback."""

    def __init__(self, client, exchange, routing_key, callback,
                 queue=None, consumer_tag=None,
                 exchange_options=None, queue_options=None):
        """Store the consumer configuration and start initialization.

        ``queue`` and ``consumer_tag`` both fall back to the exchange name
        when falsy. ``callback`` is invoked with each received content and
        may return a Deferred.
        """
        # Exchange to declare and bind to.
        self.exchange_name = exchange
        self.exchange_options = exchange_options
        # For now use the exchange name as the queue name.
        self.queue_name = queue or exchange
        self.queue_options = queue_options
        # Use the exchange name for the consumer tag for now.
        self.consumer_tag = consumer_tag or exchange
        self.routing_key = routing_key
        self.callback = callback
        # The base initializer triggers initialize() via setup().
        super(Consumer, self).__init__(client)

    @inlineCallbacks
    def initialize(self):
        """Declare exchange and queue, start consuming, begin the receive loop."""
        # Declare the exchange in case it doesn't exist.
        yield self.createExchange(self.exchange_name, self.exchange_options)
        # Declare the queue and bind to it; keep the name that was declared.
        self.queue_name = yield self.createQueue(self.queue_name,
                                                 self.exchange_name,
                                                 self.routing_key,
                                                 self.queue_options)
        # An empty tag falls back to the declared queue name.
        self.consumer_tag = self.consumer_tag or self.queue_name
        ch = yield self.channel()
        yield ch.basic_consume(queue=self.queue_name,
                               no_ack=True,
                               consumer_tag=self.consumer_tag)
        queue = yield self.queue(self.consumer_tag)
        # Deliberately not yielded: receive() first waits on self.deferred,
        # which only fires once this method has finished (see Client.setup).
        self.receive(queue)

    @inlineCallbacks
    def receive(self, queue):
        """Start the read loop on ``queue`` once initialization is complete."""
        yield self.wait()

        def _process(item):
            # ``item`` is None when self.error handled a failed read.
            if item is not None:
                return self.process(item.content)

        def _get(item=None):
            # Re-arm: request the next item before handling the current one.
            d = queue.get()
            d.addCallback(_get)
            d.addErrback(self.error)
            d.addCallback(_process)
            return item

        _get()

    @inlineCallbacks
    def process(self, item):
        """Run the callback on ``item`` and log any exception it raises."""
        assert(item is not None)
        try:
            yield maybeDeferred(self.callback, item)
        except Exception:
            # A bare ``except`` would also intercept GeneratorExit,
            # KeyboardInterrupt and SystemExit; only callback errors
            # should be logged and swallowed here.
            log.err()

    def __repr__(self):
        return ('<%s: exchange "%s", routing_key "%s">' %
                (self.__class__.__name__,
                 self.exchange_name,
                 self.routing_key))
class ProtobufConsumer(Consumer):
    """Consumer that wraps each received content in a ProtobufContent."""

    def __init__(self, *args, **kwargs):
        """Require a ``message_cls`` keyword; pass everything else to Consumer."""
        assert 'message_cls' in kwargs
        # Class instantiated once per received item, see process().
        message_cls = kwargs.pop('message_cls')
        self.message_cls = message_cls
        super(ProtobufConsumer, self).__init__(*args, **kwargs)

    def process(self, item):
        """Wrap ``item`` with a fresh ``message_cls()`` and process it."""
        fresh_message = self.message_cls()
        wrapped = ProtobufContent.create(item, fresh_message)
        return super(ProtobufConsumer, self).process(wrapped)
class Producer(Client):
    """Publish content objects to a single exchange."""

    def __init__(self, client, exchange, exchange_options=None):
        """Remember the target exchange and start initialization."""
        self.exchange_name = exchange
        self.exchange_options = exchange_options
        # The base initializer triggers initialize() via setup().
        super(Producer, self).__init__(client)

    @inlineCallbacks
    def initialize(self):
        """Declare the exchange in case it does not exist yet."""
        name, options = self.exchange_name, self.exchange_options
        yield self.createExchange(name, options)

    @inlineCallbacks
    def send(self, content, routing_key=''):
        """Publish ``content`` to the exchange under ``routing_key``.

        ``content`` must be a ``Content`` instance. Publishing waits until
        initialization has finished.
        """
        assert isinstance(content, Content)
        yield self.wait()
        publish_channel = yield self.channel()
        yield publish_channel.basic_publish(exchange=self.exchange_name,
                                            routing_key=routing_key,
                                            content=content)

    def __repr__(self):
        fields = (self.__class__.__name__, self.exchange_name)
        return '<%s: exchange "%s">' % fields
class ProtobufProducer(Producer):
    """Producer that accepts protobuf messages instead of content objects."""

    def send(self, message, routing_key='', **kwargs):
        """Wrap ``message`` in a ProtobufContent and publish it.

        ``message`` must be a protobuf ``Message``; extra keyword arguments
        are passed to the ``ProtobufContent`` constructor.
        """
        assert isinstance(message, Message)
        wrapped = ProtobufContent(message=message, **kwargs)
        parent = super(ProtobufProducer, self)
        return parent.send(wrapped, routing_key)
class RPCServer(object):
    """Expose callbacks as RPC methods: consume requests, publish replies."""

    def __init__(self, factory):
        """Keep the factory used to create producers and consumers."""
        self.factory = factory

    @inlineCallbacks
    def register(self, exchange, name, callback, request_cls, response_cls):
        """Serve method ``name`` on ``exchange`` with ``callback``.

        ``callback`` is called with the request message and a fresh
        ``response_cls()`` instance to fill in; it may return a Deferred.
        The returned Deferred fires once the consumer is ready.
        """
        log.msg("Register method '%s' in exchange '%s'" % (name, exchange))
        # Replies are published through a producer on the '' exchange.
        producer = yield self.factory.producer(exchange='')

        def _cb(content):
            # A new response instance is created for every request.
            return self.process(content=content,
                                name=name,
                                producer=producer,
                                callback=callback,
                                response=response_cls())

        consumer = yield self.factory.consumer(exchange=exchange,
                                               routing_key=name,
                                               callback=_cb,
                                               message_cls=request_cls)
        # Wait until the consumer is registered.
        yield consumer.wait()

    @inlineCallbacks
    def process(self, content, name, producer, callback, response):
        """Run ``callback`` for one request and send ``response`` back.

        The request must carry "correlation id" and "reply to" properties.
        The response is sent even when the callback failed; the failure is
        only logged.
        """
        properties = content.properties
        assert("correlation id" in properties and "reply to" in properties)
        log.msg("Handle request '%s' with id '%s'" %
                (name, properties['correlation id']))
        try:
            yield maybeDeferred(callback,
                                content.message,
                                response)
        except Exception:
            # A bare ``except`` would also intercept GeneratorExit,
            # KeyboardInterrupt and SystemExit; only handler errors
            # should be logged and swallowed here.
            log.err()
        log.msg("Send response with id '%s' to '%s'" %
                (properties['correlation id'], properties['reply to']))
        # Echo the correlation id so the caller can match the reply.
        yield producer.send(response,
                            routing_key=properties['reply to'],
                            properties={'correlation id':
                                        properties['correlation id']})
class RPCClient(object):
    """Send RPC requests to an exchange and match replies by correlation id."""

    def __init__(self, exchange, factory):
        """Remember the target ``exchange`` and the ``factory`` to build with."""
        self.exchange = exchange
        self.factory = factory
        self.producer = self.consumer = None
        # Maps correlation id -> Deferred waiting for that reply.
        self.requests = {}

    @inlineCallbacks
    def initialize(self):
        """Create the request producer and the exclusive reply consumer."""
        self.producer = yield self.factory.producer(exchange=self.exchange)
        yield self.producer.wait()
        # A plain Consumer is requested so receive() gets the raw body.
        self.consumer = yield self.factory.consumer(exchange='', queue='',
                                                    routing_key='',
                                                    queue_options={'exclusive': True},
                                                    callback=self.receive,
                                                    kls=Consumer)
        yield self.consumer.wait()

    @inlineCallbacks
    def invoke(self, name, request, response):
        """Call remote method ``name`` and return the parsed ``response``.

        ``request`` is published with a fresh correlation id and this
        client's reply queue; the reply body is parsed into ``response``.
        """
        uuid = str(uuid4())
        # Register the waiter before publishing so that a reply can never
        # arrive without a matching entry in self.requests.
        pending = self.register(uuid)
        try:
            yield self.producer.send(request, routing_key=name,
                                     properties={'correlation id': uuid,
                                                 'reply to': self.consumer.queue_name})
        except Exception:
            # The request never went out, so no reply will come: drop the
            # waiter instead of leaking it.
            self.requests.pop(uuid, None)
            raise
        body = yield pending
        response.ParseFromString(body)
        returnValue(response)

    def register(self, uuid):
        """Create and store the Deferred waiting for the reply to ``uuid``."""
        d = Deferred()
        self.requests[uuid] = d
        return d

    def receive(self, content):
        """Deliver a reply body to the waiter matching its correlation id.

        Replies with no registered waiter are logged as stale and dropped.
        """
        properties = content.properties
        assert("correlation id" in properties)
        correlation_id = properties["correlation id"]
        # Remove the waiter before firing it so the entry cannot outlive
        # the callback.
        d = self.requests.pop(correlation_id, None)
        if d is None:
            log.msg("Stale response with id '%s'" % correlation_id)
            return
        d.callback(content.body)
class AmqpFactory(protocol.ReconnectingClientFactory):
    """Reconnecting factory that builds AmqpProtocol instances.

    ``self.deferred`` is fired by the protocol once it has logged in and is
    replaced by a fresh Deferred in ``doStop``.
    """

    protocol = AmqpProtocol

    def __init__(self, reactor=None, vhost=None, host=None, port=None,
                 user=None, password=None):
        """Load the AMQP spec and store connection settings.

        Falsy arguments fall back to: user/password ``guest``, vhost ``/``,
        host ``localhost``, port ``5672``.
        """
        # Spec file is looked up relative to this module.
        spec_path = os.path.join(os.path.dirname(__file__),
                                 '..', 'contrib',
                                 'amqp0-8.stripped.rabbitmq.xml')
        self.spec = load(spec_path)
        self.user = user if user else 'guest'
        self.password = password if password else 'guest'
        self.vhost = vhost if vhost else '/'
        self.host = host if host else 'localhost'
        self.port = port if port else 5672
        self.delegate = TwistedDelegate()
        self.deferred = Deferred()
        self.reactor = _maybeGlobalReactor(reactor)
        # Most recently built protocol instance.
        self._client = None

    def buildProtocol(self, addr):
        """Create a protocol, remember it as the current client, reset the delay."""
        built = self.protocol(self.reactor, self.delegate,
                              self.vhost, self.spec)
        self._client = built
        built.factory = self
        self.resetDelay()
        return built

    def doStop(self):
        """Forget the current client and arm a new readiness Deferred."""
        self._client = None
        self.deferred = Deferred()
        protocol.ReconnectingClientFactory.doStop(self)

    @inlineCallbacks
    def client(self):
        """Return the current client, waiting for it if not yet available."""
        # Once the deferred has fired, the stored client is used directly.
        source = self._client if self.deferred.called else self.deferred
        client = yield source
        returnValue(client)

    @inlineCallbacks
    def consumer(self, kls=None, **kwargs):
        """Build ``kls`` (default ``Consumer``) on the current client."""
        if not kls:
            kls = Consumer
        amqp_client = yield self.client()
        returnValue(kls(amqp_client, **kwargs))

    @inlineCallbacks
    def producer(self, kls=None, **kwargs):
        """Build ``kls`` (default ``Producer``) on the current client."""
        if not kls:
            kls = Producer
        amqp_client = yield self.client()
        returnValue(kls(amqp_client, **kwargs))
class ProtobufAmqpFactory(AmqpFactory):
    """AmqpFactory whose default consumers/producers speak protobuf."""

    def __init__(self, *args, **kwargs):
        """Initialize the base factory; the RPC server is created lazily."""
        AmqpFactory.__init__(self, *args, **kwargs)
        self._rpc_server = None

    def consumer(self, kls=None, **kwargs):
        """Build a consumer, defaulting to ``ProtobufConsumer``."""
        kls = kls or ProtobufConsumer
        return AmqpFactory.consumer(self, kls=kls, **kwargs)

    def producer(self, kls=None, **kwargs):
        """Build a producer, defaulting to ``ProtobufProducer``."""
        kls = kls or ProtobufProducer
        # Delegate to the base *producer* builder; this previously called
        # AmqpFactory.consumer by mistake.
        return AmqpFactory.producer(self, kls=kls, **kwargs)

    @property
    def rpc_server(self):
        """The shared RPCServer for this factory, created on first access."""
        if self._rpc_server is None:
            self._rpc_server = RPCServer(self)
        return self._rpc_server

    @inlineCallbacks
    def rpc_client(self, exchange):
        """Create and initialize a new RPCClient for ``exchange``."""
        client = RPCClient(exchange=exchange,
                           factory=self)
        yield client.initialize()
        returnValue(client)
class AmqpService(_AbstractClient):
    """Service that connects an AMQP factory over TCP."""

    method = 'TCP'

    def __init__(self, factory):
        """Connect to the host and port configured on ``factory``."""
        self.factory = factory
        host, port = factory.host, factory.port
        _AbstractClient.__init__(self, host, port, factory)

    def stopService(self):
        """Stop reconnection attempts, stop the factory, then the service."""
        amqp_factory = self.factory
        amqp_factory.stopTrying()
        amqp_factory.doStop()
        _AbstractClient.stopService(self)
class ProducerMixin(object):
    """Mixin that caches one producer per exchange name.

    The host class is expected to provide ``self.amqp`` with a
    ``producer(exchange=...)`` method returning a Deferred.
    """

    def __init__(self, *args, **kwargs):
        """Create the producer cache and its lock, then continue the MRO."""
        self.__producers = {}
        self.__lock = DeferredLock()
        super(ProducerMixin, self).__init__(*args, **kwargs)

    @inlineCallbacks
    def producer(self, exchange_name):
        """Return the cached producer for ``exchange_name``, creating it once."""
        # acquire() returns a Deferred; it has to be yielded, otherwise
        # concurrent callers run the critical section at the same time and
        # may each create a producer for the same exchange.
        yield self.__lock.acquire()
        try:
            producer = self.__producers.get(exchange_name, None)
            if producer is None:
                producer = yield self.amqp.producer(exchange=exchange_name)
                self.__producers[exchange_name] = producer
            returnValue(producer)
        finally:
            self.__lock.release()
def rpc_handler(amqp, exchange, name,
                request_cls, response_cls,
                deferred=None):
    """Return a decorator registering a function as an RPC method.

    The decorated function is passed as ``callback`` to
    ``amqp.rpc_server.register`` together with the given exchange, method
    name and request/response classes, and is returned unchanged. When
    ``deferred`` is given, the registration result is chained into it.
    """
    def _register(f):
        registration_args = dict(exchange=exchange,
                                 name=name,
                                 callback=f,
                                 request_cls=request_cls,
                                 response_cls=response_cls)
        registration = amqp.rpc_server.register(**registration_args)
        if deferred is None:
            return f
        registration.chainDeferred(deferred)
        return f
    return _register
def consume(amqp, exchange, routing_key,
            message_cls, deferred=None,
            **kwargs):
    """Return a decorator registering a function as a message consumer.

    The decorated function is passed as ``callback`` to ``amqp.consumer``
    together with the exchange, routing key, message class and any extra
    keyword arguments, and is returned unchanged. When ``deferred`` is
    given, the result of ``amqp.consumer`` is chained into it.
    """
    def _register(f):
        creation = amqp.consumer(exchange=exchange,
                                 routing_key=routing_key,
                                 callback=f,
                                 message_cls=message_cls,
                                 **kwargs)
        if deferred is None:
            return f
        creation.chainDeferred(deferred)
        return f
    return _register
Add Comment
Please, Sign In to add comment