from fastapi import HTTPException

from src.core.auth_manager import AuthManager
from src.core.settings import settings
from src.schemas.auth import TokenData
from src.schemas.users import User, UserAdd, UserRequestADD, UserWithHashedPass
from src.services.base import BaseService


class AuthService(BaseService):
    """Account registration, login and refresh-token handling.

    All persistence goes through ``self.session`` (its ``user`` and ``auth``
    repositories); every public method commits the session itself.
    """

    async def registration(self, cred: UserRequestADD) -> User:
        """Create a user from the submitted credentials.

        The plain password is hashed with ``AuthManager.get_password_hash``
        and only the hash is passed on to the user repository.

        Args:
            cred: Registration payload carrying username, email and password.

        Returns:
            The created record validated as a ``User`` schema.
        """
        password_hash = AuthManager.get_password_hash(cred.password)
        new_user = UserAdd(
            username=cred.username,
            email=cred.email,
            hashed_password=password_hash,
        )
        payload = new_user.model_dump()

        created = await self.session.user.create_one(payload)
        await self.session.commit()
        return User.model_validate(created)

    async def _tokens_create(self, user_data: TokenData, fingerprint: str):
        """Issue an access/refresh token pair and store the refresh token.

        The refresh token is saved through the ``auth`` repository together
        with the user id and the fingerprint. This helper does not commit;
        the calling method does.

        Args:
            user_data: Data dumped into the access token.
            fingerprint: Fingerprint value stored next to the refresh token.

        Returns:
            A dict with ``access_token``, ``token_type`` and
            ``refresh_token`` keys.
        """
        access = AuthManager.create_access_token(user_data.model_dump())
        refresh = AuthManager.create_refresh_token()

        refresh_row = {
            "token": refresh,
            "user_id": user_data.id,
            "fingerprint": fingerprint,
        }
        await self.session.auth.create_one(refresh_row)

        issued = {
            "access_token": access,
            "token_type": settings.access_token.token_type,
            "refresh_token": refresh,
        }
        return issued

    async def login(self, username: str, password: str, fingerprint: str) -> dict:
        """Authenticate a user and hand out a fresh token pair.

        Args:
            username: Name used to look the user up.
            password: Plain password checked against the stored hash.
            fingerprint: Fingerprint value stored with the refresh token.

        Returns:
            The token dict produced by ``_tokens_create``.

        Raises:
            HTTPException: 401 when the user is not found, the password does
                not verify, or ``is_active`` is ``False``. The same error is
                used for all three cases.
        """
        unauthorized = HTTPException(
            status_code=401,
            detail="Incorrect username or password",
        )

        found = await self.session.user.get_one_or_none(username=username)
        if found is None:
            raise unauthorized

        user = UserWithHashedPass.model_validate(found)
        # Built before the password check, exactly as the flow has always run.
        token_data = TokenData.model_validate(user.model_dump())

        password_ok = AuthManager.verify_password(
            plain_password=password,
            hashed_password=user.hashed_password,
        )
        if not password_ok:
            raise unauthorized
        # Only an explicit False blocks the login.
        if user.is_active is False:
            raise unauthorized

        issued = await self._tokens_create(token_data, fingerprint)
        await self.session.commit()
        return issued

    async def refresh_tokens(
        self, refresh_token: str, user_data: TokenData, fingerprint: str
    ) -> dict:
        """Swap a stored refresh token for a new token pair.

        The new pair is created first, then the old refresh token is deleted,
        and both changes are committed together.

        Args:
            refresh_token: Refresh token presented by the client.
            user_data: Data of the user the token must belong to.
            fingerprint: Fingerprint value stored with the new refresh token.

        Returns:
            The token dict produced by ``_tokens_create``.

        Raises:
            HTTPException: 401 when no record matches the token or the record
                belongs to a different user id.
        """
        # NOTE(review): the supplied fingerprint is stored with the new token
        # but is not compared with the existing record here - confirm that
        # this is intended.
        rejected = HTTPException(status_code=401, detail="Invalid refresh token")

        stored = await self.session.auth.get_one_or_none(token=refresh_token)
        if not stored:
            raise rejected
        if stored.user_id != user_data.id:
            raise rejected

        issued = await self._tokens_create(user_data, fingerprint)
        await self.session.auth.delete_one(token=refresh_token)
        await self.session.commit()
        return issued

    async def delete_token(self, token: str) -> None:
        """Delete the given refresh token via the auth repository and commit.

        Args:
            token: Refresh token value to delete.
        """
        await self.session.auth.delete_one(token=token)
        await self.session.commit()