Advertisement
Guest User

Untitled

a guest
Jul 23rd, 2019
110
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.31 KB | None | 0 0
  1. import threading, asyncio, websockets, time
  2.  
  3. # Global stop flag
  4. stopFlag = False
  5.  
  6. """
  7. DataWorker thread responsible for data generation.
  8. """
  9. class DataWorker (threading.Thread):
  10.  
  11. # Constructor
  12. def __init__(self):
  13. threading.Thread.__init__(self)
  14. self.data = 0
  15. self.lastData = 0
  16. self.inc = 0
  17.  
  18. # Generate data
  19. def run(self):
  20. while not stopFlag:
  21. self.data = self.inc
  22. self.inc += 1
  23. time.sleep(1)
  24.  
  25. # Data getter
  26. def get(self):
  27. if self.lastData is not self.data:
  28. self.lastData = self.data
  29. return self.data
  30.  
  31.  
  32. """
  33. MessagingWorker thread responsible for sending
  34. messages over websockets.
  35. """
  36. class MessagingWorker (threading.Thread):
  37.  
  38. # Constructor
  39. def __init__(self, interval=0.04):
  40. threading.Thread.__init__(self)
  41. self.interval = interval
  42. self.connected = set()
  43.  
  44. # Send data on predefined intervals
  45. def run(self):
  46. while not stopFlag:
  47. data = dataWorker.get()
  48. if data:
  49. self.broadcast('{"DATA": "%s"}' % data)
  50.  
  51. time.sleep(self.interval)
  52.  
  53. # Websockets handler
  54. async def handler(self, websocket, path):
  55. self.connected.add(websocket)
  56. try:
  57. await websocket.recv()
  58. except websockets.exceptions.ConnectionClosed:
  59. pass
  60. finally:
  61. self.connected.remove(websocket)
  62.  
  63. # Broadcast to all clients
  64. def broadcast(self, data):
  65. for websocket in self.connected.copy():
  66. print("Sending data: %s" % data)
  67. coro = websocket.send(data)
  68. future = asyncio.run_coroutine_threadsafe(coro, loop)
  69.  
  70.  
  71. if __name__ == "__main__":
  72.  
  73. print('Data publisher')
  74. dataWorker = DataWorker()
  75. messagingWorker = MessagingWorker()
  76.  
  77. try:
  78. # Create data and messaging threads
  79. dataWorker.start()
  80. messagingWorker.start()
  81.  
  82. # Create server
  83. ws_server = websockets.serve(messagingWorker.handler, '0.0.0.0', 4545)
  84.  
  85. # Create async loop
  86. loop = asyncio.get_event_loop()
  87. loop.run_until_complete(ws_server)
  88. loop.run_forever()
  89.  
  90. except KeyboardInterrupt:
  91. stopFlag = True
  92.  
  93. # Close async loop
  94. loop.stop()
  95. loop.close()
  96. print("Exiting program...")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement