"""FastAPI dependencies that authenticate a request from its bearer token."""

from typing import Annotated, Optional

from fastapi import Depends, HTTPException
from fastapi.security import (
    HTTPAuthorizationCredentials,
    HTTPBearer,
    OAuth2PasswordBearer,
)
from jwt import InvalidTokenError

from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData

# auto_error=False: a missing/malformed Authorization header yields None
# instead of FastAPI's built-in error, so get_current_user can answer with
# its own 401 response.
http_bearer = HTTPBearer(auto_error=False)

# Not referenced by the dependencies below.
# NOTE(review): presumably kept for the OpenAPI login flow or for importers
# elsewhere in the project -- confirm before removing.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")

# The credentials are Optional because http_bearer is built with
# auto_error=False (see above).
AccessTokenDep = Annotated[
    Optional[HTTPAuthorizationCredentials], Depends(http_bearer)
]


async def get_current_user(
    token: AccessTokenDep, verify_exp: bool = True, check_active: bool = False
) -> TokenData:
    """Decode the bearer token and return the user data it carries.

    NOTE: prefer the wrapper dependencies below over ``Depends`` on this
    function directly. FastAPI treats plain ``bool`` parameters of a
    dependency as query parameters, which would let a client influence
    ``verify_exp`` / ``check_active``.

    Args:
        token: Bearer credentials extracted by ``http_bearer``; ``None`` when
            the request carried no usable Authorization header.
        verify_exp: Forwarded to ``AuthManager.decode_access_token``.
        check_active: When true, reject users whose ``is_active`` is falsy.

    Returns:
        The ``TokenData`` built from the decoded token payload.

    Raises:
        HTTPException: 401 when the credentials are missing, the token cannot
            be decoded, or its payload cannot be turned into ``TokenData``;
            400 when ``check_active`` is set and the user is inactive.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Explicit guard instead of relying on AttributeError from
    # ``None.credentials``.
    if token is None:
        raise credentials_exception

    try:
        payload = AuthManager.decode_access_token(token.credentials, verify_exp)
    except (InvalidTokenError, AttributeError) as exc:
        # AttributeError is kept for backward compatibility with the previous
        # behaviour (any such failure while decoding was reported as 401).
        raise credentials_exception from exc

    if payload is None:
        raise credentials_exception

    try:
        user = TokenData(**payload)
    except (TypeError, ValueError) as exc:
        # A payload that is not a mapping (TypeError) or that fails schema
        # validation (pydantic's ValidationError is a ValueError -- assuming
        # TokenData is a pydantic model) is a bad token, not a server error.
        raise credentials_exception from exc

    if check_active and not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    return user


async def get_current_user_basic(token: AccessTokenDep) -> TokenData:
    """Return the token's user; expiry is verified, active state is not."""
    return await get_current_user(token, verify_exp=True, check_active=False)


async def get_current_active_user(token: AccessTokenDep) -> TokenData:
    """Return the token's user; expiry is verified and the user must be active."""
    return await get_current_user(token, verify_exp=True, check_active=True)


async def get_current_user_for_refresh(token: AccessTokenDep) -> TokenData:
    """Return the token's active user without verifying token expiry."""
    return await get_current_user(token, verify_exp=False, check_active=True)


async def get_current_user_for_admin(token: AccessTokenDep) -> TokenData:
    """Return the token's active user, requiring ``is_superuser``.

    Raises:
        HTTPException: 403 when the user is not a superuser, in addition to
            the errors raised by ``get_current_user``.
    """
    admin = await get_current_user(token, verify_exp=True, check_active=True)
    if not admin.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")
    return admin


# Ready-made parameter annotations for route handlers.
CurrentUser = Annotated[TokenData, Depends(get_current_user_basic)]
ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]
RefreshUser = Annotated[TokenData, Depends(get_current_user_for_refresh)]
AdminUser = Annotated[TokenData, Depends(get_current_user_for_admin)]