Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Bot protocols that get their own queue writer.  The position of a name in
# this list (plus one) is used as the AMQP channel number of its writer.
BOT_PROTOCOL_LIST = ['jabber']

# Absolute path of the directory one level above the one holding this module.
_base_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir))

# AMQP 0-8 protocol specification, loaded once at import time from
# <_base_dir>/spec/amqp0-8.xml.
AMQP0_8_SPEC = load_spec(os.path.join(_base_dir, "spec", "amqp0-8.xml"))
class queueWriter:
    """
    Publishes non-persistent messages to one exchange over its own channel.

    The channel number is the position of C{basename} in
    C{BOT_PROTOCOL_LIST} plus one.

    @ivar _conn: the AMQP client connection, or C{None} until
        L{associateConnection} is called.
    @ivar _exchange: name of the exchange messages are published to.
    @ivar _basename: protocol name this writer serves.
    @ivar _chan: C{None} until L{openChannel} is called, afterwards a
        Deferred whose result is the opened channel (or C{None} if opening
        the channel failed and the failure was logged).
    """

    def __init__(self, conn, exchange, basename):
        """
        @param conn: AMQP client connection, may be C{None} and supplied
            later through L{associateConnection}.
        @param exchange: name of the exchange to publish to.
        @param basename: protocol name; must be in C{BOT_PROTOCOL_LIST} by
            the time L{openChannel} is called.
        """
        self._conn = conn
        self._basename = basename
        self._exchange = exchange
        # Defined up front so write() can detect "no channel yet" instead of
        # failing with AttributeError.
        self._chan = None

    def associateConnection(self, connection):
        """
        Bind this writer to C{connection} and start opening its channel.
        Failures while opening the channel are logged.
        """
        self._conn = connection
        self.openChannel().addErrback(log.err)

    def openChannel(self):
        """
        Request this writer's channel from the connection and open it.

        @return: the Deferred stored in C{self._chan}; it fires with the
            opened channel.
        @raise ValueError: if C{basename} is not in C{BOT_PROTOCOL_LIST}.
        """
        channel_id = BOT_PROTOCOL_LIST.index(self._basename) + 1

        def open_channel(chan):
            # channel_open() fires with the broker's reply; hand the channel
            # itself down the chain so that write() can publish on it.
            return chan.channel_open().addCallback(lambda ignored: chan)

        self._chan = self._conn.channel(channel_id).addCallback(open_channel)
        return self._chan

    def write(self, content):
        """
        Publish C{content} to the exchange as a non-persistent message.

        The publish is queued behind the channel opening.  If no channel has
        been requested yet the message is dropped and the fact is logged.

        @param content: message body passed to C{Content}.
        """
        if self._chan is None:
            log.msg("queueWriter(%s): no channel associated, message dropped"
                    % (self._basename,))
            return
        msg = Content(content)
        msg['delivery mode'] = NON_PERSISTENT
        self._chan.addCallback(self._publish, msg)

    def _publish(self, chan, msg):
        """
        Publish C{msg} on C{chan} and return C{chan} unchanged, so the shared
        Deferred keeps the channel as its result for subsequent writes.
        """
        if chan is None:
            # Opening the channel failed earlier; that failure was logged by
            # associateConnection's errback.
            log.msg("queueWriter(%s): channel unavailable, message dropped"
                    % (self._basename,))
            return chan
        # maybeDeferred turns a synchronous exception into a failure as well,
        # so every publish error ends up in the log without breaking the
        # shared callback chain.
        published = maybeDeferred(chan.basic_publish,
                                  exchange=self._exchange, content=msg)
        published.addErrback(log.err)
        return chan

    def clientConnectionLost(self, connector, reason):
        """Print the reason the connection was lost."""
        # Parenthesised single-argument form prints the same thing under
        # both the Python 2 print statement and the Python 3 function.
        print(reason)

    def clientConnectionFailed(self, connector, reason):
        """Print the reason the connection attempt failed."""
        print(reason)
class AMQFactory(ReconnectingClientFactory):
    """
    A C{ClientFactory} for C{AMQClient} protocol with reconnecting facilities.

    @ivar user: the user name to use to connect to the AMQP server.
    @ivar password: the corresponding password of the user.
    @ivar vhost: the AMQP vhost to create connections against.
    @ivar connected_callback: callback called when a successful connection
        happened. It takes one argument, a C{(client, channel)} tuple. May be
        C{None}, in which case nothing is called.
    @ivar disconnected_callback: callback for a lost connection. It is stored
        but currently never invoked by this class.
    @ivar failed_callback: callback for a failed connection attempt. It is
        stored but currently never invoked by this class.
    @ivar _conn: the most recently connected client, or C{None} while
        disconnected.
    @ivar _writer_list: maps C{'<protocol>_writer'} to the C{queueWriter} of
        each protocol in C{BOT_PROTOCOL_LIST}.
    """

    protocol = AMQClientWithCallback
    _exchange = 'ehelp'

    def __init__(self, user, password, vhost, connected_callback,
                 disconnected_callback, failed_callback, spec=None):
        """
        @param spec: AMQP specification to use; defaults to C{AMQP0_8_SPEC}.
        """
        self.user = user
        self.password = password
        self.vhost = vhost
        self.delegate = TwistedDelegate()
        if spec is None:
            spec = AMQP0_8_SPEC
        self.spec = spec
        self.connected_callback = connected_callback
        self.disconnected_callback = disconnected_callback
        self.failed_callback = failed_callback
        # Set by clientConnectionMade; read by associateConnectionWithBots.
        self._conn = None
        self._writer_list = {}
        for proto in BOT_PROTOCOL_LIST:
            self._writer_list[proto + '_writer'] = queueWriter(
                None, self._exchange, proto)

    def associateConnectionWithBots(self):
        """
        Hand the current connection to every queue writer.

        Does nothing (apart from logging) while there is no connection.

        NOTE(review): nothing in this class calls this method. The writers
        use channel numbers starting at 1, the same number that
        L{clientConnectionMade} opens, so confirm the channel numbering
        before wiring it into the connection sequence.
        """
        if self._conn is None:
            log.msg("No connection to associate")
            return
        log.msg("Associating connection")
        for writer in self._writer_list.values():
            writer.associateConnection(self._conn)

    def buildProtocol(self, addr):
        """
        Create the protocol instance and returns it for letting Twisted
        connect it to the transport.

        @param addr: the attributed address, unused for now.
        """
        protocol = self.protocol(self.clientConnectionMade, self.delegate,
                                 self.vhost, spec=self.spec)
        protocol.factory = self
        return protocol

    def clientConnectionMade(self, client):
        """
        Called when a connection succeeds: login to the server, and open a
        channel against it.

        @param client: the connected protocol instance.
        @return: the Deferred of the authenticate / open-channel sequence.
        """
        self.resetDelay()
        self._conn = client

        def started(ignored):
            # We don't care about authenticate result as long as it succeeds
            return client.channel(1).addCallback(got_channel)

        def got_channel(channel):
            return channel.channel_open().addCallback(opened, channel)

        def opened(ignored, channel):
            # The callback is optional: calling None through maybeDeferred
            # would only produce a TypeError failure on every connection.
            if self.connected_callback is None:
                return
            deferred = maybeDeferred(self.connected_callback,
                                     (client, channel))
            deferred.addErrback(catch_closed)

        def catch_closed(failure):
            # A connection closed during the callback is expected; any other
            # failure is re-raised by trap().
            failure.trap(Closed)

        deferred = client.authenticate(self.user, self.password)
        return deferred.addCallback(started)

    def clientConnectionLost(self, connector, reason):
        """
        Log the loss, forget the dead connection and let
        C{ReconnectingClientFactory} schedule a reconnect.
        """
        log.msg(":E")
        # Do not leave a stale connection for associateConnectionWithBots.
        self._conn = None
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        # NOTE: disconnected_callback is deliberately not invoked here.

    def clientConnectionFailed(self, connector, reason):
        """
        Log the failure and let C{ReconnectingClientFactory} schedule a retry.
        """
        log.msg(":E")
        ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)
        # NOTE: failed_callback is deliberately not invoked here.
class AMQService(MultiService):
    """
    Service that handles reading information from and writing it to the
    queue.
    """

    def __init__(self, host, port=5672, vhost='/', user="guest", password="guest"):
        """
        Remember the broker address and credentials; nothing is connected
        until L{startService} runs.
        """
        MultiService.__init__(self)
        self._host, self._port = host, port
        self._vhost = vhost
        self._user, self._pass = user, password

    def startService(self):
        """
        Mark the service as started, then create the TCP client for the
        broker and attach it as a child service.
        """
        # Calls the plain Service implementation, not MultiService's.
        Service.startService(self)
        # The factory is created without connected / disconnected / failed
        # callbacks.
        factory = AMQFactory(self._user, self._pass, self._vhost,
                             None, None, None)
        client = internet.TCPClient(self._host, self._port, factory)
        self._queueClientService = client
        client.setServiceParent(self)
# Application object named 'ehelp_queueWorkers', with a single AMQService
# pointed at the broker on localhost attached to its service collection.
application = service.Application('ehelp_queueWorkers')
writerService = AMQService('localhost')
serviceCollection = service.IServiceCollection(application)
writerService.setServiceParent(serviceCollection)
Add Comment
Please, Sign In to add comment