86 lines
1.6 KiB
Python
86 lines
1.6 KiB
Python
import secrets
|
|
import uuid
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from domains.users.models import User
|
|
from domains.users.repo import (
|
|
get_user_by_id,
|
|
get_user_by_username,
|
|
create_user,
|
|
)
|
|
|
|
from core.security import hash_password
|
|
|
|
|
|
def generate_user_secret():
|
|
# return secrets.token_urlsafe(16)
|
|
#for test
|
|
return "1234"
|
|
|
|
async def admin_create_user(
|
|
db: AsyncSession,
|
|
username: str,
|
|
phone_number: str | None = None
|
|
):
|
|
return await _create_user_with_role(
|
|
db=db,
|
|
username=username,
|
|
phone_number=phone_number,
|
|
is_admin=False
|
|
)
|
|
|
|
|
|
async def _create_user_with_role(
|
|
db: AsyncSession,
|
|
username: str,
|
|
phone_number: str | None,
|
|
is_admin: bool
|
|
):
|
|
|
|
existing = await get_user_by_username(db, username)
|
|
|
|
if existing:
|
|
raise ValueError("Username already exists")
|
|
|
|
secret = generate_user_secret()
|
|
|
|
user = User(
|
|
username=username,
|
|
phone_number=phone_number,
|
|
is_admin=is_admin,
|
|
secret_hash=hash_password(secret)
|
|
)
|
|
|
|
await create_user(db, user)
|
|
|
|
return user, secret
|
|
|
|
|
|
async def admin_logout_user(
|
|
db: AsyncSession,
|
|
user_id: str | uuid.UUID
|
|
):
|
|
user = await get_user_by_id(db, user_id)
|
|
if not user:
|
|
return None
|
|
|
|
user.token_version += 1
|
|
await db.commit()
|
|
return user
|
|
|
|
|
|
async def admin_reset_user_secret(
|
|
db: AsyncSession,
|
|
user_id: str | uuid.UUID
|
|
):
|
|
|
|
user = await get_user_by_id(db, user_id)
|
|
|
|
if not user:
|
|
return None
|
|
new_secret = generate_user_secret()
|
|
user.secret_hash = hash_password(new_secret)
|
|
await db.commit()
|
|
return new_secret
|