Advertisement
Pyorot

Websocket call-and-response

Oct 20th, 2018
212
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.99 KB | None | 0 0
  1. # how to await call-and-response over a websocket
  2. # assuming the remote server preserves the message ID
  3.  
  4. from threading import Thread, Event
  5. import queue
  6. from time import sleep
  7.  
  8. # websocket simulator
  9.  
  10. ws_in = queue.Queue()
  11. ws_out = queue.Queue()
  12.  
  13. # server simulator
  14.  
  15. def pubsub():   # server publishes data; I subscribe
  16.     for n in range(8):
  17.         sleep(1)
  18.         ws_in.put({'type': 'notif', 'data': n+101})
  19. def rpc():      # I call procedure on server; server replies
  20.     def put(id):
  21.         sleep(3)
  22.         ws_in.put({'type': 'reply', 'id': id, 'data': id+10})
  23.     while True:
  24.         Thread(target=put, args=[ws_out.get()]).start()
  25. Thread(target=pubsub).start()
  26. Thread(target=rpc).start()
  27.  
  28. # me
  29.  
  30. awaiting = {}
  31.  
  32. def ws_receiver():
  33.     while True:
  34.         on_message(ws_in.get())
  35.  
  36. def on_message(msg):    # ws callback
  37.     if msg['type'] == 'reply':      # handle rpc like this
  38.         awaiting[msg['id']]['reply'] = msg['data']  # first writes reply
  39.         awaiting[msg['id']]['received'].set()       # then signals that reply is set
  40.     elif msg['type'] == 'notif':    # handle pubsub as usual
  41.         print(msg['data'])
  42.  
  43. def send(id):           # called by whatever wants to send something
  44.     received = Event()                      # a sync primitive like Lock
  45.     awaiting[id] = {'received': received}   # init state of rpc; 'reply' field to be set
  46.     ws_out.put(id)                          # sends rpc
  47.     print('ws send %s' % (id))              # awaiting[id] set by ws callback
  48.     if received.wait(5):                    # await received event to be set
  49.         reply = awaiting.pop(id)['reply']   # get reply
  50.         print('ws receive %s: %s' % (id, reply))
  51.         return reply                        # back to caller
  52.     else:                                   # timeout
  53.         print('ws timeout on %s' % (id))
  54.  
  55. Thread(target=ws_receiver).start()
  56.  
  57. # test
  58.  
  59. sleep(2.7)
  60. Thread(target=send, args=[3]).start()
  61. sleep(0.5)
  62. Thread(target=send, args=[4]).start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement