From 7f37d7fb605d25fdb2346869cf9c61b7073a39d3 Mon Sep 17 00:00:00 2001 From: roai_linux Date: Sun, 22 Mar 2026 14:28:13 +0330 Subject: [PATCH] feat: add pg_backup and refactor codes for livekit and websocket --- Back/core/websocket.py | 10 +- Back/db/redis.py | 11 ++ Back/docker-compose.yml | 57 ++++++---- Back/domains/realtime/presence_service.py | 23 +++- Back/domains/realtime/speaker_service.py | 80 ++++++-------- Back/domains/realtime/ws.py | 127 +++++++++++++++------- Back/domains/realtime/ws_manager.py | 69 +++++++++--- Back/integrations/livekit/client.py | 23 +++- Back/livekit.yaml | 1 + Back/main.py | 11 +- 10 files changed, 274 insertions(+), 138 deletions(-) diff --git a/Back/core/websocket.py b/Back/core/websocket.py index 5bcf367..19252df 100755 --- a/Back/core/websocket.py +++ b/Back/core/websocket.py @@ -5,24 +5,24 @@ from domains.users.repo import get_user_by_id async def get_ws_current_user(websocket: WebSocket): - token = websocket.query_params.get("token") - if not token: raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) payload = decode_token(token) - if payload is None: raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) user_id = payload.get("sub") - + token_version = payload.get("token_version") + async with AsyncSessionLocal() as db: + if user_id is None: + raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) user = await get_user_by_id(db, user_id) - if not user: + if not user or not user.is_active or user.token_version != token_version: raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) return user \ No newline at end of file diff --git a/Back/db/redis.py b/Back/db/redis.py index 0d32559..48e9f9c 100755 --- a/Back/db/redis.py +++ b/Back/db/redis.py @@ -76,6 +76,17 @@ async def release_speaker(group_id: str, user_id: str) -> bool: return result == 1 +async def extend_speaker_lock(group_id: str, user_id: str, ttl: int = 30) -> bool: + lua_script = """ + if redis.call("GET", KEYS[1]) == ARGV[1] + then + return redis.call("EXPIRE", KEYS[1], ARGV[2]) + else + return 0 + end + """ + result = await redis_client.eval(lua_script, 1, speaker_key(group_id), user_id, ttl) # type: ignore + return result == 1 # ========================= # Presence diff --git a/Back/docker-compose.yml b/Back/docker-compose.yml index 122206f..890f7ac 100755 --- a/Back/docker-compose.yml +++ b/Back/docker-compose.yml @@ -1,23 +1,4 @@ services: - - api: - build: . - container_name: neda_api - ports: - - "8000:8000" - volumes: - - "./:/app" - env_file: - - .env - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy - livekit: - condition: service_started - restart: always - postgres: image: postgres:17-alpine container_name: neda_postgres @@ -79,6 +60,44 @@ services: depends_on: - postgres + pg_backup: + image: prodrigestivill/postgres-backup-local + container_name: neda_pg_backup + restart: always + profiles: + - "prod" + volumes: + - ./backups:/backups + environment: + POSTGRES_HOST: postgres + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + SCHEDULE: '@daily' + BACKUP_KEEP_DAYS: 7 + + depends_on: + postgres: + condition: service_healthy + + api: + build: . + container_name: neda_api + ports: + - "8000:8000" + volumes: + - "./:/app" + env_file: + - .env + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + livekit: + condition: service_started + restart: always + volumes: postgres_data: redis_data: diff --git a/Back/domains/realtime/presence_service.py b/Back/domains/realtime/presence_service.py index f31e0ca..bdf6ba3 100644 --- a/Back/domains/realtime/presence_service.py +++ b/Back/domains/realtime/presence_service.py @@ -7,22 +7,35 @@ from db.redis import ( ) +from integrations.livekit.client import get_livekit_api +from livekit import api + + async def user_join_group(group_id: str | uuid.UUID, user_id: str | uuid.UUID): """ - Called when websocket connects + Called when websocket connects or LiveKit webhook received """ await add_presence(str(group_id), str(user_id)) async def user_leave_group(group_id: str | uuid.UUID, user_id: str | uuid.UUID): """ - Called when websocket disconnects + Called when websocket disconnects or LiveKit webhook received """ await remove_presence(str(group_id), str(user_id)) -async def list_online_users(group_id: str | uuid.UUID): +async def list_online_users(group_id: str | uuid.UUID, use_livekit: bool = False): """ - Returns online users in a group + Returns online users in a group. + If use_livekit is True, fetches directly from LiveKit server. """ - return await get_presence(str(group_id)) + group_id_str = str(group_id) + + if use_livekit: + lk_api = get_livekit_api() + res = await lk_api.room.list_participants(api.ListParticipantsRequest(room=group_id_str)) + online_users = [p.identity for p in res.participants] + return online_users + + return await get_presence(group_id_str) diff --git a/Back/domains/realtime/speaker_service.py b/Back/domains/realtime/speaker_service.py index 8a5382f..ae667a6 100644 --- a/Back/domains/realtime/speaker_service.py +++ b/Back/domains/realtime/speaker_service.py @@ -1,7 +1,7 @@ import uuid - +from livekit import api from sqlalchemy.ext.asyncio import AsyncSession - +from integrations.livekit.client import get_livekit_api from db.redis import ( acquire_speaker, release_speaker, @@ -10,69 +10,44 @@ from db.redis import ( from domains.groups.repo import get_group_by_id -from integrations.livekit.token_service import generate_join_token - - async def request_speak( - db: AsyncSession, group_id: str | uuid.UUID, - user_id: str | uuid.UUID -): - group_id_uuid = group_id if isinstance(group_id, uuid.UUID) else uuid.UUID(group_id) - group_id_str = str(group_id_uuid) + user_id: str | uuid.UUID, + group_type: str +) -> bool: + group_id_str = str(group_id) user_id_str = str(user_id) - group = await get_group_by_id(db, group_id_uuid) - if not group: - return None - - # private chat → no speaker lock - if str(group.type) == "private": - - token = generate_join_token( - user_id=user_id_str, - group_id=group_id_str, - can_publish=True - ) - - return token + if group_type == "private": + await grant_publish_permission(group_id_str, user_id_str, True) + return True # group chat → push-to-talk granted = await acquire_speaker(group_id_str, user_id_str) - if not granted: - return None + return False - token = generate_join_token( - user_id=user_id_str, - group_id=group_id_str, - can_publish=True - ) - - return token + await grant_publish_permission(group_id_str, user_id_str, True) + return True async def stop_speaking( - db: AsyncSession, group_id: str | uuid.UUID, - user_id: str | uuid.UUID + user_id: str | uuid.UUID, + group_type: str ): - group_id_uuid = group_id if isinstance(group_id, uuid.UUID) else uuid.UUID(group_id) - group_id_str = str(group_id_uuid) + group_id_str = str(group_id) user_id_str = str(user_id) - group = await get_group_by_id(db, group_id_uuid) - - if not group: - return False - - # private chat → nothing to release - if str(group.type) == "private": + if group_type == "private": + await grant_publish_permission(group_id_str, user_id_str, False) return True - return await release_speaker(group_id_str, user_id_str) - + released = await release_speaker(group_id_str, user_id_str) + if released: + await grant_publish_permission(group_id_str, user_id_str, False) + return released async def current_speaker( db: AsyncSession, @@ -90,3 +65,16 @@ async def current_speaker( return None return await get_active_speaker(group_id_str) + +async def grant_publish_permission(room_name: str, identity: str, can_publish: bool): + lk_api = get_livekit_api() # همان متدی که در client.py نوشتی + await lk_api.room.update_participant( + api.UpdateParticipantRequest( + room=room_name, + identity=identity, + permission=api.ParticipantPermission( + can_publish=can_publish, + can_subscribe=True # همیشه بتواند بشنود + ) + ) + ) \ No newline at end of file diff --git a/Back/domains/realtime/ws.py b/Back/domains/realtime/ws.py index 42a6e6c..1f49485 100644 --- a/Back/domains/realtime/ws.py +++ b/Back/domains/realtime/ws.py @@ -1,29 +1,67 @@ import uuid - -from fastapi import APIRouter, WebSocket, WebSocketDisconnect - +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, Header, status, HTTPException +from livekit import api +from core.config import settings from core.websocket import get_ws_current_user from db.session import AsyncSessionLocal from domains.groups.repo import get_group_member - from domains.realtime.ws_manager import manager from domains.realtime.presence_service import ( user_join_group, user_leave_group, list_online_users ) - from domains.realtime.speaker_service import ( request_speak, stop_speaking, current_speaker ) - from integrations.livekit.token_service import generate_join_token router = APIRouter() +@router.post("/lk-webhook") +async def livekit_webhook( + request: Request, + authorization: str = Header(None) +): + """ + LiveKit Webhook to sync presence and handle participant events. + """ + receiver = api.WebhookReceiver( + api.TokenVerifier( + settings.LIVEKIT_API_KEY, + settings.LIVEKIT_API_SECRET + ) + ) + + try: + body = await request.body() + event = receiver.receive(body.decode("utf-8"), authorization) + except Exception: + raise HTTPException(status_code=401, detail="Invalid webhook signature") + + room_name = event.room.name + identity = event.participant.identity + + if event.event == "participant_joined": + await user_join_group(room_name, identity) + await manager.broadcast(room_name, { + "type": "presence", + "users": await list_online_users(room_name, use_livekit=True) + }) + + elif event.event == "participant_left": + await user_leave_group(room_name, identity) + await manager.broadcast(room_name, { + "type": "presence", + "users": await list_online_users(room_name, use_livekit=True) + }) + + return {"status": "ok"} + + @router.websocket("/ws/groups/{group_id}") async def group_ws(websocket: WebSocket, group_id: str): @@ -31,17 +69,21 @@ async def group_ws(websocket: WebSocket, group_id: str): try: group_id_uuid = uuid.UUID(group_id) except ValueError: - await websocket.close(code=1008) + await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return + # check if user is member of group async with AsyncSessionLocal() as db: membership = await get_group_member(db, group_id_uuid, user.id) - if not membership: - await websocket.close(code=1008) - return + if not membership: + await websocket.close(code=status.WS_1008_POLICY_VIOLATION) + return + from domains.groups.repo import get_group_by_id + group = await get_group_by_id(db, group_id_uuid) + group_type = str(group.type) if group else "public" + user_id = str(user.id) - # connect websocket await manager.connect(group_id, websocket) @@ -70,50 +112,55 @@ async def group_ws(websocket: WebSocket, group_id: str): ) try: - async with AsyncSessionLocal() as db: - while True: - data = await websocket.receive_json() - event = data.get("type") - # user wants to speak - if event == "request_speak": - token = await request_speak( - db, + while True: + data = await websocket.receive_json() + event = data.get("type") + # user wants to speak + if event == "request_speak": + success = await request_speak(group_id, user_id, group_type) + if success: + # Broadcast globally that someone is speaking + await manager.broadcast( group_id, - user_id + { + "type": "speaker", + "user_id": user_id + } ) - if token: - await manager.broadcast( - group_id, - { - "type": "speaker", - "user_id": user_id - } - ) - await websocket.send_json({ - "type": "speaker_granted", - "token": token - }) - else: - speaker = await current_speaker(db, group_id) - - await websocket.send_json({ + # Signal the specific client to unmute + await websocket.send_json({ + "type": "speaker_granted" + }) + else: + # someone else is speaking + async with AsyncSessionLocal() as temp_db: + speaker = await current_speaker(temp_db, group_id) + await websocket.send_json( + { "type": "speaker_busy", "speaker": speaker - }) + } + ) - # user stops speaking - elif event == "stop_speak": - await stop_speaking(db, group_id, user_id) + # user stops speaking + elif event == "stop_speak": + released = await stop_speaking(group_id, user_id, group_type) + if released: await manager.broadcast( group_id, { "type": "speaker_released" } ) + + elif event == "keep_alive_speaker": + from db.redis import extend_speaker_lock + await extend_speaker_lock(group_id, user_id) except WebSocketDisconnect: manager.disconnect(group_id, websocket) await user_leave_group(group_id, user_id) + await stop_speaking(group_id, user_id, group_type) await manager.broadcast( group_id, { diff --git a/Back/domains/realtime/ws_manager.py b/Back/domains/realtime/ws_manager.py index 24fbac9..5714183 100644 --- a/Back/domains/realtime/ws_manager.py +++ b/Back/domains/realtime/ws_manager.py @@ -1,27 +1,70 @@ -from fastapi import WebSocket +import asyncio +import json from collections import defaultdict +from typing import Dict, Set, Optional, Any + +from fastapi import WebSocket +from db.redis import redis_client class ConnectionManager: - def __init__(self): - self.groups: dict[str, set[WebSocket]] = defaultdict(set) + # Local connections on this server instance + self.active_connections: Dict[str, Set[WebSocket]] = defaultdict(set) + self._pubsub: Any = None + self._listen_task: Optional[asyncio.Task] = None + + async def _setup_pubsub(self): + if self._pubsub is None: + self._pubsub = redis_client.pubsub() + # Subscribe to all websocket broadcasting channels + # We use Any for _pubsub to satisfy type checkers that don't know redis-py return types + await self._pubsub.psubscribe("ws:group:*") + self._listen_task = asyncio.create_task(self._redis_listener()) + + async def _redis_listener(self): + if self._pubsub is None: + return + try: + async for message in self._pubsub.listen(): + if message["type"] == "pmessage": + channel = message["channel"] + # Extract group_id from "ws:group:{group_id}" + group_id = channel.replace("ws:group:", "") + data = json.loads(message["data"]) + + # Forward to local websockets for this group + await self._local_broadcast(group_id, data) + except Exception: + # Re-initialize on error + self._pubsub = None + self._listen_task = None + + async def _local_broadcast(self, group_id: str, message: dict): + if group_id in self.active_connections: + for ws in list(self.active_connections[group_id]): + try: + await ws.send_json(message) + except Exception: + self.active_connections[group_id].discard(ws) async def connect(self, group_id: str, websocket: WebSocket): await websocket.accept() - self.groups[group_id].add(websocket) + await self._setup_pubsub() + self.active_connections[group_id].add(websocket) def disconnect(self, group_id: str, websocket: WebSocket): - if group_id in self.groups: - self.groups[group_id].discard(websocket) + if group_id in self.active_connections: + self.active_connections[group_id].discard(websocket) + if not self.active_connections[group_id]: + del self.active_connections[group_id] async def broadcast(self, group_id: str, message: dict): - if group_id not in self.groups: - return - for ws in list(self.groups[group_id]): - try: - await ws.send_json(message) - except: - self.groups[group_id].discard(ws) + """ + Public message to Redis. ALL server instances will receive it + and forward it to their local connections for this group. + """ + await redis_client.publish(f"ws:group:{group_id}", json.dumps(message)) + manager = ConnectionManager() \ No newline at end of file diff --git a/Back/integrations/livekit/client.py b/Back/integrations/livekit/client.py index e901925..57eb11f 100644 --- a/Back/integrations/livekit/client.py +++ b/Back/integrations/livekit/client.py @@ -1,10 +1,21 @@ +# integrations/livekit/client.py from livekit import api from core.config import settings -def get_livekit_api(): +_lk_api = None - return api.LiveKitAPI( - settings.LIVEKIT_HOST, - settings.LIVEKIT_API_KEY, - settings.LIVEKIT_API_SECRET - ) \ No newline at end of file +def get_livekit_api(): + global _lk_api + if _lk_api is None: + _lk_api = api.LiveKitAPI( + settings.LIVEKIT_HOST, + settings.LIVEKIT_API_KEY, + settings.LIVEKIT_API_SECRET + ) + return _lk_api + +async def close_livekit_api(): + global _lk_api + if _lk_api is not None: + await _lk_api.aclose() + _lk_api = None \ No newline at end of file diff --git a/Back/livekit.yaml b/Back/livekit.yaml index 491371c..85496e3 100644 --- a/Back/livekit.yaml +++ b/Back/livekit.yaml @@ -5,6 +5,7 @@ rtc: port_range_start: 50000 port_range_end: 50100 use_external_ip: false + # node_ip: "94.183.170.121" logging: level: info \ No newline at end of file diff --git a/Back/main.py b/Back/main.py index 8d057ea..17bd676 100644 --- a/Back/main.py +++ b/Back/main.py @@ -1,7 +1,7 @@ from contextlib import asynccontextmanager - from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from fastapi_swagger import patch_fastapi from domains.auth.api import router as auth_router from domains.users.api import router as users_router @@ -9,9 +9,8 @@ from domains.admin.api import router as admin_router from domains.groups.api import router as groups_router from domains.realtime.ws import router as realtime_router from domains.notifications.api import router as notifications_router - +from integrations.livekit.client import close_livekit_api from db.redis import redis_client -from fastapi_swagger import patch_fastapi @asynccontextmanager @@ -21,13 +20,17 @@ async def lifespan(app: FastAPI): try: await redis_client.ping() # type: ignore print("Redis connected") - + async for key in redis_client.scan_iter("speaker:*"): + await redis_client.delete(key) + async for key in redis_client.scan_iter("presence:*"): + await redis_client.delete(key) except Exception as e: print("Redis connection failed:", e) yield # ---------- Shutdown ---------- + await close_livekit_api() await redis_client.close()