"""FastAPI dependencies that resolve and authorize the caller from a bearer token."""

from typing import Annotated

from fastapi import HTTPException, Depends, Path
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError

from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.services.users import UserService
from src.api.dependacies.db_dep import sessionDep

# Extracts the bearer token from the request; the token URL is built from the
# configured v1 login prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
) -> TokenData:
    """Decode the bearer token and return its payload as ``TokenData``.

    Args:
        token: Raw access token supplied by ``oauth2_scheme``.

    Returns:
        The ``TokenData`` built from the decoded token payload.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, decodes to ``None``, or its payload
            cannot be turned into ``TokenData``.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = AuthManager.decode_access_token(token=token)
    except InvalidTokenError as exc:
        raise credentials_exception from exc

    if payload is None:
        raise credentials_exception

    try:
        return TokenData(**payload)
    except (InvalidTokenError, TypeError, ValueError) as exc:
        # A token can decode successfully yet carry a payload that does not
        # fit TokenData (missing/extra/invalid claims, or a non-mapping
        # payload). Report that as bad credentials rather than letting it
        # escape as an unhandled server error.
        # NOTE(review): assumes TokenData signals a bad payload with
        # TypeError or ValueError (pydantic's ValidationError derives from
        # ValueError) -- confirm against src.schemas.auth.
        raise credentials_exception from exc


# Dependency alias: the caller resolved from the bearer token.
CurrentUser = Annotated[TokenData, Depends(get_current_user)]


def get_current_active_user(current_user: CurrentUser) -> TokenData:
    """Return the current user, rejecting callers whose ``is_active`` is falsy.

    Args:
        current_user: The caller resolved by ``get_current_user``.

    Returns:
        The same ``current_user`` object.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` when
            ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")


# Dependency alias: the caller, guaranteed to have passed the active check.
ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]


async def get_admin_user(db: sessionDep, current_user: ActiveUser) -> TokenData:
    """Run the admin validation for the current user and return the user.

    Args:
        db: Database session dependency handed to ``UserService``.
        current_user: The active caller.

    Returns:
        The same ``current_user`` object once validation has completed.
    """
    # The result of validate_admin_user is discarded, so rejection presumably
    # happens by raising inside the service -- TODO confirm in UserService.
    await UserService(db).validate_admin_user(current_user.sub)
    return current_user


# Dependency alias: the caller after admin validation.
AdminUser = Annotated[TokenData, Depends(get_admin_user)]


async def user_or_admin(
    db: sessionDep, current_user: ActiveUser, id: Annotated[int, Path()]
) -> TokenData:
    """Allow the request when the caller matches the path ``id`` or passes admin validation.

    Args:
        db: Database session dependency, used only for the admin fallback.
        current_user: The active caller.
        id: Integer ``id`` path parameter of the route.

    Returns:
        ``current_user`` when ``current_user.id == id``; otherwise whatever
        ``get_admin_user`` returns for the caller.
    """
    # NOTE(review): this compares ``current_user.id`` while get_admin_user
    # identifies the caller by ``current_user.sub`` -- verify that TokenData
    # exposes ``id`` and that it is comparable with the integer path value.
    if current_user.id == id:
        return current_user
    # Not the resource owner: fall back to the admin validation.
    return await get_admin_user(db, current_user)


# Dependency alias: the caller, who is either the addressed user or an admin.
CurrentOrAdmin = Annotated[TokenData, Depends(user_or_admin)]