from typing import Annotated

from fastapi import Depends, HTTPException, Path
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError

from src.api.dependacies.db_dep import sessionDep
from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.services.tasks import TaskService
from src.services.users import UserService

# Extracts the bearer token from the request; the token URL is the "/login"
# route under the configured v1 login prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> TokenData:
    """Decode the bearer token and return its claims as ``TokenData``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, decodes to ``None``, or its payload
            cannot be turned into ``TokenData``.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = AuthManager.decode_access_token(token=token)
    except InvalidTokenError as err:
        raise credentials_exception from err

    if payload is None:
        raise credentials_exception

    try:
        return TokenData(**payload)
    except (InvalidTokenError, TypeError, ValueError) as err:
        # A validly signed token can still carry missing/mistyped claims, or a
        # payload that is not a mapping. Treat that as bad credentials rather
        # than letting it surface as a 500.
        # NOTE(review): presumably TokenData is a pydantic model, whose
        # ValidationError derives from ValueError -- confirm.
        raise credentials_exception from err


CurrentUser = Annotated[TokenData, Depends(get_current_user)]


def get_current_active_user(
    current_user: CurrentUser,
) -> TokenData:
    """Return the current user, rejecting users whose ``is_active`` is falsy.

    Raises:
        HTTPException: 400 ("Inactive user") when the user is not active.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]


async def get_admin_user(db: sessionDep, current_user: ActiveUser) -> TokenData:
    """Run the admin validation for the current user and return that user.

    The result of ``validate_admin_user`` is ignored here.
    NOTE(review): presumably it raises for non-admin users -- confirm,
    otherwise every active user passes this dependency.
    """
    await UserService(db).validate_admin_user(current_user.sub)
    return current_user


AdminUser = Annotated[TokenData, Depends(get_admin_user)]


async def user_or_admin(db: sessionDep, current_user: ActiveUser, owner_id: int):
    """Return the current user if they own the resource, else require admin.

    NOTE(review): ownership is compared via ``current_user.id`` while the admin
    check uses ``current_user.sub`` -- confirm both fields exist on TokenData
    and that ``id`` is the right one to compare with ``owner_id``.
    """
    if current_user.id == owner_id:
        return current_user
    # Not the owner: fall back to the admin check.
    return await get_admin_user(db, current_user)


async def CurrentOrAdminOwner(
    db: sessionDep, current_user: ActiveUser, id: Annotated[int, Path()]
):
    """Authorize the caller as the owner identified by path parameter ``id`` or as admin.

    The parameter is named ``id`` because it is bound to the ``id`` path
    parameter of the route.

    Raises:
        HTTPException: 403 ("Not authorized") when the authorization helper
            returns a falsy value.
    """
    authorized_user = await user_or_admin(db, current_user, id)
    if not authorized_user:
        raise HTTPException(status_code=403, detail="Not authorized")
    return authorized_user


async def CurrentOrAdminTask(
    db: sessionDep,
    id: Annotated[int, Path()],
    current_user: ActiveUser,
):
    """Authorize the caller as the owner of task ``id`` or as admin.

    Looks the task up by the ``id`` path parameter, then applies the owner/admin
    check against the task's ``user_id``.

    Raises:
        HTTPException: 404 ("Task not found") when the lookup returns a falsy
            value.
    """
    task = await TaskService(db).get_task(id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return await CurrentOrAdminOwner(db, current_user, task.user_id)


OwnerDep = Annotated[TokenData, Depends(CurrentOrAdminOwner)]
TaskOwnerDep = Annotated[TokenData, Depends(CurrentOrAdminTask)]