Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pika
- import tornado
- import tornado.websocket as websocket
- from tornado.options import options, define, parse_command_line
- import tornado.httpserver
- import tornado.ioloop
- import tornado.wsgi
- from pika.adapters.tornado_connection import TornadoConnection
- from witbot import recebeInput
- import logging
- LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
- '-35s %(lineno) -5d: %(message)s')
- LOGGER = logging.getLogger(__name__)
- class MyWebSocketHandler(websocket.WebSocketHandler):
- def check_origin(self, origin):
- print("origem: " + origin)
- return True
- def open(self, *args, **kwargs):
- self.application.pc.setup_queue(args[0],self)
- def on_close(self):
- print ("WebSocket closed")
- self.application.pc.remove_event_listener(self)
- def on_message(self, message):
- print (message)
- self.application.pc.send_message(message)
- class PikaClient(object):
- def __init__(self, io_loop):
- print ('PikaClient: __init__')
- self.io_loop = io_loop
- self.connected = False
- self.connecting = False
- self.connection = None
- self.channel = None
- self.uri = None
- self.websocket = None
- self.event_listeners = []
- def connect(self):
- logging.basicConfig(level=logging.DEBUG)
- if self.connecting:
- print ('PikaClient: Already connecting to RabbitMQ')
- return
- print ('PikaClient: Connecting to RabbitMQ')
- self.connecting = True
- port = 5672
- cred = pika.PlainCredentials(username='guest', password='guest')
- param = pika.ConnectionParameters(host='192.168.99.100',port=port,credentials=cred,heartbeat_interval=5)
- print(param)
- self.connection = TornadoConnection(param,
- on_open_callback=self.on_connected)
- print(self.connection)
- self.connection.add_on_close_callback(self.on_closed)
- def on_connected(self, connection):
- print ('PikaClient: connected to RabbitMQ')
- self.connected = True
- self.connection = connection
- self.connection.channel(self.on_channel_open)
- def on_channel_open(self, channel):
- print ('PikaClient: Channel open, Declaring exchange')
- self.channel = channel
- self.setup_exchange('amq.topic')
- def setup_exchange(self, exchange_name):
- print('Declaring exchange %s', exchange_name)
- self.channel.exchange_declare(exchange=exchange_name,exchange_type='topic',durable=True)
- def setup_queue(self, queue_name, wsParam):
- print('Declaring queue guest' + queue_name)
- self.uri = "guest"+queue_name
- self.websocket = {'uri':self.uri,'wsParam':wsParam}
- print("websocket1:"+str(self.websocket))
- self.channel.queue_declare(self.on_queue_declareok, self.uri,durable=True,arguments={"x-expires": 180000})
- def on_queue_declareok(self, method_frame):
- print ('Binding %s to %s with %s', 'amq.topic',self.uri, self.uri)
- self.channel.queue_bind(self.on_bindok, self.uri,
- 'amq.topic', self.uri)
- def on_bindok(self, unused_frame):
- print('Queue bound')
- self.start_consuming()
- def start_consuming(self):
- print('Issuing consumer related RPC commands')
- print("websocket2:"+str(self.websocket))
- self.add_event_listener(self.websocket)
- self._consumer_tag = self.channel.basic_consume(self.on_message,self.uri)
- def on_closed(self, connection):
- print ('PikaClient: rabbit connection closed')
- self.io_loop.stop()
- def on_message(self, channel, method, header, body):
- print ('PikaClient: message received: %s' % body)
- self.notify_listeners(body)
- self.acknowledge_message(method.delivery_tag)
- def acknowledge_message(self, delivery_tag):
- print('Acknowledging message %s', delivery_tag)
- self.channel.basic_ack(delivery_tag)
- def send_message(self, body):
- self.channel.basic_publish(exchange='topic',
- routing_key=self.uri,
- body=body)
- def notify_listeners(self, body):
- bot = recebeInput.trataJson(body)
- bot = recebeInput.decisoes(recebeInput.devolveEntidade(bot))
- print(bot)
- print(len(self.event_listeners))
- for position in range(0,len(self.event_listeners)):
- if self.event_listeners[position]['uri'] == self.uri:
- self.event_listeners[position]['wsParam'].write_message(bot)
- print ('PikaClient: notified %s' % str(self.event_listeners))
- def add_event_listener(self,listener):
- print ("added listener")
- print("websocket3:"+str(listener))
- self.event_listeners.append(listener)
- print ('PikaClient: listener %s added' % repr(listener))
- def remove_event_listener(self, listener):
- try:
- print(len(self.event_listeners))
- for position in range(0,(len(self.event_listeners)-1)):
- print(self.event_listeners[position])
- if self.event_listeners[position]['wsParam'] == listener:
- self.event_listeners.pop(position)
- print ('PikaClient: listener %s removed' % str(listener))
- except KeyError:
- pass
- def looping():
- return tornado.ioloop.IOLoop.instance()
- def main():
- print("iniciando")
- parse_command_line()
- handler = (r'/(.*)', MyWebSocketHandler)
- aplication = tornado.web.Application([
- handler
- ])
- pc = PikaClient(looping())
- aplication.pc = pc
- aplication.pc.connect()
- aplication.listen(8081)
- # PikaClient is our rabbitmq consumer
- looping().start()
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement