Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import inspect
import os

import aio_pika
# RabbitMQ
from aio_pika import DeliveryMode
# RabbitMQ connection settings. Each value is read from the environment
# variable of the same name and falls back to the literal default shown
# when that variable is unset.
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'rabbitmq')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS', 'rabbitmq')
class RestWorker(object):
    """Relay every message from the ``recv_messages`` queue to ``sended_messages``.

    ``run()`` opens the RabbitMQ connection, declares both durable queues and
    starts a background task running ``consume()``. ``close()`` stops that
    task and closes the channel and the connection.

    Args:
        rabbit_host: Host name passed to ``aio_pika.connect_robust``.
        rabbit_user: Login passed to ``aio_pika.connect_robust``.
        rabbit_pass: Password passed to ``aio_pika.connect_robust``.
        loop: Event loop used for the connection and for the background task.
    """

    def __init__(self, rabbit_host, rabbit_user, rabbit_pass, loop) -> None:
        super().__init__()
        self.loop = loop
        self.rabbit_host = rabbit_host
        self.rabbit_user = rabbit_user
        self.rabbit_pass = rabbit_pass
        self.rabbit_conn = None  # type: aio_pika.Connection
        # Initialised here so close() is safe even when run() never created it.
        self.rabbit_channel = None
        # Tasks started by run(); cancelled and awaited in close().
        self.bg_tasks = []

    async def run(self):
        """Connect to RabbitMQ, declare the queues and start consuming.

        Returns once the consumer task has been scheduled; the relaying
        itself happens in the background on ``self.loop``.
        """
        # Use the loop handed to the constructor, not a module-level global,
        # so the class also works when imported from another module.
        self.rabbit_conn = await aio_pika.connect_robust(
            host=self.rabbit_host,
            login=self.rabbit_user,
            password=self.rabbit_pass,
            loop=self.loop,
        )
        self.rabbit_channel = await self.rabbit_conn.channel()

        # Queue the worker reads from.
        recv_queue = await self.rabbit_channel.declare_queue(
            name='recv_messages', durable=True
        )
        # Queue the worker publishes to; declared only so that it exists
        # before the first message is routed to it.
        await self.rabbit_channel.declare_queue(
            name='sended_messages', durable=True
        )

        self.bg_tasks.append(self.loop.create_task(self.consume(recv_queue)))

    async def close(self):
        """Cancel the background tasks and close the channel and connection.

        Safe to call when ``run()`` was never called or failed part-way:
        anything that was not created is skipped.
        """
        pending, self.bg_tasks = self.bg_tasks, []
        for task in pending:
            task.cancel()
        if pending:
            # Wait for the cancellations to be delivered so no task is still
            # using the channel while it is being closed below.
            # return_exceptions=True keeps the tasks' CancelledError (or any
            # other failure) from aborting the shutdown.
            await asyncio.gather(*pending, return_exceptions=True)

        if self.rabbit_channel is not None:
            await self.rabbit_channel.close()
            self.rabbit_channel = None
        if self.rabbit_conn is not None:
            await self.rabbit_conn.close()
            self.rabbit_conn = None

    async def consume(self, recv_queue):
        """Republish each message from ``recv_queue`` and acknowledge it.

        Runs until the task is cancelled. A message is acknowledged only
        after the publish call has been awaited.

        Args:
            recv_queue: Queue object supporting ``async for`` iteration.
        """
        while True:
            # The outer loop restarts iteration if the queue iterator ends.
            async for message in recv_queue:
                await self.rabbit_channel.default_exchange.publish(
                    aio_pika.Message(
                        body=message.body,
                        delivery_mode=DeliveryMode.PERSISTENT,
                        headers=message.headers,
                    ),
                    routing_key='sended_messages',
                )
                # Depending on the aio_pika release, ack() either completes
                # synchronously or hands back an awaitable; await it when it
                # does so the acknowledgement is never silently dropped.
                ack_result = message.ack()
                if inspect.isawaitable(ack_result):
                    await ack_result
if __name__ == "__main__":
    # Script entry point: start the worker, relay messages until Ctrl+C,
    # then shut down.
    #
    # Create the loop explicitly; calling asyncio.get_event_loop() with no
    # running loop is deprecated in current Python releases.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = RestWorker(
        rabbit_host=RABBITMQ_HOST,
        rabbit_user=RABBITMQ_USER,
        rabbit_pass=RABBITMQ_PASS,
        loop=loop,
    )
    try:
        loop.run_until_complete(server.run())
        try:
            # run() only schedules the consumer; keep the loop alive for it.
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop the worker.
            pass
        finally:
            loop.run_until_complete(server.close())
    finally:
        # Close the loop even when start-up failed.
        loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement