Advertisement
rharder

Communicate Across Multiple aiohttp Websocket Servers

Feb 15th, 2017
712
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.14 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. """
  3. Illustrates how to have multiple websocket servers running and send
  4. messages to all their various clients at once.
  5.  
  6. In response to stackoverflow question:
  7. http://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
  8.  
  9. Pastebin: http://pastebin.com/xDSACmdV
  10. """
  11. import asyncio
  12. import datetime
  13. import random
  14. import time
  15. import webbrowser
  16.  
  17. import aiohttp
  18. from aiohttp import web
  19.  
  20. __author__ = "Robert Harder"
  21. __email__ = "rob@iharder.net"
  22. __license__ = "Public Domain"
  23.  
  24.  
  25. def main():
  26.     # Create servers
  27.     cap_srv = CapitalizeEchoServer(port=9990)
  28.     rnd_srv = RandomQuoteServer(port=9991)
  29.     tim_srv = TimeOfDayServer(port=9992)
  30.  
  31.     # Queue their start operation
  32.     loop = asyncio.get_event_loop()
  33.     loop.create_task(cap_srv.start())
  34.     loop.create_task(rnd_srv.start())
  35.     loop.create_task(tim_srv.start())
  36.  
  37.     # Open web pages to test them
  38.     webtests = [9990, 9991, 9991, 9992, 9992]
  39.     for port in webtests:
  40.         url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
  41.         webbrowser.open(url)
  42.     print("Be sure to click 'Connect' on the webpages that just opened.")
  43.  
  44.     # Queue a simulated broadcast-to-all message
  45.     def _alert_all(msg):
  46.         print("Sending alert:", msg)
  47.         msg_dict = {"alert": msg}
  48.         cap_srv.broadcast_message(msg_dict)
  49.         rnd_srv.broadcast_message(msg_dict)
  50.         tim_srv.broadcast_message(msg_dict)
  51.  
  52.     loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")
  53.  
  54.     # Run event loop
  55.     loop.run_forever()
  56.  
  57.  
  58. class MyServer:
  59.     def __init__(self, port):
  60.         self.port = port  # type: int
  61.         self.loop = None  # type: asyncio.AbstractEventLoop
  62.         self.app = None  # type: web.Application
  63.         self.srv = None  # type: asyncio.base_events.Server
  64.  
  65.     async def start(self):
  66.         self.loop = asyncio.get_event_loop()
  67.         self.app = web.Application()
  68.         self.app["websockets"] = []  # type: [web.WebSocketResponse]
  69.         self.app.router.add_get("/", self._websocket_handler)
  70.         await self.app.startup()
  71.         handler = self.app.make_handler()
  72.         self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
  73.         print("{} listening on port {}".format(self.__class__.__name__, self.port))
  74.  
  75.     async def close(self):
  76.         assert self.loop is asyncio.get_event_loop()
  77.         self.srv.close()
  78.         await self.srv.wait_closed()
  79.  
  80.         for ws in self.app["websockets"]:  # type: web.WebSocketResponse
  81.             await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
  82.  
  83.         await self.app.shutdown()
  84.         await self.app.cleanup()
  85.  
  86.     async def _websocket_handler(self, request):
  87.         assert self.loop is asyncio.get_event_loop()
  88.         ws = web.WebSocketResponse()
  89.         await ws.prepare(request)
  90.         self.app["websockets"].append(ws)
  91.  
  92.         await self.do_websocket(ws)
  93.  
  94.         self.app["websockets"].remove(ws)
  95.         return ws
  96.  
  97.     async def do_websocket(self, ws: web.WebSocketResponse):
  98.         async for ws_msg in ws:  # type: aiohttp.WSMessage
  99.             pass
  100.  
  101.     def broadcast_message(self, msg: dict):
  102.         for ws in self.app["websockets"]:  # type: web.WebSocketResponse
  103.             ws.send_json(msg)
  104.  
  105.  
  106. class CapitalizeEchoServer(MyServer):
  107.     """ Echoes back to client whatever they sent, but capitalized. """
  108.  
  109.     async def do_websocket(self, ws: web.WebSocketResponse):
  110.         async for ws_msg in ws:  # type: aiohttp.WSMessage
  111.             cap = ws_msg.data.upper()
  112.             ws.send_str(cap)
  113.  
  114.  
  115. class RandomQuoteServer(MyServer):
  116.     """ Sends a random quote to the client every so many seconds. """
  117.     QUOTES = ["Wherever you go, there you are.",
  118.               "80% of all statistics are made up.",
  119.               "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]
  120.  
  121.     def __init__(self, interval: float = 10, *kargs, **kwargs):
  122.         super().__init__(*kargs, **kwargs)
  123.         self.interval = interval
  124.  
  125.     async def do_websocket(self, ws: web.WebSocketResponse):
  126.         async def _regular_interval():
  127.             while self.srv.sockets is not None:
  128.                 quote = random.choice(RandomQuoteServer.QUOTES)
  129.                 ws.send_json({"quote": quote})
  130.                 await asyncio.sleep(self.interval)
  131.  
  132.         self.loop.create_task(_regular_interval())
  133.  
  134.         await super().do_websocket(ws)  # leave client connected here indefinitely
  135.  
  136.  
  137. class TimeOfDayServer(MyServer):
  138.     """ Sends a message to all clients simultaneously about time of day. """
  139.  
  140.     async def start(self):
  141.         await super().start()
  142.  
  143.         async def _regular_interval():
  144.             while self.srv.sockets is not None:
  145.                 if int(time.time()) % 10 == 0:  # Only on the 10 second mark
  146.                     timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
  147.                     self.broadcast_message({"timestamp": timestamp})
  148.                 await asyncio.sleep(1)
  149.  
  150.         self.loop.create_task(_regular_interval())
  151.  
  152.  
  153. if __name__ == "__main__":
  154.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement