Neda/Back/core/deps.py
2026-03-06 20:01:04 +03:30

70 lines
1.6 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from db.session import get_db
from core.jwt import decode_token
from domains.users.repo import get_user_by_id
# Bearer authentication scheme
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
):
"""
Validate JWT token and return the authenticated user
"""
token = credentials.credentials
payload = decode_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
user = await get_user_by_id(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User is inactive",
)
return user
async def get_current_admin(
user=Depends(get_current_user)
):
"""
Ensure the authenticated user is an admin
"""
if user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin privileges required",
)
return user