Compare commits

..

10 Commits

Author SHA1 Message Date
IluaAir
b9a9e4e094 current or admin dep created 2025-07-27 13:54:39 +03:00
IluaAir
9f105c32c6 fix router request user data 2025-07-27 13:06:00 +03:00
IluaAir
7c334491c3 add user get, access token add id 2025-07-27 12:55:58 +03:00
IluaAir
d3eba77444 add get users 2025-07-27 12:41:35 +03:00
IluaAir
36dad7b441 add admin check 2025-07-27 12:30:06 +03:00
IluaAir
1a495e28c5 update .gitignore 2025-07-21 12:02:41 +03:00
IluaAir
4789da1b70 create active user 2025-07-20 12:37:57 +03:00
IluaAir
8ec8639848 create cur user 2025-07-20 12:34:18 +03:00
IluaAir
d639abfbc5 service auth fix 2025-07-19 12:10:00 +03:00
IluaAir
d530412805 ready login endpoint 2025-07-14 12:41:03 +03:00
12 changed files with 202 additions and 15 deletions

5
.gitignore vendored
View File

@@ -1,2 +1,7 @@
/.venv/
/.idea
__pycache__/
*.db
.DS_Store
.env
.vscode/

View File

@@ -0,0 +1,72 @@
from typing import Annotated
from fastapi import HTTPException, Depends, Path
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.services.users import UserService
from src.api.dependacies.db_dep import sessionDep
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api.v1_login_url}/login")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = AuthManager.decode_access_token(token=token)
if payload is None:
raise credentials_exception
user = TokenData(**payload)
except InvalidTokenError:
raise credentials_exception
return user
CurrentUser = Annotated[TokenData, Depends(get_current_user)]
def get_current_active_user(
current_user: CurrentUser,
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
ActiveUser = Annotated[TokenData, Depends(get_current_active_user)]
async def get_admin_user(db: sessionDep, current_user: ActiveUser):
await UserService(db).validate_admin_user(current_user.sub)
return current_user
AdminUser = Annotated[TokenData, Depends(get_admin_user)]
async def user_or_admin_path(db: sessionDep, id: int, current_user: ActiveUser):
if current_user.id == id:
return current_user
else:
admin = await get_admin_user(db, current_user)
return admin
async def user_or_admin(
db: sessionDep, current_user: ActiveUser, id: Annotated[int, Path()]
):
if current_user.id == id:
return current_user
else:
admin = await get_admin_user(db, current_user)
return admin
CurrentOrAdmin = Annotated[TokenData, Depends(user_or_admin)]

View File

@@ -19,7 +19,10 @@ async def registration(session: sessionDep, credential: UserRequestADD):
@router.post(path="/login")
async def login(
session: sessionDep,
credential: Annotated[OAuth2PasswordRequestForm, Depends()],
):
user = AuthService(session).login(credential.username, credential.password)
session: sessionDep,
credential: Annotated[OAuth2PasswordRequestForm, Depends()],
):
access_token = await AuthService(session).login(
credential.username, credential.password
)
return access_token

View File

@@ -1,5 +1,25 @@
from fastapi import APIRouter
from src.api.dependacies.user_dep import ActiveUser, AdminUser, CurrentOrAdmin
from src.api.dependacies.db_dep import sessionDep
from src.core.settings import settings
from src.services.users import UserService
router = APIRouter(prefix=settings.api.v1.users, tags=["Users"])
@router.get("/me")
async def get_me(user: ActiveUser):
return user
@router.get("/")
async def get_all_users(db: sessionDep, _: AdminUser):
users = await UserService(db).get_all_users()
return users
@router.get("/{id}")
async def get_user_by_id(db: sessionDep, id: int, _: CurrentOrAdmin):
user = await UserService(db).get_user_by_filter_or_raise(id=id)
return user

View File

@@ -6,7 +6,7 @@ from passlib.context import CryptContext
from src.core.settings import settings
class AuthManger:
class AuthManager:
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@classmethod
@@ -33,3 +33,11 @@ class AuthManger:
algorithm=settings.access_token.algorithm,
)
return encoded_jwt
@classmethod
def decode_access_token(cls, token: str) -> dict:
return jwt.decode(
token,
settings.access_token.secret_key,
algorithms=[settings.access_token.algorithm],
)

View File

@@ -12,11 +12,19 @@ class ApiV1Prefix(BaseModel):
auth: str = "/auth"
users: str = "/users"
@property
def login_url(self) -> str:
return f"{self.prefix}{self.auth}"
class ApiPrefix(BaseModel):
prefix: str = "/api"
v1: ApiV1Prefix = ApiV1Prefix()
@property
def v1_login_url(self) -> str:
return f"{self.prefix}{self.v1.login_url}"
class DbSettings(BaseModel):
url: str = f"sqlite+aiosqlite:///{DB_PATH}"

View File

@@ -1,5 +1,5 @@
from pydantic import BaseModel
from sqlalchemy import insert
from sqlalchemy import insert, select
from src.core.database import Base
@@ -15,3 +15,9 @@ class BaseRepo:
result = await self.session.execute(statement)
obj = result.scalar_one()
return obj
async def get_one_or_none(self, **filter_by):
query = select(self.model).filter_by(**filter_by)
result = await self.session.execute(query)
model = result.scalars().one_or_none()
return model

View File

@@ -1,6 +1,14 @@
from sqlalchemy import select
from src.models import UsersORM
from src.repository.base import BaseRepo
class UsersRepo(BaseRepo):
model = UsersORM
async def get_all_users(self):
query = select(self.model)
result = await self.session.execute(query)
models = result.scalars().all()
return models

12
src/schemas/auth.py Normal file
View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
id: int | None = None
sub: str | None = None
is_active: bool

View File

@@ -32,6 +32,4 @@ class UserRequestADD(BaseModel):
class UserAdd(BaseModel):
email: EmailStr | None
username: str
is_active: bool
is_superuser: bool
hashed_password: str

View File

@@ -1,21 +1,40 @@
from src.schemas.users import UserRequestADD, User, UserAdd
from fastapi import HTTPException
from src.schemas.auth import Token
from src.schemas.users import UserRequestADD, User, UserAdd, UserWithHashedPass
from src.services.base import BaseService
from src.core.auth_manager import AuthManger
from src.core.auth_manager import AuthManager
class AuthService(BaseService):
async def registration(self, cred: UserRequestADD) -> User:
hashed_pass = AuthManger.get_password_hash(cred.password)
hashed_pass = AuthManager.get_password_hash(cred.password)
user_to_insert = UserAdd(
username=cred.username,
email=cred.email,
hashed_password=hashed_pass,
is_active=True,
is_superuser=False,
)
result = await self.session.user.create_one(user_to_insert)
await self.session.commit()
return User.model_validate(result)
async def login(self, username: str, password: str):
...
result = await self.session.user.get_one_or_none(username=username)
if result is None:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
)
user = UserWithHashedPass.model_validate(result)
verify = AuthManager.verify_password(
plain_password=password, hashed_password=user.hashed_password
)
if not verify or user.is_active is False:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
)
access_token = AuthManager.create_access_token(
data={"id": user.id, "sub": user.username, "is_active": user.is_active}
)
return Token(access_token=access_token, token_type="bearer")

28
src/services/users.py Normal file
View File

@@ -0,0 +1,28 @@
from fastapi import HTTPException
from src.schemas.users import User
from src.services.base import BaseService
class UserService(BaseService):
async def get_user_by_filter(self, **filter_by) -> User | None:
result = await self.session.user.get_one_or_none(**filter_by)
if result is None:
return None
return User.model_validate(result)
async def get_user_by_filter_or_raise(self, **filter_by) -> User:
user = await self.get_user_by_filter(**filter_by)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
async def validate_admin_user(self, username: str) -> User:
user = await self.get_user_by_filter_or_raise(username=username)
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Admin access required")
return user
async def get_all_users(self) -> list[User]:
users = await self.session.user.get_all_users()
return [User.model_validate(user) for user in users]