Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import aiohttp
- import asyncio
- import logging
- from gpiozero import Button
# Route INFO-and-above records through the root logger.
logging.basicConfig(level=logging.INFO)

# One physical buzzer button per player, identified by GPIO pin number.
# NOTE(review): pins 17 and 18 must match the actual wiring -- confirm.
button1, button2 = Button(17), Button(18)
async def keep_alive(ws, interval=20):
    """Send a WebSocket ping every *interval* seconds until *ws* closes.

    Args:
        ws: WebSocket-like object exposing a ``closed`` attribute and an
            awaitable ``ping()`` method.
        interval: Seconds to wait before each ping. Defaults to 20.

    Returns:
        None. The coroutine finishes once ``ws.closed`` is true or a ping
        fails with a connection error; it does not raise in either case.
    """
    while not ws.closed:
        await asyncio.sleep(interval)
        # The socket may have closed while we were sleeping.
        if ws.closed:
            break
        try:
            await ws.ping()
        except ConnectionError as e:
            # The connection went away between the check and the write;
            # stop quietly instead of dying with an unretrieved exception.
            logging.warning("Keep-alive ping failed: %s", e)
            break
        logging.info("Sent keep-alive ping")
async def buzzer(ws):
    """Poll the buzzer buttons and report presses over *ws*.

    Each loop iteration checks ``button1`` then ``button2``. For the first
    one found pressed, a JSON message ``{'id': 'buzzer', 'data': <player>}``
    is sent, followed by a one-second pause. When neither is pressed the
    loop waits 0.1 s before polling again.

    Args:
        ws: WebSocket-like object exposing ``closed`` and an awaitable
            ``send_json()``.

    Returns:
        None. The loop ends when ``ws.closed`` is true or when any
        exception is raised while polling or sending (the error is logged).
    """
    # Checked in order, so button1 wins if both are held at the same time.
    buzzers = (
        (button1, 'button1', 'player1'),
        (button2, 'button2', 'player2'),
    )
    while True:
        try:
            # Checked on every pass so the task stops once the socket is
            # gone, rather than only noticing on the next button press.
            if ws.closed:
                logging.error('WebSocket is closed')
                break
            for button, label, data in buzzers:
                if button.is_pressed:
                    logging.info('%s is pressed', label)
                    await ws.send_json({'id': 'buzzer', 'data': data})
                    # Pause after a press; a held button therefore sends
                    # at most one message per second.
                    await asyncio.sleep(1)
                    break
            else:
                # Nothing pressed: short idle delay between polls.
                await asyncio.sleep(0.1)
        except Exception as e:
            logging.error(f"Error in buzzer function: {e}")
            break
async def main():
    """Connect to the WebSocket server and run the buzzer client.

    Opens a client session, connects to ``ws://rasp-wanexa:8001``, starts
    the ``keep_alive`` and ``buzzer`` background tasks, and logs every
    incoming message until the socket closes. The background tasks are
    cancelled and awaited before the connection context is left.

    Returns:
        None. ``aiohttp.ClientError`` and any other ``Exception`` raised
        while connected are logged rather than propagated.
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.ws_connect('ws://rasp-wanexa:8001') as ws:
                logging.info('Connected to WebSocket')
                # Keep strong references: the event loop only holds weak
                # references to tasks, so an unreferenced task may be
                # garbage-collected before it finishes.
                tasks = [
                    asyncio.create_task(keep_alive(ws)),
                    asyncio.create_task(buzzer(ws)),
                ]
                try:
                    async for msg in ws:
                        logging.info(f"Received message: {msg.data}")
                finally:
                    # Stop the helpers together with the connection and
                    # collect their results so that no exception is left
                    # unretrieved.
                    for task in tasks:
                        task.cancel()
                    await asyncio.gather(*tasks, return_exceptions=True)
        except aiohttp.ClientError as e:
            logging.error(f"WebSocket connection error: {e}")
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
# Script entry point: run the client until it finishes or is interrupted.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so without this
        # clause Ctrl-C would end the script with a traceback.
        logging.info("Interrupted, shutting down")
    except Exception as e:
        logging.error(f"Error running main: {e}")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement