Neda/Back/core/deps.py

78 lines
2.0 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from db.session import get_db
from core.jwt import decode_token
from domains.users.repo import get_user_by_id
# Bearer authentication scheme
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
):
"""
Validate JWT token and return the authenticated user
"""
token = credentials.credentials
payload = decode_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="توکن نامعتبر است",
)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="توکن نامعتبر است",
)
user = await get_user_by_id(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="کاربری یافت نشد",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="کاربر غیرفعال است",
)
# Check token version for remote logout
token_version = payload.get("token_version")
if token_version != user.token_version:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="توکن نامعتبر است",
)
return user
async def get_current_admin(
user=Depends(get_current_user)
):
"""
Ensure the authenticated user is an admin
"""
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="شما دسترسی لازم را ندارید",
)
return user