Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import pudb
- # circuits imports
- from circuits import handler, Component, Debugger, Event, BaseComponent
- #kombu import
- import kombu
- from kombu.async import Hub
- from kombu.async.hub import Stop
- from kombu.mixins import ConsumerMixin
- from kombu import Connection, Exchange, Queue, Producer, Consumer
class AMQP_CONNECTED(Event):
    """Event fired by ``KombuWrapper.on_start`` once the kombu connection
    and consumer objects have been created.
    """
class AMQP_START_CONSUMING(Event):
    """Event requesting one iteration of the kombu hub.

    Handled by ``KombuWrapper.start_consuming``; nothing in this module
    fires it.
    """
class AMQP_MESSAGE_SENT(Event):
    """Event fired by ``KombuWrapper._send_amqp_msg`` right after a message
    has been published.
    """
class AMQP_MESSAGE_RECEIVED(Event):
    """Event fired by ``KombuWrapper._on_amqp_message`` for every message
    delivered to the consumer.
    """
class KombuWrapper(BaseComponent):
    """Circuits component that drives a kombu AMQP connection.

    When circuits fires ``started`` the component creates a connection to
    the broker at ``amqp://guest:guest@localhost:5672``, builds a consumer
    on the ``asynt`` queue, registers the connection with a kombu ``Hub``
    and publishes one test message.  On every ``generate_events`` tick the
    hub is advanced by a single iteration.
    """

    def __init__(self, *args, **kwargs):
        """Create the hub, exchange and queue; the connection comes later.

        ``*args`` / ``**kwargs`` are forwarded unchanged to
        ``BaseComponent.__init__`` (the original accepted them but
        silently dropped them).
        """
        super(KombuWrapper, self).__init__(*args, **kwargs)
        # kombu init
        self.hub = Hub()
        self.exchange = Exchange('asynt')
        self.queue = Queue('asynt', self.exchange, 'asynt')
        # Both are populated by ``on_start``.  Defining them here lets the
        # other methods test for "not started yet" instead of failing with
        # an AttributeError on a missing attribute.
        self.conn = None
        self.consumer = None

    def _send_amqp_msg(self, conn):
        """Publish the fixed ``'hello world'`` payload over ``conn``.

        The message goes to ``self.exchange`` with routing key ``asynt``;
        afterwards ``AMQP_MESSAGE_SENT`` is fired.
        """
        producer = Producer(conn)
        producer.publish('hello world', exchange=self.exchange, routing_key='asynt')
        self.fire(AMQP_MESSAGE_SENT())
        print('MESSAGE SENT')

    def _on_amqp_message(self, message):
        """Consumer callback: print the body, fire
        ``AMQP_MESSAGE_RECEIVED`` and acknowledge ``message``.
        """
        print('RECEIVED: %r' % (message.body, ))
        self.fire(AMQP_MESSAGE_RECEIVED())
        message.ack()

    def send_receive(self):
        """Start consuming and publish the test message.

        Requires ``on_start`` to have run, since it uses ``self.consumer``
        and ``self.conn``.
        """
        self.consumer.consume()
        self._send_amqp_msg(self.conn)

    # circuits handlers
    @handler('generate_events')
    def on_generate_event(self, event):
        """Advance the kombu hub by one iteration on each circuits tick.

        Does nothing except print a notice while the connection or the
        consumer has not been set up.
        """
        # The consumer check also covers an ``on_start`` that failed after
        # assigning ``self.conn`` but before the consumer existed.
        if not self.hub or self.conn is None or self.consumer is None:
            print("Hub not running")
            return
        try:
            self.consumer.consume()
            self.hub.run_once()
            # NOTE(review): presumably tells circuits not to wait before
            # the next tick -- confirm against the circuits
            # ``generate_events`` documentation.
            event.reduce_time_left(0)
            print("i'm running")
        except Stop:
            print("We stop now, but we have a lot to do.")

    @handler('started')
    def on_start(self, event, *eargs, **ekwargs):
        """Create the connection and consumer, hook the connection into
        the hub, then run ``send_receive``.
        """
        print("on start")
        self.conn = Connection('amqp://guest:guest@localhost:5672')
        print("We have a connection")
        self.consumer = Consumer(self.conn, [self.queue], on_message=self._on_amqp_message)
        print("We have a consumer")
        self.fire(AMQP_CONNECTED())
        self.conn.register_with_event_loop(self.hub)
        self.send_receive()

    # internal handlers
    @handler('AMQP_START_CONSUMING')
    def start_consuming(self):
        """Run a single hub iteration in response to
        ``AMQP_START_CONSUMING``.
        """
        self.hub.run_once()
if __name__ == '__main__':
    # Script entry point: attach a circuits Debugger to the wrapper
    # component, then hand control to the circuits main loop.
    app = KombuWrapper()
    app += Debugger()
    app.run()
Advertisement
Add Comment
Please, Sign In to add comment