add admin check

This commit is contained in:
IluaAir
2025-07-26 12:48:01 +03:00
parent 1a495e28c5
commit 36dad7b441
4 changed files with 48 additions and 9 deletions

View File

@@ -7,6 +7,8 @@ from jwt import InvalidTokenError
from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.services.users import UserService
from src.api.dependacies.db_dep import sessionDep
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")
@@ -27,15 +29,21 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
return user
CurUsr = Annotated[TokenData, Depends(get_current_user)]
CurrentUser = Annotated[TokenData, Depends(get_current_user)]
async def get_current_active_user(
current_user: CurUsr,
def get_current_active_user(
current_user: CurrentUser,
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]
async def get_admin_user(db: sessionDep, current_user: ActiveUser):
await UserService(db).validate_admin_user(current_user.sub)
return current_user
ActCurUser = Annotated[TokenData, Depends(get_current_active_user)]
AdminUser = Annotated[TokenData, Depends(get_admin_user)]

View File

@@ -1,11 +1,17 @@
from fastapi import APIRouter
from src.api.dependacies.user_dep import ActCurUser
from src.api.dependacies.user_dep import ActiveUser, AdminUser
from src.core.settings import settings
router = APIRouter(prefix=settings.api.v1.users, tags=["Users"])
@router.get("/me")
async def get_me(user: ActCurUser):
return {"user": user}
async def get_me(user: ActiveUser):
return user
@router.get("/")
async def get_all_users(user: AdminUser):
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
return {"users": users}

24
src/services/users.py Normal file
View File

@@ -0,0 +1,24 @@
from fastapi import HTTPException
from src.schemas.users import User
from src.services.base import BaseService
class UserService(BaseService):
async def get_user_by_username(self, username: str) -> User | None:
result = await self.session.user.get_one_or_none(username=username)
if result is None:
return None
return User.model_validate(result)
async def get_user_by_username_or_raise(self, username: str) -> User:
user = await self.get_user_by_username(username)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
async def validate_admin_user(self, username: str) -> User:
user = await self.get_user_by_username_or_raise(username)
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Admin access required")
return user