Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # how to await call-and-response over a websocket
- # assuming the remote server preserves the message ID
- from threading import Thread, Event
- import queue
- from time import sleep
# Websocket simulator: two in-memory queues stand in for the socket.
# ws_in carries messages arriving from the simulated server;
# ws_out carries the requests sent to it.
ws_in, ws_out = queue.Queue(), queue.Queue()
- # server simulator
def pubsub():  # server publishes data; I subscribe
    """Simulate the server's publish side.

    Puts eight 'notif' messages on ws_in, one per second, carrying the
    values 101 through 108 in order.
    """
    for payload in range(101, 109):
        sleep(1)  # the pause comes before every notification, including the first
        notification = {'type': 'notif', 'data': payload}
        ws_in.put(notification)
def rpc():  # I call procedure on server; server replies
    """Simulate the server's RPC side.

    Loops forever: takes each request id off ws_out and answers it on its
    own thread, so a slow reply never delays the requests queued behind it.
    """
    def put(request_id):
        # Each reply takes three seconds and echoes the request id back,
        # with the payload being the id plus ten.
        sleep(3)
        response = {'type': 'reply', 'id': request_id, 'data': request_id + 10}
        ws_in.put(response)

    while True:
        request_id = ws_out.get()  # blocks until the client sends something
        worker = Thread(target=put, args=[request_id])
        worker.start()
# Start both halves of the simulated server, each on its own thread.
for _server_task in (pubsub, rpc):
    Thread(target=_server_task).start()

# me
# Pending RPC calls keyed by message id. Each value is a dict holding the
# 'received' Event and, once the reply has arrived, its 'reply' data.
awaiting = dict()
def ws_receiver():
    """Pump incoming messages: hand every item from ws_in to on_message, forever."""
    while True:
        incoming = ws_in.get()  # blocks until the next message arrives
        on_message(incoming)
def on_message(msg):  # ws callback
    """Handle one incoming websocket message.

    msg is a dict with a 'type' key:

    * 'reply' -- matched by msg['id'] against the pending calls in
      ``awaiting``; msg['data'] is stored on that entry and the waiting
      send() call is woken up. A reply nobody is waiting for is reported
      and dropped.
    * 'notif' -- msg['data'] is printed.

    Any other type is ignored.
    """
    kind = msg['type']
    if kind == 'reply':  # handle rpc like this
        # Look the entry up once: send() removes it when it gives up, so a
        # second lookup could fail between storing the reply and signalling.
        pending = awaiting.get(msg['id'])
        if pending is None:
            # Unknown id, or the caller already timed out. Raising here would
            # kill the receiver loop, so report the stray reply and move on.
            print('ws unexpected reply for %s' % (msg['id'],))
            return
        pending['reply'] = msg['data']  # first write the reply
        pending['received'].set()  # then signal that the reply is set
    elif kind == 'notif':  # handle pubsub as usual
        print(msg['data'])


def send(id, *, timeout=5):  # called by whatever wants to send something
    """Send RPC ``id`` and block until its reply arrives or the wait times out.

    Registers an Event under ``id`` in ``awaiting``, puts ``id`` on ws_out,
    then waits for on_message() to store the reply and set the event.

    Args:
        id: Message id, which is also the whole request put on ws_out.
            The name shadows the builtin but is kept for compatibility.
        timeout: Seconds to wait for the reply. Defaults to 5, the value
            that used to be hard-coded.

    Returns:
        The reply's data, or None if no reply arrived in time.
    """
    received = Event()  # a sync primitive like Lock
    entry = {'received': received}  # init state of rpc; 'reply' is set by on_message
    awaiting[id] = entry
    try:
        ws_out.put(id)  # sends rpc
        print('ws send %s' % (id,))
        if received.wait(timeout):  # await received event to be set
            reply = entry['reply']
            print('ws receive %s: %s' % (id, reply))
            return reply  # back to caller
        print('ws timeout on %s' % (id,))
        return None
    finally:
        # Unregister on every path. Without this, each timed-out call left
        # its entry in ``awaiting`` for the life of the process.
        awaiting.pop(id, None)
# Start draining ws_in so replies and notifications reach on_message.
Thread(target=ws_receiver).start()

# test
# Two overlapping RPC calls, each on its own thread: id 3 goes out after
# 2.7 s and id 4 half a second later.
for _pause, _call_id in ((2.7, 3), (0.5, 4)):
    sleep(_pause)
    Thread(target=send, args=[_call_id]).start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement