Advertisement
Guest User

Untitled

a guest
May 24th, 2017
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.84 KB | None | 0 0
  1. import pickle
  2. import uuid
  3. from threading import Thread
  4. import amqplib.client_0_8 as amqp
  5.  
  6. EXCHANGE = "zamqp.broadcast.fanout"
  7.  
  8. client_uuid = str(uuid.uuid1())
  9.  
  10. class AMQPProps(object):
  11.    
  12.     def __init__(self,
  13.                  queue,
  14.                  host='localhost',
  15.                  user='guest',
  16.                  password='guest',
  17.                  ssl=False):
  18.         self.queue = queue
  19.         self.host = host
  20.         self.user = user
  21.         self.password = password
  22.         self.ssl = ssl
  23.  
  24. class AMQPConnection(object):
  25.  
  26.     def __init__(self, mode, props):
  27.         self.mode = mode
  28.         self.props = props
  29.         self.channel = None
  30.         self.connect()
  31.    
  32.     def connect(self):
  33.         read = self.mode == 'r'
  34.         write = self.mode == 'w'
  35.         props = self.props
  36.         conn = amqp.Connection(props.host,
  37.                                userid=props.user,
  38.                                password=props.password,
  39.                                ssl=props.ssl)
  40.         ch = conn.channel()
  41.         ch.access_request('/data', active=True, read=read, write=write)
  42.         ch.exchange_declare(EXCHANGE, 'fanout',
  43.                             durable=False, auto_delete=False)
  44.         if read:
  45.             qname, n_msgs, n_consumers = ch.queue_declare(
  46.                 props.queue, durable=False, exclusive=True, auto_delete=True)
  47.             ch.queue_bind(props.queue, EXCHANGE, props.queue)
  48.         self.channel = ch
  49.    
  50.     def close(self):
  51.         self.channel.close()
  52.         self.channel = None
  53.  
  54. class AMQPProducer(object):
  55.    
  56.     def __init__(self, props):
  57.         self.connection = AMQPConnection('w', props)
  58.  
  59.     def __call__(self, message):
  60.         channel = self.connection.channel
  61.         message = amqp.Message(pickle.dumps(message))
  62.         channel.basic_publish(message, EXCHANGE, '')
  63.  
  64. class AMQPConsumer(object):
  65.    
  66.     def __init__(self, props, callback):
  67.         queue = '%s_%s' % (props.queue, client_uuid)
  68.         props = AMQPProps(queue, host=props.host, user=props.user,
  69.                           password=props.password, ssl=props.ssl)
  70.         self.connection = AMQPConnection('w', props)
  71.         self.callback = callback
  72.    
  73.     def perform(self, message):
  74.         message = pickle.loads(message.body)
  75.         self.callback(message)
  76.  
  77.     def run(self):
  78.         channel = self.conection.channel
  79.         props = self.connection.props
  80.         channel.basic_consume(props.queue, callback=self.perform, no_ack=True)
  81.         while channel.callbacks:
  82.             channel.wait()
  83.  
  84.     def close(self):
  85.         self.connection.close()
  86.  
  87. class AMQPThread(Thread):
  88.    
  89.     def __init__(self, props, callback):
  90.         self.consumer = AMQPConsumer(props, callback)
  91.    
  92.     def run(self):
  93.         self.consumer.run()
  94.    
  95.     def close(self):
  96.         self.consumer.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement