Advertisement
Guest User

Untitled

a guest
Jun 25th, 2019
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.77 KB | None | 0 0
  1. import asyncio
  2. from nats.aio.client import Client as NATS
  3. from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
  4.  
  5. async def run(loop):
  6. nc = NATS()
  7.  
  8. await nc.connect("127.0.0.1:4222", loop=loop)
  9.  
  10. async def message_handler(msg):
  11. subject = msg.subject
  12. reply = msg.reply
  13. data = msg.data.decode()
  14. print("Received a message on '{subject} {reply}': {data}".format(
  15. subject=subject, reply=reply, data=data))
  16.  
  17. # Simple publisher and async subscriber via coroutine.
  18. sid = await nc.subscribe("foo", cb=message_handler)
  19.  
  20. # Stop receiving after 2 messages.
  21. await nc.auto_unsubscribe(sid, 2)
  22. await nc.publish("foo", b'Hello')
  23. await nc.publish("foo", b'World')
  24. await nc.publish("foo", b'!!!!!')
  25.  
  26. async def help_request(msg):
  27. subject = msg.subject
  28. reply = msg.reply
  29. data = msg.data.decode()
  30. print("Received a message on '{subject} {reply}': {data}".format(
  31. subject=subject, reply=reply, data=data))
  32. await nc.publish(reply, b'I can help')
  33.  
  34. # Use queue named 'workers' for distributing requests
  35. # among subscribers.
  36. sid = await nc.subscribe("help", "workers", help_request)
  37.  
  38. # Send a request and expect a single response
  39. # and trigger timeout if not faster than 200 ms.
  40. try:
  41. response = await nc.request("help", b'help me', timeout=1)
  42. print("Received response: {message}".format(
  43. message=response.data.decode()))
  44. except ErrTimeout:
  45. print("Request timed out")
  46.  
  47. # Remove interest in subscription.
  48. await nc.unsubscribe(sid)
  49.  
  50. # Terminate connection to NATS.
  51. await nc.close()
  52.  
  53. if __name__ == '__main__':
  54. loop = asyncio.get_event_loop()
  55. loop.run_until_complete(run(loop))
  56. loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement