Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import asyncio
- import colored
- import serial
- from serial.tools import list_ports
- import sys
- import uuid
- import websockets
# Command-line interface. Note that the arguments are parsed at import time
# and the result is shared with the rest of the module through ``args``.
parser = argparse.ArgumentParser(
    description='Listen for finger game players and control hardware.',
)

# Host and port of the coordinating server; client() prefixes it with wss://.
parser.add_argument(
    '--server',
    action='store',
    dest='server_address',
    default='localhost:9000',
    help='The URL and port of the coordinating server to connect to.',
)

# Opt-in flag; when set, client() reports commands it does not recognise.
parser.add_argument(
    '--debug',
    action='store_true',
    dest='debug',
    default=False,
    help='Whether to print debugging information or not.',
)

args = parser.parse_args()

# Messages from the server that client() acts on; anything else is ignored
# (and only reported when --debug is given).
VALID_COMMANDS = [
    'BEGIN_TOUCHING_CUP',
    'STOPPED_TOUCHING_CUP',
    'HEALTH_CHECK',
]

# Baud rate used when opening the serial connection to the arduino.
BAUD_RATE = 9600
def log(message, color):
    """Print ``message`` to stdout on a colored background.

    ``color`` is one of the labels 'danger', 'good' or 'info'. Each label
    selects a numeric color code that is handed to ``colored.bg``; any other
    label falls back to code 9.
    """
    palette = {
        'danger': 13,
        'good': 40,
        'info': 6,
    }
    try:
        background_code = palette[color]
    except KeyError:
        # Unknown label: use the fallback color rather than failing.
        background_code = 9

    background = colored.bg(background_code)
    print(colored.stylize(message, background))
def find_device():
    """Return the first serial port that looks like an arduino, or None.

    A port is treated as an arduino when the text 'arduino' appears
    (case-insensitively) in its description or in its manufacturer string.

    Returns:
        The first matching entry from ``list_ports.comports()``, or ``None``
        when nothing matches (a 'danger' message is logged in that case).
        When several ports match, a 'danger' message listing them is logged
        and the first one is still returned.
    """
    # Either attribute may be missing for a given port, so both are
    # normalised to '' before the substring test instead of only guarding
    # the manufacturer.
    candidates = [
        port
        for port in list_ports.comports()
        if 'arduino' in (port.description or '').lower()
        or 'arduino' in (port.manufacturer or '').lower()
    ]

    if not candidates:
        log(
            ' - no arduino like devices found.',
            'danger',
        )
        return None

    if len(candidates) > 1:
        # ``product`` can be None for ports that do not report a product
        # string; joining None would raise TypeError, so fall back to the
        # port's device name for the listing.
        names = ','.join(str(port.product or port.device) for port in candidates)
        log(
            ' - more than one arduino found: {}'.format(names),
            'danger',
        )

    return candidates[0]
async def client(identifier, arduino):
    """Register with the coordinating server and relay its commands.

    Opens a websocket to ``wss://<args.server_address>``, announces itself
    with ``REGISTER:<identifier>`` and then loops forever over incoming
    messages:

    * messages not listed in ``VALID_COMMANDS`` are dropped (and logged when
      ``args.debug`` is set);
    * ``HEALTH_CHECK`` is answered with ``HEALTHY:<identifier>``;
    * every other valid command is logged and written to ``arduino`` as a
      UTF-8 encoded, newline-terminated line.

    The loop has no exit condition of its own; it ends only when an awaited
    call or the serial write raises.
    """
    endpoint = 'wss://{}'.format(args.server_address)

    async with websockets.connect(endpoint) as connection:
        await connection.send('REGISTER:{}'.format(identifier))

        while True:
            message = await connection.recv()

            # Guard: never forward something the hardware does not expect.
            if message not in VALID_COMMANDS:
                if args.debug:
                    log('Unknown command: {}'.format(message), 'danger')
                continue

            # Health checks are answered here and never reach the arduino.
            if message == 'HEALTH_CHECK':
                await connection.send('HEALTHY:{}'.format(identifier))
                continue

            log(message, 'good')
            payload = '{}\n'.format(message).encode('utf-8')
            arduino.write(payload)
if __name__ == '__main__':
    # Script entry point: locate the arduino, open its serial port and run
    # the websocket client until it fails or the user interrupts it.
    identifier = str(uuid.uuid4())

    port_info = find_device()
    if port_info is None:
        # find_device() has already logged why nothing was found.
        sys.exit(1)

    connection = None
    try:
        connection = serial.Serial(port_info.device, BAUD_RATE)
        log(' - arduino found and connected', 'info')
        log(' - connecting client to server as: {}'.format(identifier), 'info')
        # asyncio.run creates and tears down the event loop itself, replacing
        # the deprecated get_event_loop().run_until_complete() pattern.
        asyncio.run(client(identifier, connection))
    except websockets.exceptions.ConnectionClosed:
        log(' - connection closed by remote', 'danger')
    except serial.serialutil.SerialException:
        # Must come before the OSError handler: SerialException derives from
        # IOError/OSError, so the broader handler would otherwise shadow it
        # and misreport serial failures as server connection errors.
        log(' - cannot connect to arduino, it is in use', 'danger')
    except OSError as error:
        log(' - cannot connect to remote server: {}'.format(error), 'danger')
    except KeyboardInterrupt:
        log(' - disconnecting', 'info')
    finally:
        # Release the serial port on every exit path so it is not left held.
        if connection is not None:
            connection.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement