Neda/Back/domains/realtime/speaker_service.py

67 lines
1.9 KiB
Python

import uuid
from livekit import api
from livekit.api.twirp_client import TwirpError
from sqlalchemy.ext.asyncio import AsyncSession
from core.logger import get_logger
from integrations.livekit.client import get_livekit_api
from db.redis import (
acquire_speaker,
release_speaker,
get_active_speaker
)
# Module-level logger, obtained from the project's logger factory using this
# module's name; shared by every function in this file.
logger = get_logger(__name__)
async def request_speak(
    group_id: str | uuid.UUID,
    user_id: str | uuid.UUID,
) -> bool:
    """Try to make a user the active speaker of a group.

    The speaker slot is first claimed through ``acquire_speaker``; when that
    succeeds the user is given publish permission in the room named after the
    group.  If enabling publishing raises, is cancelled, or explicitly reports
    failure, the slot is released again so it is not left held by a user who
    cannot publish.

    Args:
        group_id: Group identifier; its string form is used as the room name.
        user_id: User identifier; its string form is used as the participant
            identity.

    Returns:
        ``True`` if the slot was acquired and publishing was enabled,
        ``False`` otherwise.
    """
    group_id_str = str(group_id)
    user_id_str = str(user_id)

    if not await acquire_speaker(group_id_str, user_id_str):
        return False

    publish_enabled = False
    try:
        outcome = await grant_publish_permission(group_id_str, user_id_str, True)
        # Only an explicit False counts as failure; a helper that reports
        # nothing (None) is treated as having succeeded.
        publish_enabled = outcome is not False
    finally:
        # Runs on failure, on any exception and on task cancellation, so the
        # slot never outlives a grant that did not go through.
        if not publish_enabled:
            await release_speaker(group_id_str, user_id_str)
    return publish_enabled
async def stop_speaking(
    group_id: str | uuid.UUID,
    user_id: str | uuid.UUID,
):
    """Give up the speaker slot of a group on behalf of a user.

    Publish permission is revoked only when ``release_speaker`` reports that
    the release actually happened.

    Args:
        group_id: Group identifier; its string form is used as the room name.
        user_id: User identifier; its string form is used as the participant
            identity.

    Returns:
        Whatever ``release_speaker`` returned (truthy when the slot was
        released).
    """
    room, identity = str(group_id), str(user_id)

    was_released = await release_speaker(room, identity)
    if not was_released:
        # Nothing was released, so there is no permission to take away.
        return was_released

    await grant_publish_permission(room, identity, False)
    return was_released
async def current_speaker(group_id: str | uuid.UUID):
    """Look up the active speaker of a group.

    Args:
        group_id: Group identifier; converted to ``str`` before the lookup.

    Returns:
        The result of ``get_active_speaker`` for that group, unchanged.
    """
    return await get_active_speaker(str(group_id))
async def grant_publish_permission(room_name: str, identity: str, can_publish: bool) -> bool:
    """Set a participant's publish permission in a LiveKit room (best effort).

    Sends an ``UpdateParticipantRequest`` whose permission has ``can_publish``
    set as requested and ``can_subscribe`` always ``True``.  Failures of the
    update call are logged instead of raised; the return value tells the
    caller whether the update went through.

    Args:
        room_name: Name of the LiveKit room.
        identity: Identity of the participant to update.
        can_publish: Whether the participant may publish.

    Returns:
        ``True`` if the update call completed without error, ``False`` if it
        failed for any reason (including the participant or room not being
        found).
    """
    lk_api = get_livekit_api()
    try:
        # NOTE(review): only can_publish / can_subscribe are set on this
        # permission object; confirm the effect on the participant's other
        # permission fields is intended.
        await lk_api.room.update_participant(
            api.UpdateParticipantRequest(
                room=room_name,
                identity=identity,
                permission=api.ParticipantPermission(
                    can_publish=can_publish,
                    can_subscribe=True,
                ),
            )
        )
    except TwirpError as e:
        # Prefer the structured Twirp error code; fall back to the rendered
        # text for errors that only carry the reason in their message.
        # "not exist" (rather than a bare "exist") avoids misclassifying
        # unrelated errors such as "already exists".
        code = str(getattr(e, "code", "") or "").lower()
        text = str(e).lower()
        if code == "not_found" or "not_found" in text or "not exist" in text:
            logger.warning(f"Participant {identity} already left the room.")
        else:
            logger.error(f"Error updating participant: {e}")
        return False
    except Exception as e:
        # Deliberately best-effort: never let a LiveKit failure propagate,
        # but keep the traceback in the log and report the failure.
        logger.exception(f"Unexpected error in grant_permission: {e}")
        return False
    return True