Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class ConnectionManager:
    """Keep track of accepted WebSocket connections and broadcast to them."""

    def __init__(self):
        # Sockets that were accepted via connect() and not yet removed,
        # kept in the order they connected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and include it in subsequent broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop broadcasting to ``websocket``.

        Safe to call more than once: a socket that is not registered is
        silently ignored.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` as a text frame to every registered connection."""
        # Iterate over a snapshot: each await can hand control to other
        # coroutines that add or remove sockets, and removing from a list
        # while iterating it makes the iterator skip the following element.
        for connection in list(self.active_connections):
            await connection.send_text(message)
# Single module-level registry of connections; the WebSocket endpoint registers
# each client with it and broadcasts through it.
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Relay each text message from one client to every connected client.

    The socket is registered with the module-level ``manager``, then every
    received text frame is broadcast as ``"Client #<client_id> says: <text>"``.
    When the client disconnects, the socket is removed from the manager so
    later broadcasts do not try to write to a closed connection.

    Args:
        websocket: The incoming WebSocket connection for this client.
        client_id: Identifier taken from the URL path, used to label messages.
    """
    # Function-scope import so the handler does not depend on the module's
    # import list already naming WebSocketDisconnect.
    from fastapi import WebSocketDisconnect

    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        # The peer closing the socket is the normal way out of the loop.
        pass
    finally:
        # Unregister on every exit path, including unexpected errors.
        if websocket in manager.active_connections:
            manager.active_connections.remove(websocket)
Advertisement
Add Comment
Please, Sign In to add comment