"""Redis-backed helpers for a per-group speaker lock and presence set."""
import redis.asyncio as redis
|
|
from typing import Optional, Any, Awaitable, cast
|
|
|
|
from core.config import settings
|
|
|
|
|
|
# Module-wide async Redis client, created from settings.REDIS_URL.
# decode_responses=True asks the client to return str replies instead of bytes.
redis_client: redis.Redis = redis.from_url(settings.REDIS_URL, decode_responses=True)
|
|
|
|
|
|
# =========================
|
|
# Redis Keys
|
|
# =========================
|
|
|
|
def speaker_key(group_id: str) -> str:
    """Build the Redis key that holds the active speaker of ``group_id``."""
    key = f"speaker:{group_id}"
    return key
|
|
|
|
|
|
def presence_key(group_id: str) -> str:
    """Build the Redis key of the presence set belonging to ``group_id``."""
    key = f"presence:{group_id}"
    return key
|
|
|
|
|
|
# =========================
|
|
# Speaker Lock
|
|
# =========================
|
|
|
|
async def get_active_speaker(group_id: str) -> Optional[str]:
    """Fetch the value stored under the group's speaker key.

    Returns whatever ``GET`` yields for that key: the stored user id, or
    ``None`` when the key does not exist.
    """
    key = speaker_key(group_id)
    current_speaker = await redis_client.get(key)
    return current_speaker
|
|
|
|
async def acquire_speaker(
    group_id: str,
    user_id: str,
    ttl: int = 30,
) -> bool:
    """
    Attempt to claim the speaker slot of ``group_id`` for ``user_id``.

    Sends one ``SET`` with ``NX`` (write only if the key is absent) and
    ``EX`` (expire after ``ttl`` seconds), so the claim and its timeout are
    applied together.

    Returns ``True`` only when the ``SET`` call itself returned ``True``;
    any other reply is reported as ``False``.
    """
    key = speaker_key(group_id)
    was_set = await redis_client.set(key, user_id, nx=True, ex=ttl)
    return was_set is True
|
|
|
|
|
|
async def release_speaker(group_id: str, user_id: str) -> bool:
    """
    Give up the speaker slot, but only when ``user_id`` is the one holding it.

    The ownership check and the delete are sent as a single Lua script so
    that no other command can slip in between the two steps.

    Returns ``True`` when the script reports that one key was deleted,
    ``False`` otherwise (key missing or held by someone else).
    """
    compare_and_delete = """
    if redis.call("GET", KEYS[1]) == ARGV[1]
    then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
    """

    key = speaker_key(group_id)
    # The cast only informs the type checker that eval() is awaitable here.
    pending = cast(
        Awaitable[Any],
        redis_client.eval(compare_and_delete, 1, key, user_id),
    )
    deleted = await pending
    return deleted == 1
|
|
|
|
async def extend_speaker_lock(group_id: str, user_id: str, ttl: int = 30) -> bool:
    """
    Reset the speaker lock's expiry to ``ttl`` seconds if ``user_id`` owns it.

    The ownership check and the ``EXPIRE`` are sent as a single Lua script so
    that the lock cannot change hands between the two steps.

    Args:
        group_id: Group whose speaker lock should be extended.
        user_id: User expected to be the current holder of the lock.
        ttl: New time-to-live in seconds; must be positive.

    Returns:
        ``True`` when the script reports that the expiry was set, ``False``
        when the key is missing or held by a different user.

    Raises:
        ValueError: If ``ttl`` is zero or negative.
    """
    # Redis EXPIRE with a non-positive timeout deletes the key instead of
    # extending it, which would silently drop the lock while reporting
    # success. Reject such values before talking to Redis.
    if ttl <= 0:
        raise ValueError(f"ttl must be a positive number of seconds, got {ttl!r}")

    lua_script = """
    if redis.call("GET", KEYS[1]) == ARGV[1]
    then
        return redis.call("EXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
    """

    # Same cast as release_speaker: tells the type checker eval() is awaitable.
    result = await cast(
        Awaitable[Any],
        redis_client.eval(
            lua_script,
            1,
            speaker_key(group_id),
            user_id,
            ttl,
        ),
    )
    return result == 1
|
|
|
|
# =========================
|
|
# Presence
|
|
# =========================
|
|
|
|
async def add_presence(group_id: str, user_id: str) -> None:
    """Add ``user_id`` to the presence set of ``group_id`` (Redis ``SADD``)."""
    key = presence_key(group_id)
    await redis_client.sadd(key, user_id)  # type: ignore
|
|
|
|
|
|
async def remove_presence(group_id: str, user_id: str) -> None:
    """Remove ``user_id`` from the presence set of ``group_id`` (Redis ``SREM``)."""
    key = presence_key(group_id)
    await redis_client.srem(key, user_id)  # type: ignore
|
|
|
|
|
|
async def get_presence(group_id: str) -> list[str]:
    """Return the members of the group's presence set as a list.

    The members come from Redis ``SMEMBERS``; no ordering is applied here.
    """
    key = presence_key(group_id)
    present_users = await redis_client.smembers(key)  # type: ignore
    return [*present_users]
|
|
|
|
|
|
# =========================
|
|
# Cleanup
|
|
# =========================
|
|
|
|
async def clear_group_state(group_id: str) -> None:
    """Delete both the speaker key and the presence key of ``group_id``.

    Both keys are passed to a single ``DELETE`` call.
    """
    keys_to_drop = (speaker_key(group_id), presence_key(group_id))
    await redis_client.delete(*keys_to_drop)