Guest User

Untitled

a guest
May 25th, 2018
139
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.78 KB | None | 0 0
  1. #-*- coding: utf8 -*-
  2. from twisted.internet.defer import inlineCallbacks
  3. from twisted.application.service import Service, MultiService
  4. from twisted.application import service, internet
  5. from twisted.internet.protocol import ClientFactory
  6. from twisted.python import log
  7.  
  8. from txamqp.protocol import AMQClient
  9. from txamqp.content import Content
  10. from txamqp.client import TwistedDelegate
  11. import txamqp
  12.  
  13. NON_PERSISTENT = 1
  14. PERSISTENT = 2
  15. SPEC_PATH = "../spec/amqp0-8.xml"
  16.  
  17. BOT_PROTOCOL_LIST = ['jabber']
  18.  
  19. class queueWriter:
  20. def __init__(self, conn, exchange, basename):
  21. self._conn = conn
  22. self._basename = basename
  23. self._exchange = exchange
  24.  
  25. def associateConnection(self, connection):
  26. self._conn = connection
  27. self.openChannel().addErrback(log.err)
  28.  
  29. def openChannel(self):
  30. self._chan = self._conn.channel(BOT_PROTOCOL_LIST.index(self._basename)+1)
  31. return self._chan.addCallback(lambda chan: chan.channel_open())
  32.  
  33. def write(self, content):
  34. msg = Content(content)
  35. msg['delivery mode'] = NON_PERSISTENT
  36. self._chan.addCallback(self._chan.basic_publish(exchange = self._exchange, content = msg)).addErrback(log.err)
  37.  
  38. def clientConnectionLost(self, connector, reason):
  39. print reason
  40.  
  41. def clientConnectionFailed(self, connector, reason):
  42. print reason
  43.  
  44. class AMQClientFactory(ClientFactory):
  45. protocol = AMQClient
  46. _exchange = 'ehelp'
  47. def __init__(self, user, password, vhost, spec_path):
  48. self._spec = txamqp.spec.load(spec_path)
  49. self._vhost = vhost
  50. self._user = user
  51. self._password = password
  52. self._writer_list = {}
  53. for proto in BOT_PROTOCOL_LIST:
  54. self._writer_list[proto+'_writer'] = queueWriter(None, self._exchange, proto)
  55.  
  56.  
  57. def buildProtocol(self, addr):
  58. p = self.protocol( delegate = TwistedDelegate(),spec = self._spec, vhost = self._vhost)
  59.  
  60. for k,writer in self._writer_list.iteritems():
  61. writer.associateConnection(p)
  62.  
  63. p.factory = self
  64.  
  65. return p
  66.  
  67.  
  68.  
  69.  
  70. class AMQService(MultiService):
  71. def __init__(self, host, port=5672, vhost='/', user="guest", password="guest"):
  72. MultiService.__init__(self)
  73. self._host = host
  74. self._port = port
  75. self._user = user
  76. self._pass = password
  77. self._vhost = vhost
  78.  
  79. def startService(self):
  80. Service.startService(self)
  81. self._queueClientService = internet.TCPClient(self._host, self._port, AMQClientFactory(self._user, self._pass, self._vhost, SPEC_PATH))
  82. self._queueClientService.setServiceParent(self)
  83.  
  84.  
  85. application = service.Application('ehelp_queueWorkers')
  86. serviceCollection = service.IServiceCollection(application)
  87. writerService = AMQService('localhost')
  88. writerService.setServiceParent(serviceCollection)
Add Comment
Please, Sign In to add comment