Neda/Back/domains/admin/service.py

84 lines
1.6 KiB
Python

import secrets
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from domains.users.models import User
from domains.users.repo import (
get_user_by_id,
get_user_by_username,
create_user,
)
from core.security import hash_password
def generate_user_secret(nbytes: int = 16) -> str:
    """Return a new random, URL-safe secret string.

    Args:
        nbytes: Number of random bytes of entropy to draw. The default of 16
            matches the historical behaviour (a 22-character token).

    Returns:
        A URL-safe Base64 text token produced by ``secrets.token_urlsafe``.

    Raises:
        ValueError: If ``nbytes`` is less than 1; zero bytes would yield an
            empty (and therefore useless) secret.
    """
    if nbytes < 1:
        raise ValueError("nbytes must be a positive integer")
    return secrets.token_urlsafe(nbytes)
async def admin_create_user(
    db: AsyncSession,
    username: str,
    phone_number: str | None = None
):
    """Create a regular (non-admin) user on behalf of an administrator.

    This is a thin wrapper over ``_create_user_with_role`` that always passes
    ``is_admin=False``.

    Args:
        db: Async database session forwarded to the helper.
        username: Username for the new account.
        phone_number: Optional phone number for the new account.

    Returns:
        Whatever ``_create_user_with_role`` returns.

    Raises:
        ValueError: Propagated from ``_create_user_with_role``.
    """
    # Accounts created through this entry point never receive the admin flag.
    outcome = await _create_user_with_role(
        db, username, phone_number, is_admin=False
    )
    return outcome
async def _create_user_with_role(
    db: AsyncSession,
    username: str,
    phone_number: str | None,
    is_admin: bool
):
    """Build a ``User`` with a freshly generated secret and hand it to the repo.

    Args:
        db: Async database session used for the lookup and the create call.
        username: Desired username; rejected if the lookup finds a match.
        phone_number: Optional phone number stored on the user.
        is_admin: Value stored in the user's ``is_admin`` field.

    Returns:
        A ``(user, secret)`` tuple. ``secret`` is the plaintext value; only
        its ``hash_password`` digest is placed on the ``User`` object.

    Raises:
        ValueError: If ``get_user_by_username`` returns a truthy result.
    """
    # Guard clause: refuse to reuse a username that is already taken.
    if await get_user_by_username(db, username):
        raise ValueError("Username already exists")

    plain_secret = generate_user_secret()
    hashed_secret = hash_password(plain_secret)

    account = User(
        username=username,
        phone_number=phone_number,
        is_admin=is_admin,
        secret_hash=hashed_secret,
    )
    # NOTE(review): create_user presumably persists the row; whether it
    # commits is not visible from this module -- confirm in the users repo.
    await create_user(db, account)

    # The plaintext secret is returned here; it is not stored on the user.
    return account, plain_secret
async def admin_logout_user(
    db: AsyncSession,
    user_id: str | uuid.UUID
):
    """Increment a user's ``token_version`` and commit the session.

    Args:
        db: Async database session used for the lookup and the commit.
        user_id: Identifier passed to ``get_user_by_id``.

    Returns:
        The updated user, or ``None`` when the lookup yields nothing (in
        which case no commit is issued).
    """
    target = await get_user_by_id(db, user_id)
    if target:
        # NOTE(review): bumping the version presumably invalidates tokens
        # issued earlier -- confirm against the token verification code.
        target.token_version += 1
        await db.commit()
        return target
    return None
async def admin_reset_user_secret(
    db: AsyncSession,
    user_id: str | uuid.UUID
):
    """Replace a user's stored secret hash with one for a new random secret.

    Args:
        db: Async database session used for the lookup and the commit.
        user_id: Identifier passed to ``get_user_by_id``.

    Returns:
        The new plaintext secret, or ``None`` when the lookup yields nothing
        (in which case nothing is changed or committed).
    """
    account = await get_user_by_id(db, user_id)
    if account:
        replacement = generate_user_secret()
        # Only the hash is stored; the plaintext is returned to the caller.
        account.secret_hash = hash_password(replacement)
        await db.commit()
        return replacement
    return None