Neda/Back/domains/realtime/ws_manager.py
2026-03-06 15:16:41 +03:30

27 lines
811 B
Python

from fastapi import WebSocket
from collections import defaultdict
class ConnectionManager:
    """Track accepted WebSocket connections per group and broadcast to them.

    ``groups`` maps a group id to the set of sockets currently registered
    under it. A group whose last socket is removed is deleted from the
    mapping, so it does not accumulate keys for groups nobody is connected to.
    """

    def __init__(self) -> None:
        # defaultdict lets connect() add to a group without creating it first.
        self.groups: dict[str, set[WebSocket]] = defaultdict(set)

    async def connect(self, group_id: str, websocket: WebSocket) -> None:
        """Accept *websocket* and register it under *group_id*.

        The socket is registered only after ``accept()`` has completed, so a
        failed handshake never leaves an entry behind.
        """
        await websocket.accept()
        self.groups[group_id].add(websocket)

    def disconnect(self, group_id: str, websocket: WebSocket) -> None:
        """Remove *websocket* from *group_id*.

        Unknown group ids and sockets that are not registered are ignored.
        """
        # .get() avoids the defaultdict creating an empty entry on lookup.
        members = self.groups.get(group_id)
        if members is None:
            return
        members.discard(websocket)
        if not members:
            # Drop the now-empty set so abandoned group ids do not pile up.
            del self.groups[group_id]

    async def broadcast(self, group_id: str, message: dict) -> None:
        """Send *message* as JSON to every socket registered under *group_id*.

        A socket whose ``send_json`` raises is removed from the group and the
        broadcast continues with the remaining sockets. Only ``Exception``
        subclasses are treated as a failed socket; ``asyncio.CancelledError``
        and other ``BaseException`` types propagate to the caller.
        """
        members = self.groups.get(group_id)
        if not members:
            return
        # Iterate over a snapshot: the set can change while we await a send.
        for ws in tuple(members):
            try:
                await ws.send_json(message)
            except Exception:
                # Go through disconnect() so the *current* group set is
                # updated (it may have been replaced during an await) and
                # an emptied group is pruned.
                self.disconnect(group_id, ws)
# Module-level ConnectionManager instance, created once when this module is imported.
manager = ConnectionManager()