Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class Connection:
    """Wrapper around a ``stormed.Connection`` bound to a single exchange.

    Provides helpers to declare and bind queues, publish JSON messages,
    purge queues and create :class:`Listener` objects. All queue helpers
    use the exchange name taken from the configuration.
    """

    def __init__(self, config):
        """Build the underlying ``stormed.Connection`` from *config*.

        *config* must support ``get(key, default)``. Keys read here:
        ``exchange`` (default ``'thot'``), ``host`` (``'localhost'``),
        ``port`` (``5672``), ``username`` and ``password`` (both
        ``'guest'``). Connecting is a separate step, see :meth:`connect`.
        """
        # Channel used by publish(); opened on first use and then reused.
        self.publish_channel = None
        self.exchange = config.get('exchange', 'thot')
        self.connection = stormed.Connection(
            host=config.get('host', 'localhost'),
            port=config.get('port', 5672),
            username=config.get('username', 'guest'),
            password=config.get('password', 'guest'))
        self.connection.on_error = self.error_handler

    def connect(self, callback=None):
        """Start the connection and call *callback* once it is established.

        *callback* takes no arguments. It is optional, matching the
        behaviour of :meth:`close` and :meth:`purge`.
        """
        def on_connect():
            logger.debug('QUEUE: connection established')
            if callback is not None:
                callback()

        self.connection.connect(on_connect)

    def error_handler(self, e):
        """Log a connection error; *e* must expose ``reply_code`` and ``reply_text``."""
        logger.error("Got Connection error %s: %s", e.reply_code, e.reply_text)

    def create_queue(self, routing_key):
        """Open a channel, declare the exchange and a queue, and bind them.

        The queue is named after *routing_key* and bound to the exchange
        with that same key. Both exchange and queue are declared durable.

        Returns the newly opened channel.
        """
        ch = self.connection.channel()
        ch.exchange_declare(exchange=self.exchange, type='direct', durable=True)
        ch.queue_declare(queue=routing_key, durable=True,
                         auto_delete=False, exclusive=False)
        ch.queue_bind(exchange=self.exchange, queue=routing_key,
                      routing_key=routing_key)
        return ch

    def create_listener(self, handler_class, no_ack=True):
        """Return a :class:`Listener` bound to this connection."""
        return Listener(self, handler_class, no_ack)

    def publish(self, routing_key, data):
        """Serialize *data* to JSON and publish it under *routing_key*.

        The message is sent to the configured exchange through a channel
        that is opened lazily on the first call and reused afterwards.
        """
        # Identity check: ``== None`` could be fooled by a custom __eq__.
        if self.publish_channel is None:
            self.publish_channel = self.connection.channel()
        # Lazy logger arguments avoid formatting when DEBUG is disabled.
        logger.debug('QUEUE: publish message in %s: %s', routing_key, data)
        msg = stormed.Message(json.dumps(data))
        self.publish_channel.publish(msg,
                                     exchange=self.exchange,
                                     routing_key=routing_key)

    def purge(self, routing_key, callback=None):
        """Purge the queue named *routing_key*, then close the channel used.

        *callback* (optional) is handed to ``channel.close`` once the purge
        has been acknowledged.
        """
        def on_purge(_result):
            # The purge reply is not inspected; only completion matters here.
            logger.debug('QUEUE: purged %s', routing_key)
            channel.close(callback)

        channel = self.create_queue(routing_key)
        channel.queue_purge(routing_key, on_purge)

    def close(self, callback=None):
        """Close the connection and call *callback* (if given) when done."""
        def on_close():
            logger.debug('QUEUE: connection closed')
            if callback is not None:
                callback()

        self.connection.close(on_close)
class MessageHandler:
    """Base class for message handlers.

    Subclasses override :meth:`initialize` for setup and :meth:`handle`
    to process a decoded message. Both do nothing by default.
    """

    def __init__(self, channel):
        """Store *channel* on the instance, then run :meth:`initialize`."""
        self.channel = channel
        self.initialize()

    def initialize(self):
        """Setup hook called at the end of ``__init__``; no-op here."""

    def handle(self, data):
        """Process *data*; no-op here."""
class Listener:
    """Consume messages from a queue and dispatch them to a handler class.

    A fresh ``handler_class`` instance is created for every received
    message. When ``no_ack`` is false, messages are acknowledged after a
    successful ``handle`` call and rejected when an exception is raised.
    """

    def __init__(self, connection, handler_class, no_ack=True):
        """Remember the connection wrapper, handler class and ack mode.

        *connection* must provide ``create_queue(routing_key)``.
        *handler_class* is called with this listener as its only argument.
        """
        self.connection = connection
        self.handler_class = handler_class
        self.no_ack = no_ack
        # Set by listen(); defined up front so close() is safe before that.
        self.routing_key = None
        self.channel = None

    def receiver(self, message):
        """Decode the JSON body of *message* and hand it to a new handler.

        Any exception raised while handling is logged with its traceback
        and not propagated to the caller.
        """
        logger.debug('LISTENER: message received: %s', message.body)
        try:
            handler = self.handler_class(self)
            handler.handle(json.loads(message.body))
            if not self.no_ack:
                message.ack()
        except Exception as e:
            # Log first so the traceback is not lost if reject() itself fails.
            logger.error("Unexpected error %s: %s\n%s",
                         type(e), e, traceback.format_exc())
            if not self.no_ack:
                message.reject()

    def listen(self, routing_key):
        """Declare the queue for *routing_key* and start consuming from it.

        In ack mode, ``qos(0, 100, ...)`` is requested on the channel first
        and consumption starts from its callback; otherwise consumption
        starts immediately.
        """
        chann = self.connection.create_queue(routing_key)
        self.routing_key = routing_key
        self.channel = chann

        def on_qos():
            logger.debug('LISTENER: %s', routing_key)
            chann.consume(routing_key, self.receiver, no_ack=self.no_ack)

        if self.no_ack:
            on_qos()
        else:
            chann.qos(0, 100, callback=on_qos)

    def close(self, callback=None):
        """Close the listening channel, passing *callback* to ``channel.close``.

        If :meth:`listen` was never called there is no channel to close;
        *callback* (if given) is then invoked directly.
        """
        if self.channel is None:
            if callback is not None:
                callback()
            return
        self.channel.close(callback)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement