Compare commits

..

10 Commits

Author SHA1 Message Date
IluaAir
b9a9e4e094 current or admin dep created 2025-07-27 13:54:39 +03:00
IluaAir
9f105c32c6 fix router request user data 2025-07-27 13:06:00 +03:00
IluaAir
7c334491c3 add user get, access token add id 2025-07-27 12:55:58 +03:00
IluaAir
d3eba77444 add get users 2025-07-27 12:41:35 +03:00
IluaAir
36dad7b441 add admin check 2025-07-27 12:30:06 +03:00
IluaAir
1a495e28c5 update .gitignore 2025-07-21 12:02:41 +03:00
IluaAir
4789da1b70 create active user 2025-07-20 12:37:57 +03:00
IluaAir
8ec8639848 create cur user 2025-07-20 12:34:18 +03:00
IluaAir
d639abfbc5 service auth fix 2025-07-19 12:10:00 +03:00
IluaAir
d530412805 ready login endpoint 2025-07-14 12:41:03 +03:00
12 changed files with 202 additions and 15 deletions

5
.gitignore vendored
View File

@@ -1,2 +1,7 @@
/.venv/ /.venv/
/.idea /.idea
__pycache__/
*.db
.DS_Store
.env
.vscode/

View File

@@ -0,0 +1,72 @@
from typing import Annotated
from fastapi import HTTPException, Depends, Path
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.services.users import UserService
from src.api.dependacies.db_dep import sessionDep
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = AuthManager.decode_access_token(token=token)
if payload is None:
raise credentials_exception
user = TokenData(**payload)
except InvalidTokenError:
raise credentials_exception
return user
CurrentUser = Annotated[TokenData, Depends(get_current_user)]
def get_current_active_user(
current_user: CurrentUser,
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]
async def get_admin_user(db: sessionDep, current_user: ActiveUser):
await UserService(db).validate_admin_user(current_user.sub)
return current_user
AdminUser = Annotated[TokenData, Depends(get_admin_user)]
async def user_or_admin_path(db: sessionDep, id: int, current_user: ActiveUser):
if current_user.id == id:
return current_user
else:
admin = await get_admin_user(db, current_user)
return admin
async def user_or_admin(
db: sessionDep, current_user: ActiveUser, id: Annotated[int, Path()]
):
if current_user.id == id:
return current_user
else:
admin = await get_admin_user(db, current_user)
return admin
CurrentOrAdmin = Annotated[TokenData, Depends(user_or_admin)]

View File

@@ -22,4 +22,7 @@ async def login(
session: sessionDep, session: sessionDep,
credential: Annotated[OAuth2PasswordRequestForm, Depends()], credential: Annotated[OAuth2PasswordRequestForm, Depends()],
): ):
user = AuthService(session).login(credential.username, credential.password) access_token = await AuthService(session).login(
credential.username, credential.password
)
return access_token

View File

@@ -1,5 +1,25 @@
from fastapi import APIRouter from fastapi import APIRouter
from src.api.dependacies.user_dep import ActiveUser, AdminUser, CurrentOrAdmin
from src.api.dependacies.db_dep import sessionDep
from src.core.settings import settings from src.core.settings import settings
from src.services.users import UserService
router = APIRouter(prefix=settings.api.v1.users, tags=["Users"]) router = APIRouter(prefix=settings.api.v1.users, tags=["Users"])
@router.get("/me")
async def get_me(user: ActiveUser):
return user
@router.get("/")
async def get_all_users(db: sessionDep, _: AdminUser):
users = await UserService(db).get_all_users()
return users
@router.get("/{id}")
async def get_user_by_id(db: sessionDep, id: int, _: CurrentOrAdmin):
user = await UserService(db).get_user_by_filter_or_raise(id=id)
return user

View File

@@ -6,7 +6,7 @@ from passlib.context import CryptContext
from src.core.settings import settings from src.core.settings import settings
class AuthManger: class AuthManager:
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@classmethod @classmethod
@@ -33,3 +33,11 @@ class AuthManger:
algorithm=settings.access_token.algorithm, algorithm=settings.access_token.algorithm,
) )
return encoded_jwt return encoded_jwt
@classmethod
def decode_access_token(cls, token: str) -> dict:
return jwt.decode(
token,
settings.access_token.secret_key,
algorithms=[settings.access_token.algorithm],
)

View File

@@ -12,11 +12,19 @@ class ApiV1Prefix(BaseModel):
auth: str = "/auth" auth: str = "/auth"
users: str = "/users" users: str = "/users"
@property
def login_url(self) -> str:
return f"{self.prefix}{self.auth}"
class ApiPrefix(BaseModel): class ApiPrefix(BaseModel):
prefix: str = "/api" prefix: str = "/api"
v1: ApiV1Prefix = ApiV1Prefix() v1: ApiV1Prefix = ApiV1Prefix()
@property
def v1_login_url(self) -> str:
return f"{self.prefix}{self.v1.login_url}"
class DbSettings(BaseModel): class DbSettings(BaseModel):
url: str = f"sqlite+aiosqlite:///{DB_PATH}" url: str = f"sqlite+aiosqlite:///{DB_PATH}"

View File

@@ -1,5 +1,5 @@
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy import insert from sqlalchemy import insert, select
from src.core.database import Base from src.core.database import Base
@@ -15,3 +15,9 @@ class BaseRepo:
result = await self.session.execute(statement) result = await self.session.execute(statement)
obj = result.scalar_one() obj = result.scalar_one()
return obj return obj
async def get_one_or_none(self, **filter_by):
query = select(self.model).filter_by(**filter_by)
result = await self.session.execute(query)
model = result.scalars().one_or_none()
return model

View File

@@ -1,6 +1,14 @@
from sqlalchemy import select
from src.models import UsersORM from src.models import UsersORM
from src.repository.base import BaseRepo from src.repository.base import BaseRepo
class UsersRepo(BaseRepo): class UsersRepo(BaseRepo):
model = UsersORM model = UsersORM
async def get_all_users(self):
query = select(self.model)
result = await self.session.execute(query)
models = result.scalars().all()
return models

12
src/schemas/auth.py Normal file
View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
id: int | None = None
sub: str | None = None
is_active: bool

View File

@@ -32,6 +32,4 @@ class UserRequestADD(BaseModel):
class UserAdd(BaseModel): class UserAdd(BaseModel):
email: EmailStr | None email: EmailStr | None
username: str username: str
is_active: bool
is_superuser: bool
hashed_password: str hashed_password: str

View File

@@ -1,21 +1,40 @@
from src.schemas.users import UserRequestADD, User, UserAdd from fastapi import HTTPException
from src.schemas.auth import Token
from src.schemas.users import UserRequestADD, User, UserAdd, UserWithHashedPass
from src.services.base import BaseService from src.services.base import BaseService
from src.core.auth_manager import AuthManger from src.core.auth_manager import AuthManager
class AuthService(BaseService): class AuthService(BaseService):
async def registration(self, cred: UserRequestADD) -> User: async def registration(self, cred: UserRequestADD) -> User:
hashed_pass = AuthManger.get_password_hash(cred.password) hashed_pass = AuthManager.get_password_hash(cred.password)
user_to_insert = UserAdd( user_to_insert = UserAdd(
username=cred.username, username=cred.username,
email=cred.email, email=cred.email,
hashed_password=hashed_pass, hashed_password=hashed_pass,
is_active=True,
is_superuser=False,
) )
result = await self.session.user.create_one(user_to_insert) result = await self.session.user.create_one(user_to_insert)
await self.session.commit() await self.session.commit()
return User.model_validate(result) return User.model_validate(result)
async def login(self, username: str, password: str): async def login(self, username: str, password: str):
... result = await self.session.user.get_one_or_none(username=username)
if result is None:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
)
user = UserWithHashedPass.model_validate(result)
verify = AuthManager.verify_password(
plain_password=password, hashed_password=user.hashed_password
)
if not verify or user.is_active is False:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
)
access_token = AuthManager.create_access_token(
data={"id": user.id, "sub": user.username, "is_active": user.is_active}
)
return Token(access_token=access_token, token_type="bearer")

28
src/services/users.py Normal file
View File

@@ -0,0 +1,28 @@
from fastapi import HTTPException
from src.schemas.users import User
from src.services.base import BaseService
class UserService(BaseService):
async def get_user_by_filter(self, **filter_by) -> User | None:
result = await self.session.user.get_one_or_none(**filter_by)
if result is None:
return None
return User.model_validate(result)
async def get_user_by_filter_or_raise(self, **filter_by) -> User:
user = await self.get_user_by_filter(**filter_by)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
async def validate_admin_user(self, username: str) -> User:
user = await self.get_user_by_filter_or_raise(username=username)
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Admin access required")
return user
async def get_all_users(self) -> list[User]:
users = await self.session.user.get_all_users()
return [User.model_validate(user) for user in users]