Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
- import asyncio
- import serial.aio
- serial_transport = None
- class Output(asyncio.Protocol):
- def connection_made(self, transport):
- global serial_transport
- serial_transport = transport
- self.transport = transport
- print('port opened', transport)
- transport.serial.rts = False
- def data_received(self, data):
- print('data received', repr(data))
- def connection_lost(self, exc):
- print('port closed')
- asyncio.get_event_loop().stop()
- class MyServerProtocol(WebSocketServerProtocol):
- def onConnect(self, request):
- print("Client connecting: {0}".format(request.peer))
- def onOpen(self):
- print("WebSocket connection open.")
- def onMessage(self, payload, isBinary):
- if isBinary:
- print("Binary message received: {0} bytes".format(len(payload)))
- if serial_transport is not None:
- serial_transport.write(payload)
- else:
- print("Text message received: {0}".format(payload.decode('utf8')))
- def onClose(self, wasClean, code, reason):
- print("WebSocket connection closed: {0}".format(reason))
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- # Web Socket Server
- factory = WebSocketServerFactory(u"ws://127.0.0.1:9000")
- factory.protocol = MyServerProtocol
- coro = loop.create_server(factory, '0.0.0.0', 9000)
- server = loop.run_until_complete(coro)
- # Serial Server
- coro = serial.aio.create_serial_connection(loop, Output, '/dev/cu.usbmodem401111', baudrate=115200)
- loop.run_until_complete(coro)
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
- finally:
- server.close()
- loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement