# Neda/Back/domains/auth/api.py
#
# 89 lines
# 2.2 KiB
# Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from db.session import get_db
from core.jwt import decode_token, create_access_token
from domains.users.repo import get_user_by_id
from domains.auth.schemas import (
LoginRequest,
TokenResponse,
RefreshTokenRequest
)
from domains.auth.service import login_user
import uuid
# Router for the authentication endpoints: configured with the "/auth"
# prefix and the "auth" tag.
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=TokenResponse)
async def login(
    payload: LoginRequest,
    db: AsyncSession = Depends(get_db),
):
    """Exchange a username and secret for a token.

    Passes the submitted ``username`` and ``secret`` to ``login_user`` and
    returns whatever it produces.

    Raises:
        HTTPException: 401 when ``login_user`` returns a falsy result.
    """
    issued = await login_user(db, payload.username, payload.secret)
    if issued:
        return issued

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid username or secret",
    )
@router.post("/refresh", response_model=TokenResponse)
async def refresh(
    payload: RefreshTokenRequest,
    db: AsyncSession = Depends(get_db),
):
    """Issue a new access token from a refresh token.

    The refresh token is decoded and must carry ``type == "refresh"``, a
    ``sub`` claim that parses as a UUID, and a ``token_version`` claim. The
    referenced user must exist, be active, and have a matching
    ``token_version``.

    Returns:
        A dict with a freshly created ``access_token``, the submitted
        ``refresh_token`` unchanged, and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not a
            refresh token, lacks required claims, carries an unparseable
            user ID, or does not match an active user's token version.
    """
    claims = decode_token(payload.refresh_token)
    if not claims or claims.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )

    user_id = claims.get("sub")
    token_version = claims.get("token_version")
    # token_version is compared against None explicitly so that a version
    # of 0 is still accepted.
    if not user_id or token_version is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token payload",
        )

    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError):
        # uuid.UUID raises ValueError for a malformed string, but a
        # non-string "sub" (e.g. an int or list) fails with AttributeError
        # or TypeError instead; all of these mean the token is unusable and
        # must yield 401 rather than an unhandled server error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid user ID in token",
        ) from None

    user = await get_user_by_id(db, user_uuid)
    if not user or not user.is_active or user.token_version != token_version:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )

    access_token = create_access_token(
        subject=str(user.id),
        token_version=user.token_version,
    )
    # The refresh token itself is not reissued; the caller gets back the
    # same one it submitted.
    return {
        "access_token": access_token,
        "refresh_token": payload.refresh_token,
        "token_type": "bearer",
    }