Advertisement
Guest User

Untitled

a guest
Jul 23rd, 2017
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.49 KB | None | 0 0
  1. import asyncio
  2. from aio_pika import connect, Message
  3.  
  4. async def main(future):
  5. USERNAME = 'myuser'
  6. PASSWORD = 'mypass'
  7. HOST = 'localhost'
  8. PORT = 5672
  9. QUEUE = 'text'
  10. ROUTING_KEY = 'example.text'
  11. message = Message(
  12. bytes('Hello', 'utf-8'),
  13. content_type='text/plain',
  14. headers={'foo': 'bar'}
  15. )
  16.  
  17. connection = await connect("amqp://{}:{}@{}/".format(USERNAME, PASSWORD, HOST), loop=loop)
  18. channel = await connection.channel()
  19. exchange = await channel.declare_exchange('direct', auto_delete=True)
  20. queue = await channel.declare_queue(QUEUE, auto_delete=False)
  21.  
  22. await queue.bind(exchange, ROUTING_KEY)
  23.  
  24. # Receiving message
  25. print("waiting to recieve")
  26. incoming_message = await queue.get() #set as arg to get timeout: timeout=10
  27. print("recieved message")
  28. incoming_message.ack()
  29. print("acknowledged message")
  30. future.set_result(incoming_message.body) #.decode('utf-8')
  31. print("set future")
  32.  
  33. #print("unbind")
  34. #await queue.unbind(exchange, ROUTING_KEY)
  35. ## await queue.delete()
  36. #print("close")
  37. #await connection.close()
  38.  
  39.  
  40. def doit(future):
  41. print(future.result())
  42. loop.stop()
  43.  
  44. loop = asyncio.get_event_loop()
  45. future = asyncio.Future()
  46. asyncio.ensure_future(main(future))
  47. future.add_done_callback(doit)
  48. # loop.run_until_complete(main(loop))
  49.  
  50. try:
  51. loop.run_forever()
  52. finally:
  53. loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement