Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #-*- coding: utf8 -*-
- from twisted.internet.defer import inlineCallbacks
- from twisted.application.service import Service, MultiService
- from twisted.application import service, internet
- from twisted.internet.protocol import ClientFactory
- from twisted.python import log
- from txamqp.protocol import AMQClient
- from txamqp.content import Content
- from txamqp.client import TwistedDelegate
- import txamqp
# Values assigned to a message's 'delivery mode' property (see queueWriter.write).
NON_PERSISTENT = 1
PERSISTENT = 2  # not referenced anywhere in the visible part of this file

# Path handed to txamqp.spec.load() by AMQClientFactory.  It is relative, so it
# is resolved against the process's working directory at service start.
SPEC_PATH = "../spec/amqp0-8.xml"

# One queueWriter is created per entry; an entry's 1-based position in this
# list is also used as that writer's AMQP channel number.
BOT_PROTOCOL_LIST = ['jabber']
class queueWriter:
    """Publish messages for one bot protocol to an AMQP exchange.

    The writer owns a single channel on the connection it is associated
    with.  The channel number is the 1-based position of ``basename`` in
    ``BOT_PROTOCOL_LIST``.
    """

    def __init__(self, conn, exchange, basename):
        """Remember the connection, target exchange and protocol name.

        ``conn`` may be ``None``; a connection is attached later through
        ``associateConnection``.
        """
        self._conn = conn
        self._basename = basename
        self._exchange = exchange
        # Deferred that fires with the opened channel; created by
        # openChannel().  None until a connection has been associated.
        self._chan = None

    def associateConnection(self, connection):
        """Bind this writer to ``connection`` and open its channel."""
        self._conn = connection
        self.openChannel().addErrback(log.err)

    def openChannel(self):
        """Request this writer's channel and open it.

        Returns the Deferred stored in ``self._chan``; it fires with the
        channel object once ``channel_open()`` has completed.

        Raises ``ValueError`` if the writer's basename is not listed in
        ``BOT_PROTOCOL_LIST``.
        """
        channel_id = BOT_PROTOCOL_LIST.index(self._basename) + 1
        self._chan = self._conn.channel(channel_id)

        def _open(chan):
            # Hand the channel itself on to later callbacks.  Returning the
            # channel_open() result directly would replace the channel as
            # the chain's value and break every subsequent write().
            return chan.channel_open().addCallback(lambda _ignored: chan)

        return self._chan.addCallback(_open)

    def write(self, content):
        """Publish ``content`` to the exchange as a non-persistent message.

        The publish is queued on the channel Deferred, so messages written
        while the channel is still opening are sent once it is ready.  If no
        connection has been associated yet the message is logged and dropped.
        """
        if self._chan is None:
            log.msg("queueWriter(%s): no channel yet, message dropped"
                    % self._basename)
            return

        msg = Content(content)
        msg['delivery mode'] = NON_PERSISTENT

        def _publish(chan):
            if chan is None:
                # An earlier failure on this chain was already logged and
                # left no usable channel behind.
                log.msg("queueWriter(%s): channel unavailable, message dropped"
                        % self._basename)
                return None
            # Publish on the *channel* (the Deferred has no basic_publish),
            # and only once the channel is actually available.
            chan.basic_publish(exchange=self._exchange,
                               content=msg).addErrback(log.err)
            # Keep the channel as the chain's value for the next write().
            return chan

        self._chan.addCallback(_publish).addErrback(log.err)

    # NOTE(review): the two methods below use ClientFactory callback names,
    # but nothing visible in this file invokes them on a queueWriter.

    def clientConnectionLost(self, connector, reason):
        """Print the reason the connection was lost."""
        print(reason)

    def clientConnectionFailed(self, connector, reason):
        """Print the reason the connection attempt failed."""
        print(reason)
class AMQClientFactory(ClientFactory):
    """Client factory that builds AMQClient protocols and feeds the writers.

    One ``queueWriter`` is created per entry of ``BOT_PROTOCOL_LIST``; each
    is attached to every protocol this factory builds, once that protocol
    has logged in to the broker.
    """

    protocol = AMQClient
    _exchange = 'ehelp'

    def __init__(self, user, password, vhost, spec_path):
        """Load the AMQP spec and create one writer per bot protocol.

        ``user`` / ``password`` are the broker credentials, ``vhost`` the
        virtual host passed to the protocol, and ``spec_path`` the path of
        the AMQP spec XML given to ``txamqp.spec.load``.
        """
        self._spec = txamqp.spec.load(spec_path)
        self._vhost = vhost
        self._user = user
        self._password = password
        self._writer_list = {}
        for proto in BOT_PROTOCOL_LIST:
            self._writer_list[proto + '_writer'] = queueWriter(
                None, self._exchange, proto)

    def buildProtocol(self, addr):
        """Build a protocol, log it in, then attach the writers to it."""
        p = self.protocol(delegate=TwistedDelegate(), spec=self._spec,
                          vhost=self._vhost)
        p.factory = self

        # The stored credentials were previously never sent, and the writers
        # tried to open channels before the transport even existed.  Start
        # the login here and attach the writers only after it has finished.
        # NOTE(review): this relies on AMQClient.start() doing no I/O until
        # the broker's handshake frames arrive, and on the dict form of the
        # response used with the 0-8 spec -- confirm against the installed
        # txamqp version.
        d = p.start({"LOGIN": self._user, "PASSWORD": self._password})
        d.addCallback(self._attachWriters, p)
        d.addErrback(log.err)
        return p

    def _attachWriters(self, _result, connection):
        """Associate every writer with the authenticated ``connection``."""
        for writer in self._writer_list.values():
            writer.associateConnection(connection)
class AMQService(MultiService):
    """MultiService that owns the TCP client connection to the AMQP broker."""

    def __init__(self, host, port=5672, vhost='/', user="guest", password="guest"):
        """Store the broker address and credentials; nothing connects yet."""
        MultiService.__init__(self)
        self._host = host
        self._port = port
        self._user = user
        self._pass = password
        self._vhost = vhost
        # Child TCPClient service; created on the first startService().
        self._queueClientService = None

    def startService(self):
        """Create the broker client on first start, then start all children.

        The child is created only once, so a stop/start cycle restarts the
        same client instead of attaching an additional one each time.
        """
        if self._queueClientService is None:
            factory = AMQClientFactory(self._user, self._pass, self._vhost,
                                       SPEC_PATH)
            client = internet.TCPClient(self._host, self._port, factory)
            # Attached before this service is running, so it is started by
            # MultiService.startService() below together with any others.
            client.setServiceParent(self)
            self._queueClientService = client
        MultiService.startService(self)
# Module-level wiring: build the Application object and hang the AMQP
# writer service (pointed at a broker on localhost) underneath it.
application = service.Application('ehelp_queueWorkers')
writerService = AMQService('localhost')
serviceCollection = service.IServiceCollection(application)
writerService.setServiceParent(serviceCollection)
Add Comment
Please, Sign In to add comment