183 lines
5.4 KiB
Python
183 lines
5.4 KiB
Python
import uuid
|
|
import time
|
|
import json
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, Header, status, HTTPException
|
|
from livekit import api
|
|
from core.config import settings
|
|
from core.websocket import get_ws_current_user
|
|
from db.session import AsyncSessionLocal
|
|
from domains.groups.repo import get_group_member
|
|
from domains.realtime.ws_manager import manager
|
|
from domains.realtime.presence_service import (
|
|
user_join_group,
|
|
user_leave_group,
|
|
list_online_users
|
|
)
|
|
from domains.realtime.speaker_service import (
|
|
request_speak,
|
|
stop_speaking,
|
|
current_speaker
|
|
)
|
|
from integrations.livekit.token_service import generate_join_token
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/lk-webhook")
|
|
async def livekit_webhook(
|
|
request: Request,
|
|
authorization: str = Header(None)
|
|
):
|
|
"""
|
|
LiveKit Webhook to sync presence and handle participant events.
|
|
"""
|
|
receiver = api.WebhookReceiver(
|
|
api.TokenVerifier(
|
|
settings.LIVEKIT_API_KEY,
|
|
settings.LIVEKIT_API_SECRET
|
|
)
|
|
)
|
|
|
|
try:
|
|
body = await request.body()
|
|
event = receiver.receive(body.decode("utf-8"), authorization)
|
|
except Exception:
|
|
raise HTTPException(status_code=401, detail="Invalid webhook signature")
|
|
|
|
room_name = event.room.name
|
|
identity = event.participant.identity
|
|
|
|
if event.event == "participant_joined":
|
|
await user_join_group(room_name, identity)
|
|
await manager.broadcast(room_name, {
|
|
"type": "presence",
|
|
"users": await list_online_users(room_name, use_livekit=True)
|
|
})
|
|
|
|
elif event.event == "participant_left":
|
|
await user_leave_group(room_name, identity)
|
|
await manager.broadcast(room_name, {
|
|
"type": "presence",
|
|
"users": await list_online_users(room_name, use_livekit=True)
|
|
})
|
|
|
|
return {"status": "ok"}
|
|
|
|
|
|
@router.websocket("/ws/groups/{group_id}")
|
|
async def group_ws(websocket: WebSocket, group_id: str):
|
|
|
|
user = await get_ws_current_user(websocket)
|
|
try:
|
|
group_id_uuid = uuid.UUID(group_id)
|
|
except ValueError:
|
|
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
return
|
|
|
|
# check if user is member of group
|
|
async with AsyncSessionLocal() as db:
|
|
membership = await get_group_member(db, group_id_uuid, user.id)
|
|
if not membership:
|
|
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
return
|
|
|
|
user_id = str(user.id)
|
|
# connect websocket
|
|
await manager.connect(group_id, websocket)
|
|
|
|
# add presence
|
|
await user_join_group(group_id, user_id)
|
|
|
|
# give listener token
|
|
listener_token = generate_join_token(
|
|
user_id=user_id,
|
|
group_id=group_id,
|
|
can_publish=False
|
|
)
|
|
|
|
await websocket.send_json({
|
|
"type": "livekit_token",
|
|
"token": listener_token
|
|
})
|
|
|
|
# broadcast updated presence
|
|
await manager.broadcast(
|
|
group_id,
|
|
{
|
|
"type": "presence",
|
|
"users": await list_online_users(group_id)
|
|
}
|
|
)
|
|
|
|
last_action_time = 0.0
|
|
|
|
try:
|
|
while True:
|
|
text_data = await websocket.receive_text()
|
|
|
|
if len(text_data) > 20000:
|
|
await websocket.close(code=status.WS_1009_MESSAGE_TOO_BIG)
|
|
return
|
|
|
|
data = json.loads(text_data)
|
|
event = data.get("type")
|
|
|
|
# anti spam
|
|
current_time = time.time()
|
|
if current_time - last_action_time < 0.1:
|
|
continue
|
|
last_action_time = current_time
|
|
|
|
# user wants to speak
|
|
if event == "request_speak":
|
|
success = await request_speak(group_id, user_id)
|
|
if success:
|
|
# Broadcast globally that someone is speaking
|
|
await manager.broadcast(
|
|
group_id,
|
|
{
|
|
"type": "speaker",
|
|
"user_id": user_id
|
|
}
|
|
)
|
|
# Signal the specific client to unmute
|
|
await websocket.send_json({
|
|
"type": "speaker_granted"
|
|
})
|
|
else:
|
|
# someone else is speaking
|
|
speaker = await current_speaker(group_id)
|
|
await websocket.send_json(
|
|
{
|
|
"type": "speaker_busy",
|
|
"speaker": speaker
|
|
}
|
|
)
|
|
|
|
# user stops speaking
|
|
elif event == "stop_speak":
|
|
released = await stop_speaking(group_id, user_id)
|
|
if released:
|
|
await manager.broadcast(
|
|
group_id,
|
|
{
|
|
"type": "speaker_released"
|
|
}
|
|
)
|
|
|
|
elif event == "keep_alive_speaker":
|
|
from db.redis import extend_speaker_lock
|
|
await extend_speaker_lock(group_id, user_id)
|
|
|
|
except WebSocketDisconnect:
|
|
manager.disconnect(group_id, websocket)
|
|
await user_leave_group(group_id, user_id)
|
|
await stop_speaking(group_id, user_id)
|
|
await manager.broadcast(
|
|
group_id,
|
|
{
|
|
"type": "presence",
|
|
"users": await list_online_users(group_id)
|
|
}
|
|
)
|