Guest User

Untitled

a guest
Jul 18th, 2022
290
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 0.69 KB | None | 0 0
  1. class ConnectionManager:
  2.     def __init__(self):
  3.         self.active_connections: List[WebSocket] = []
  4.    
  5.     async def connect(self, websocket: WebSocket):
  6.         await websocket.accept()
  7.         self.active_connections.append(websocket)
  8.    
  9.     async def broadcast(self, message: str):
  10.         for connection in self.active_connections:
  11.             await connection.send_text(message)
  12.  
  13.  
  14. manager = ConnectionManager()
  15. @app.websocket("/ws/{client_id}")
  16. async def websocket_endpoint(websocket: WebSocket, client_id: int):
  17.     await manager.connect(websocket)
  18.         try:
  19.             while True:
  20.                     data = await websocket.receive_text()
  21.                     await manager.broadcast(f"Client #{client_id} says: {data}")
Advertisement
Add Comment
Please, Sign In to add comment