Advertisement
Guest User

Untitled

a guest
Apr 3rd, 2019
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.45 KB | None | 0 0
import asyncio
import os

import aio_pika
from aio_pika import DeliveryMode


# RabbitMQ
  9.  
  10. RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
  11. RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'rabbitmq')
  12. RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS', 'rabbitmq')
  13.  
  14.  
  15. class RestWorker(object):
  16.  
  17.     def __init__(self, rabbit_host, rabbit_user, rabbit_pass, loop) -> None:
  18.         super().__init__()
  19.         self.loop = loop
  20.         self.rabbit_host = rabbit_host
  21.         self.rabbit_user = rabbit_user
  22.         self.rabbit_pass = rabbit_pass
  23.         self.rabbit_conn = None  # type: aio_pika.Connection
  24.         self.bg_tasks = []
  25.  
  26.     async def run(self):
  27.         # Create RabbitMQ connection
  28.         self.rabbit_conn = await aio_pika.connect_robust(
  29.             host=self.rabbit_host, login=self.rabbit_user, password=self.rabbit_pass, loop=loop
  30.         )
  31.         self.rabbit_channel = await self.rabbit_conn.channel()
  32.  
  33.         # Create received messages queue
  34.         recv_queue = await self.rabbit_channel.declare_queue(name=f'recv_messages', durable=True)
  35.  
  36.         # Create sent messages queue
  37.         sended_queue = await self.rabbit_channel.declare_queue(name=f'sended_messages', durable=True)
  38.  
  39.         self.bg_tasks.append(
  40.             self.loop.create_task(
  41.                 self.consume(recv_queue)
  42.             )
  43.         )
  44.  
  45.     async def close(self):
  46.         # Killing background tasks
  47.         for task in self.bg_tasks:
  48.             task.cancel()
  49.  
  50.         # Closing RabbitMQ connection
  51.         await self.rabbit_channel.close()
  52.         await self.rabbit_conn.close()
  53.  
  54.     def consume(self, recv_queue):
  55.         while True:
  56.             async for message in recv_queue:
  57.                 await self.rabbit_channel.default_exchange.publish(
  58.                     aio_pika.Message(
  59.                         body=message.body,
  60.                         delivery_mode=DeliveryMode.PERSISTENT,
  61.                         headers=message.headers
  62.                     ), routing_key=f"sended_messages",
  63.                 )
  64.                 message.ack()
  65.    
  66.    
  67. if __name__ == "__main__":
  68.     loop = asyncio.get_event_loop()
  69.  
  70.     server = RestWorker(
  71.         rabbit_host=RABBITMQ_HOST, rabbit_user=RABBITMQ_USER, rabbit_pass=RABBITMQ_PASS, loop=loop
  72.     )
  73.  
  74.     loop.run_until_complete(server.run())
  75.  
  76.     try:
  77.         loop.run_forever()
  78.     except KeyboardInterrupt:
  79.         pass
  80.  
  81.     loop.run_until_complete(server.close())
  82.     loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement