Guest User

Untitled

a guest
Sep 1st, 2014
249
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.23 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. import pudb
  3.  
  4. # circuits imports
  5. from circuits import handler, Component, Debugger, Event, BaseComponent
  6.  
  7. #kombu import
  8. import kombu
  9. from kombu.async import Hub
  10. from kombu.async.hub import Stop
  11. from kombu.mixins import ConsumerMixin
  12. from kombu import Connection, Exchange, Queue, Producer, Consumer
  13.  
  14. class AMQP_CONNECTED(Event):
  15. """YES SIR! I'M CONNECTED!"""
  16.  
  17. class AMQP_START_CONSUMING(Event):
  18. """YES SIR! I'LL WAIT FOR YOU"""
  19.  
  20. class AMQP_MESSAGE_SENT(Event):
  21. """YES SIR! MESSAGE SENT"""
  22.  
  23. class AMQP_MESSAGE_RECEIVED(Event):
  24. """SIR? A NEW MESSAGE FOR YOU"""
  25.  
  26. class KombuWrapper(BaseComponent):
  27.  
  28. def __init__(self, *args, **kwargs):
  29. super(KombuWrapper, self).__init__()
  30.  
  31. #kombu init
  32. self.hub = Hub()
  33. self.exchange = Exchange('asynt')
  34. self.queue = Queue('asynt', self.exchange, 'asynt')
  35. self.conn = None
  36.  
  37. def _send_amqp_msg(self, conn):
  38. producer = Producer(conn)
  39. producer.publish('hello world', exchange=self.exchange, routing_key='asynt')
  40. self.fire(AMQP_MESSAGE_SENT())
  41. print('MESSAGE SENT')
  42.  
  43. def _on_amqp_message(self, message):
  44. print('RECEIVED: %r' % (message.body, ))
  45. self.fire(AMQP_MESSAGE_RECEIVED())
  46. message.ack()
  47.  
  48. def send_receive(self):
  49. self.consumer.consume()
  50. self._send_amqp_msg(self.conn)
  51.  
  52. #circuits handlers
  53. @handler('generate_events')
  54. def on_generate_event(self, event):
  55. if not self.hub or self.conn == None:
  56. print ("Hub not running")
  57. return
  58. try:
  59. self.consumer.consume()
  60. self.hub.run_once()
  61. event.reduce_time_left(0)
  62. print("i'm running")
  63. except Stop:
  64. print ("We stop now, but we have a lot to do.")
  65.  
  66. @handler('started')
  67. def on_start(self, event, *eargs, **ekwargs):
  68. print ("on start")
  69. self.conn = Connection('amqp://guest:guest@localhost:5672')
  70. print ("We have a connection")
  71. self.consumer = Consumer(self.conn, [self.queue], on_message=self._on_amqp_message)
  72. print ("We have a consumer")
  73. self.fire(AMQP_CONNECTED())
  74. self.conn.register_with_event_loop(self.hub)
  75. self.send_receive()
  76.  
  77. #internal handlers
  78. @handler('AMQP_START_CONSUMING')
  79. def start_consuming(self):
  80. self.hub.run_once()
  81.  
  82. if __name__ == '__main__':
  83. app = KombuWrapper() + Debugger()
  84. app.run()
Advertisement
Add Comment
Please, Sign In to add comment