Files
taskncoffee/src/services/users.py
2025-08-06 23:38:17 +03:00

39 lines
1.5 KiB
Python

from fastapi import HTTPException
from src.schemas.users import User, UserUpdate
from src.services.base import BaseService
class UserService(BaseService):
    """Service-layer operations on users.

    Every method goes through ``self.session.user`` for data access and
    converts raw results into ``User`` schema objects with
    ``User.model_validate``. ``self.session`` is not defined in this class;
    presumably it is provided by ``BaseService`` (verify against the base).
    """

    async def get_user_by_filter(self, **filter_by) -> User | None:
        """Look up a single user by arbitrary keyword filters.

        Args:
            **filter_by: Forwarded unchanged to
                ``self.session.user.get_one_or_none``.

        Returns:
            The validated ``User``, or ``None`` when the lookup returns
            ``None``.
        """
        record = await self.session.user.get_one_or_none(**filter_by)
        return None if record is None else User.model_validate(record)

    async def get_user_by_filter_or_raise(self, **filter_by) -> User:
        """Look up a single user, failing when nothing matches.

        Args:
            **filter_by: Forwarded unchanged to ``get_user_by_filter``.

        Returns:
            The validated ``User``.

        Raises:
            HTTPException: 404 ("User not found") when the lookup yields
                ``None``.
        """
        found = await self.get_user_by_filter(**filter_by)
        if found is not None:
            return found
        raise HTTPException(status_code=404, detail="User not found")

    async def validate_admin_user(self, username: str) -> User:
        """Fetch a user by username and require the superuser flag.

        Args:
            username: Value used for the ``username=`` lookup filter.

        Returns:
            The validated ``User`` whose ``is_superuser`` is truthy.

        Raises:
            HTTPException: 404 when no such user is found; 403
                ("Admin access required") when ``is_superuser`` is falsy.
        """
        candidate = await self.get_user_by_filter_or_raise(username=username)
        if candidate.is_superuser:
            return candidate
        raise HTTPException(status_code=403, detail="Admin access required")

    async def get_all_users(self) -> list[User]:
        """Return every user from ``self.session.user.get_all_users``.

        Returns:
            A list with one validated ``User`` per returned record, in the
            order the records were returned.
        """
        records = await self.session.user.get_all_users()
        return list(map(User.model_validate, records))

    async def delete_user(self, id: int) -> None:
        """Delete the user with the given id and commit the session.

        No existence check is made here; what happens when no user matches
        is decided by ``self.session.user.delete_one``.

        Args:
            id: Passed as the ``id=`` filter to ``delete_one``.
        """
        user_repo = self.session.user
        await user_repo.delete_one(id=id)
        await self.session.commit()

    async def update_user(self, id: int, update_data: UserUpdate) -> User:
        """Apply ``update_data`` to an existing user and commit the session.

        Args:
            id: Identifier of the user to update.
            update_data: Passed as ``data=`` to
                ``self.session.user.update_one``.

        Returns:
            The validated ``User`` built from the ``update_one`` result.

        Raises:
            HTTPException: 404 when no user with ``id`` is found; the
                update and commit are then never attempted.
        """
        # Existence check first so a missing user surfaces as a 404.
        await self.get_user_by_filter_or_raise(id=id)
        updated = await self.session.user.update_one(id=id, data=update_data)
        await self.session.commit()
        return User.model_validate(updated)