Advertisement
Guest User

Untitled

a guest
Jul 18th, 2011
1,492
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.47 KB | None | 0 0
  1. class Connection:
  2.     def __init__(self, config):
  3.         self.publish_channel = None
  4.         self.exchange = config.get('exchange', 'thot')
  5.         self.connection = stormed.Connection(
  6.             host=config.get('host', 'localhost'),
  7.             port=config.get('port', 5672),
  8.             username=config.get('username', 'guest'),
  9.             password=config.get('password', 'guest'))
  10.         self.connection.on_error = self.error_handler
  11.    
  12.     def connect(self, callback):
  13.         def on_connect():
  14.             logger.debug('QUEUE: connection established')
  15.             callback()
  16.         self.connection.connect(on_connect)
  17.  
  18.     def error_handler(self, e):
  19.         logger.error("Got Connection error %s: %s" % (e.reply_code, e.reply_text))
  20.        
  21.     def create_queue(self, routing_key):
  22.         ch = self.connection.channel()
  23.         ch.exchange_declare(exchange=self.exchange, type='direct', durable=True)
  24.         ch.queue_declare(queue=routing_key, durable=True, auto_delete=False, exclusive=False)
  25.         ch.queue_bind(exchange=self.exchange, queue=routing_key, routing_key=routing_key)
  26.         return ch
  27.  
  28.     def create_listener(self, handler_class, no_ack=True):
  29.         return Listener(self, handler_class, no_ack)
  30.  
  31.     def publish(self, routing_key, data):
  32.         if self.publish_channel == None:
  33.             self.publish_channel = self.connection.channel()
  34.         logger.debug('QUEUE: publish message in %s: %s' % (routing_key, data))
  35.         msg = stormed.Message(json.dumps(data))
  36.         self.publish_channel.publish(msg,
  37.             exchange=self.exchange,
  38.             routing_key=routing_key)
  39.  
  40.     def purge(self, routing_key, callback=None):
  41.         def on_purge(a):
  42.             logger.debug('QUEUE: purged %s' % routing_key)
  43.             channel.close(callback)
  44.         channel = self.create_queue(routing_key)
  45.         channel.queue_purge(routing_key, on_purge)
  46.  
  47.     def close(self, callback=None):
  48.         def on_close():
  49.             logger.debug('QUEUE: connection closed')
  50.             if callback != None: callback()
  51.         self.connection.close(on_close)
  52.  
  53.  
  54. class MessageHandler:
  55.     def __init__(self, channel):
  56.         self.channel = channel
  57.         self.initialize()
  58.  
  59.     def initialize(self):
  60.         pass
  61.  
  62.     def handle(self, data):
  63.         pass
  64.    
  65.  
  66. class Listener:
  67.     def __init__(self, connection, handler_class, no_ack=True):
  68.         self.connection = connection
  69.         self.handler_class = handler_class
  70.         self.no_ack = no_ack
  71.    
  72.     def receiver(self, message):
  73.         logger.debug('LISTENER: message received: %s' % message.body)
  74.         try:
  75.             handler = self.handler_class(self)
  76.             handler.handle(json.loads(message.body))
  77.             if not self.no_ack: message.ack()
  78.             del(handler)
  79.         except Exception as e:
  80.             if not self.no_ack: message.reject()
  81.             logger.error("Unexpected error %s: %s\n%s" % (type(e), e, traceback.format_exc()))
  82.        
  83.     def listen(self, routing_key):
  84.         chann = self.connection.create_queue(routing_key)
  85.         self.routing_key = routing_key
  86.         self.channel = chann
  87.  
  88.         def on_qos():
  89.             logger.debug('LISTENER:  %s' % routing_key)
  90.             chann.consume(routing_key, self.receiver, no_ack=self.no_ack)
  91.  
  92.         if self.no_ack:
  93.             on_qos()
  94.         else:
  95.             chann.qos(0, 100, callback=on_qos)
  96.    
  97.     def close(self, callback=None):
  98.         self.channel.close(callback)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement