import uuid import time import json from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, Header, status, HTTPException from livekit import api from core.config import settings from core.websocket import get_ws_current_user from db.session import AsyncSessionLocal from domains.groups.repo import get_group_member from domains.realtime.ws_manager import manager from domains.realtime.presence_service import ( user_join_group, user_leave_group, list_online_users ) from domains.realtime.speaker_service import ( request_speak, stop_speaking, current_speaker ) from integrations.livekit.token_service import generate_join_token router = APIRouter() @router.post("/lk-webhook") async def livekit_webhook( request: Request, authorization: str = Header(None) ): """ LiveKit Webhook to sync presence and handle participant events. """ receiver = api.WebhookReceiver( api.TokenVerifier( settings.LIVEKIT_API_KEY, settings.LIVEKIT_API_SECRET ) ) try: body = await request.body() event = receiver.receive(body.decode("utf-8"), authorization) except Exception: raise HTTPException(status_code=401, detail="Invalid webhook signature") room_name = event.room.name identity = event.participant.identity if event.event == "participant_joined": await user_join_group(room_name, identity) await manager.broadcast(room_name, { "type": "presence", "users": await list_online_users(room_name, use_livekit=True) }) elif event.event == "participant_left": await user_leave_group(room_name, identity) await manager.broadcast(room_name, { "type": "presence", "users": await list_online_users(room_name, use_livekit=True) }) return {"status": "ok"} @router.websocket("/ws/groups/{group_id}") async def group_ws(websocket: WebSocket, group_id: str): user = await get_ws_current_user(websocket) try: group_id_uuid = uuid.UUID(group_id) except ValueError: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return # check if user is member of group async with AsyncSessionLocal() as db: membership = await get_group_member(db, group_id_uuid, user.id) if not membership: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return user_id = str(user.id) # connect websocket await manager.connect(group_id, websocket) # add presence await user_join_group(group_id, user_id) # give listener token listener_token = generate_join_token( user_id=user_id, group_id=group_id, can_publish=False ) await websocket.send_json({ "type": "livekit_token", "token": listener_token }) # broadcast updated presence await manager.broadcast( group_id, { "type": "presence", "users": await list_online_users(group_id) } ) last_action_time = 0.0 try: while True: text_data = await websocket.receive_text() if len(text_data) > 20000: await websocket.close(code=status.WS_1009_MESSAGE_TOO_BIG) return data = json.loads(text_data) event = data.get("type") # anti spam current_time = time.time() if current_time - last_action_time < 0.1: continue last_action_time = current_time # user wants to speak if event == "request_speak": success = await request_speak(group_id, user_id) if success: # Broadcast globally that someone is speaking await manager.broadcast( group_id, { "type": "speaker", "user_id": user_id } ) # Signal the specific client to unmute await websocket.send_json({ "type": "speaker_granted" }) else: # someone else is speaking speaker = await current_speaker(group_id) await websocket.send_json( { "type": "speaker_busy", "speaker": speaker } ) # user stops speaking elif event == "stop_speak": released = await stop_speaking(group_id, user_id) if released: await manager.broadcast( group_id, { "type": "speaker_released" } ) elif event == "keep_alive_speaker": from db.redis import extend_speaker_lock await extend_speaker_lock(group_id, user_id) except WebSocketDisconnect: manager.disconnect(group_id, websocket) await user_leave_group(group_id, user_id) await stop_speaking(group_id, user_id) await manager.broadcast( group_id, { "type": "presence", "users": await list_online_users(group_id) } )