Advertisement
Guest User

Untitled

a guest
Aug 27th, 2018
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.87 KB | None | 0 0
  1. import pika
  2. import tornado
  3. import tornado.websocket as websocket
  4. from tornado.options import options, define, parse_command_line
  5. import tornado.httpserver
  6. import tornado.ioloop
  7. import tornado.wsgi
  8. from pika.adapters.tornado_connection import TornadoConnection
  9. from witbot import recebeInput
  10. import logging
  11.  
  12. LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
  13. '-35s %(lineno) -5d: %(message)s')
  14. LOGGER = logging.getLogger(__name__)
  15.  
  16. class MyWebSocketHandler(websocket.WebSocketHandler):
  17. def check_origin(self, origin):
  18. print("origem: " + origin)
  19. return True
  20.  
  21. def open(self, *args, **kwargs):
  22. self.application.pc.setup_queue(args[0],self)
  23.  
  24. def on_close(self):
  25. print ("WebSocket closed")
  26. self.application.pc.remove_event_listener(self)
  27.  
  28. def on_message(self, message):
  29. print (message)
  30. self.application.pc.send_message(message)
  31.  
  32. class PikaClient(object):
  33. def __init__(self, io_loop):
  34. print ('PikaClient: __init__')
  35. self.io_loop = io_loop
  36. self.connected = False
  37. self.connecting = False
  38. self.connection = None
  39. self.channel = None
  40. self.uri = None
  41. self.websocket = None
  42. self.event_listeners = []
  43.  
  44. def connect(self):
  45. logging.basicConfig(level=logging.DEBUG)
  46. if self.connecting:
  47. print ('PikaClient: Already connecting to RabbitMQ')
  48. return
  49.  
  50. print ('PikaClient: Connecting to RabbitMQ')
  51. self.connecting = True
  52. port = 5672
  53. cred = pika.PlainCredentials(username='guest', password='guest')
  54. param = pika.ConnectionParameters(host='192.168.99.100',port=port,credentials=cred,heartbeat_interval=5)
  55. print(param)
  56. self.connection = TornadoConnection(param,
  57. on_open_callback=self.on_connected)
  58. print(self.connection)
  59. self.connection.add_on_close_callback(self.on_closed)
  60.  
  61. def on_connected(self, connection):
  62. print ('PikaClient: connected to RabbitMQ')
  63. self.connected = True
  64. self.connection = connection
  65. self.connection.channel(self.on_channel_open)
  66.  
  67. def on_channel_open(self, channel):
  68. print ('PikaClient: Channel open, Declaring exchange')
  69. self.channel = channel
  70. self.setup_exchange('amq.topic')
  71.  
  72. def setup_exchange(self, exchange_name):
  73. print('Declaring exchange %s', exchange_name)
  74. self.channel.exchange_declare(exchange=exchange_name,exchange_type='topic',durable=True)
  75.  
  76. def setup_queue(self, queue_name, wsParam):
  77. print('Declaring queue guest' + queue_name)
  78. self.uri = "guest"+queue_name
  79. self.websocket = {'uri':self.uri,'wsParam':wsParam}
  80. print("websocket1:"+str(self.websocket))
  81. self.channel.queue_declare(self.on_queue_declareok, self.uri,durable=True,arguments={"x-expires": 180000})
  82.  
  83. def on_queue_declareok(self, method_frame):
  84. print ('Binding %s to %s with %s', 'amq.topic',self.uri, self.uri)
  85. self.channel.queue_bind(self.on_bindok, self.uri,
  86. 'amq.topic', self.uri)
  87. def on_bindok(self, unused_frame):
  88. print('Queue bound')
  89. self.start_consuming()
  90.  
  91. def start_consuming(self):
  92. print('Issuing consumer related RPC commands')
  93. print("websocket2:"+str(self.websocket))
  94. self.add_event_listener(self.websocket)
  95. self._consumer_tag = self.channel.basic_consume(self.on_message,self.uri)
  96.  
  97. def on_closed(self, connection):
  98. print ('PikaClient: rabbit connection closed')
  99. self.io_loop.stop()
  100.  
  101. def on_message(self, channel, method, header, body):
  102. print ('PikaClient: message received: %s' % body)
  103. self.notify_listeners(body)
  104. self.acknowledge_message(method.delivery_tag)
  105.  
  106. def acknowledge_message(self, delivery_tag):
  107. print('Acknowledging message %s', delivery_tag)
  108. self.channel.basic_ack(delivery_tag)
  109.  
  110.  
  111. def send_message(self, body):
  112. self.channel.basic_publish(exchange='topic',
  113. routing_key=self.uri,
  114. body=body)
  115.  
  116. def notify_listeners(self, body):
  117. bot = recebeInput.trataJson(body)
  118. bot = recebeInput.decisoes(recebeInput.devolveEntidade(bot))
  119. print(bot)
  120. print(len(self.event_listeners))
  121. for position in range(0,len(self.event_listeners)):
  122. if self.event_listeners[position]['uri'] == self.uri:
  123. self.event_listeners[position]['wsParam'].write_message(bot)
  124. print ('PikaClient: notified %s' % str(self.event_listeners))
  125.  
  126. def add_event_listener(self,listener):
  127. print ("added listener")
  128. print("websocket3:"+str(listener))
  129. self.event_listeners.append(listener)
  130. print ('PikaClient: listener %s added' % repr(listener))
  131.  
  132. def remove_event_listener(self, listener):
  133. try:
  134. print(len(self.event_listeners))
  135. for position in range(0,(len(self.event_listeners)-1)):
  136. print(self.event_listeners[position])
  137. if self.event_listeners[position]['wsParam'] == listener:
  138. self.event_listeners.pop(position)
  139. print ('PikaClient: listener %s removed' % str(listener))
  140. except KeyError:
  141. pass
  142.  
  143. def looping():
  144. return tornado.ioloop.IOLoop.instance()
  145.  
  146. def main():
  147. print("iniciando")
  148. parse_command_line()
  149. handler = (r'/(.*)', MyWebSocketHandler)
  150. aplication = tornado.web.Application([
  151. handler
  152. ])
  153. pc = PikaClient(looping())
  154. aplication.pc = pc
  155. aplication.pc.connect()
  156. aplication.listen(8081)
  157. # PikaClient is our rabbitmq consumer
  158. looping().start()
  159.  
  160.  
# Run the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement