import secrets import uuid from sqlalchemy.ext.asyncio import AsyncSession from domains.users.models import User from domains.users.repo import ( get_user_by_id, get_user_by_username, create_user, ) from core.security import hash_password from core.config import Settings def generate_user_secret(): return secrets.token_urlsafe(Settings.SECRET_PASS_LENGTH) async def admin_create_user( db: AsyncSession, username: str, phone_number: str | None = None ): return await _create_user_with_role( db=db, username=username, phone_number=phone_number, is_admin=False ) async def _create_user_with_role( db: AsyncSession, username: str, phone_number: str | None, is_admin: bool ): existing = await get_user_by_username(db, username) if existing: raise ValueError("نام کاربری تکراری است") secret = generate_user_secret() user = User( username=username, phone_number=phone_number, is_admin=is_admin, secret_hash=hash_password(secret) ) await create_user(db, user) return user, secret async def admin_logout_user( db: AsyncSession, user_id: str | uuid.UUID ): user = await get_user_by_id(db, user_id) if not user: return None user.token_version += 1 await db.commit() return user async def admin_reset_user_secret( db: AsyncSession, user_id: str | uuid.UUID ): user = await get_user_by_id(db, user_id) if not user: return None new_secret = generate_user_secret() user.secret_hash = hash_password(new_secret) await db.commit() return new_secret