Guest User

Untitled

a guest
May 25th, 2018
133
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.05 KB | None | 0 0
  1. BOT_PROTOCOL_LIST = ['jabber']
  2. _base_dir = os.path.dirname(
  3. os.path.dirname(os.path.abspath(__file__)))
  4. AMQP0_8_SPEC = load_spec(os.path.join(_base_dir, "spec", "amqp0-8.xml"))
  5.  
  6.  
  7. class queueWriter:
  8. def __init__(self, conn, exchange, basename):
  9. self._conn = conn
  10. self._basename = basename
  11. self._exchange = exchange
  12.  
  13. def associateConnection(self, connection):
  14. self._conn = connection
  15. self.openChannel().addErrback(log.err)
  16.  
  17. def openChannel(self):
  18. self._chan = self._conn.channel(BOT_PROTOCOL_LIST.index(self._basename)+1)
  19. return self._chan.addCallback(lambda chan: chan.channel_open())
  20.  
  21. def write(self, content):
  22. msg = Content(content)
  23. msg['delivery mode'] = NON_PERSISTENT
  24. self._chan.addCallback(self._chan.basic_publish(exchange = self._exchange, content = msg)).addErrback(log.err)
  25.  
  26. def clientConnectionLost(self, connector, reason):
  27. print reason
  28.  
  29. def clientConnectionFailed(self, connector, reason):
  30. print reason
  31.  
  32.  
  33. class AMQFactory(ReconnectingClientFactory):
  34. """
  35. A C{ClientFactory} for C{AMQClient} protocol with reconnecting facilities.
  36.  
  37. @ivar user: the user name to use to connect to the AMQP server.
  38. @ivar password: the corresponding password of the user.
  39. @ivar vhost: the AMQP vhost to create connections against.
  40. @ivar connected_callback: callback called when a successful connection
  41. happened. It takes one argument, the channel opened for the connection.
  42. @ivar disconnected_callback: callback called when a previously connected
  43. connection was lost. It takes no argument.
  44. """
  45. protocol = AMQClientWithCallback
  46. _exchange = 'ehelp'
  47.  
  48. def __init__(self, user, password, vhost, connected_callback,
  49. disconnected_callback, failed_callback, spec=None):
  50. self.user = user
  51. self.password = password
  52. self.vhost = vhost
  53. self.delegate = TwistedDelegate()
  54. if spec is None:
  55. spec = AMQP0_8_SPEC
  56. self.spec = spec
  57.  
  58. self.connected_callback = connected_callback
  59. self.disconnected_callback = disconnected_callback
  60. self.failed_callback = failed_callback
  61.  
  62. self._writer_list = {}
  63. for proto in BOT_PROTOCOL_LIST:
  64. self._writer_list[proto+'_writer'] = queueWriter(None, self._exchange, proto)
  65.  
  66. def associateConnectionWithBots():
  67. log.msg("Associating connection")
  68. for k,writer in self._writer_list.iteritems():
  69. writer.associateConnection(self._conn)
  70.  
  71.  
  72. def buildProtocol(self, addr):
  73. """
  74. Create the protocol instance and returns it for letting Twisted
  75. connect it to the transport.
  76.  
  77. @param addr: the attributed address, unused for now.
  78. """
  79. protocol = self.protocol(self.clientConnectionMade, self.delegate,
  80. self.vhost, spec=self.spec)
  81. protocol.factory = self
  82. return protocol
  83.  
  84. def clientConnectionMade(self, client):
  85. """
  86. Called when a connection succeeds: login to the server, and open a
  87. channel against it.
  88. """
  89. self.resetDelay()
  90.  
  91. def started(ignored):
  92. # We don't care about authenticate result as long as it succeeds
  93. return client.channel(1).addCallback(got_channel)
  94.  
  95. def got_channel(channel):
  96. return channel.channel_open().addCallback(opened, channel)
  97.  
  98. def opened(ignored, channel):
  99. deferred = maybeDeferred(self.connected_callback, (client, channel))
  100. deferred.addErrback(catch_closed)
  101.  
  102. def catch_closed(failure):
  103. failure.trap(Closed)
  104.  
  105.  
  106. deferred = client.authenticate(self.user, self.password)
  107. return deferred.addCallback(started)
  108.  
  109. def clientConnectionLost(self, connector, reason):
  110. log.msg(":E")
  111. ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
  112.  
  113. # self.disconnected_callback()
  114.  
  115. def clientConnectionFailed(self, connector, reason):
  116. # print type(reason)
  117. log.msg(":E")
  118. ReconnectingClientFactory.clientConnectionFailed(
  119. self, connector, reason)
  120. # self.failed_callback((connector, reason))
  121.  
  122.  
  123.  
  124.  
  125.  
  126. class AMQService(MultiService):
  127. """
  128. Сервис, обслуживающий чтение\запись информации в очередь
  129. """
  130. def __init__(self, host, port=5672, vhost='/', user="guest", password="guest"):
  131. MultiService.__init__(self)
  132. self._host = host
  133. self._port = port
  134. self._user = user
  135. self._pass = password
  136. self._vhost = vhost
  137.  
  138. def startService(self):
  139. Service.startService(self)
  140. self._queueClientService = internet.TCPClient(self._host, self._port, AMQFactory(self._user, self._pass, self._vhost, None, None, None))
  141. self._queueClientService.setServiceParent(self)
  142.  
  143.  
  144. application = service.Application('ehelp_queueWorkers')
  145. serviceCollection = service.IServiceCollection(application)
  146. writerService = AMQService('localhost')
  147. writerService.setServiceParent(serviceCollection)
Add Comment
Please, Sign In to add comment