"""Authentication service: credential checks, registration and JWT handling."""

from datetime import timedelta
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.security import (
    ALGORITHM,
    create_access_token,
    get_password_hash,
    verify_password,
)
from app.db.database import get_db
from app.db.repositories import user_repository
from app.models.user import User
from app.schemas.user import TokenData, UserCreate, UserLogin

# Bearer-token extractor; the token URL is built from the configured API prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")


class AuthService:
    """Groups the authentication operations used by the API layer."""

    def authenticate_user(
        self, db: Session, email: str, password: str
    ) -> Optional[User]:
        """Return the user matching ``email`` if ``password`` verifies.

        Args:
            db: Database session passed through to the user repository.
            email: Email used to look the user up.
            password: Plain-text password checked against the stored hash.

        Returns:
            The matching ``User``, or ``None`` when no user has that email
            or the password does not verify.
        """
        user = user_repository.get_by_email(db, email)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    def register_user(self, db: Session, user_in: UserCreate) -> User:
        """Create a new user after checking email and username are unused.

        Args:
            db: Database session passed through to the user repository.
            user_in: Registration payload; its ``password`` is hashed and
                stored as ``hashed_password``.

        Returns:
            Whatever ``user_repository.create`` returns for the new record.

        Raises:
            HTTPException: 400 when the email is already registered or the
                username is already taken.
        """
        # Check if email already exists
        if user_repository.get_by_email(db, user_in.email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

        # Check if username already exists
        if user_repository.get_by_username(db, user_in.username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

        # Create new user; the plain-text password is never persisted.
        hashed_password = get_password_hash(user_in.password)
        user_data = user_in.model_dump(exclude={"password"})
        user_data["hashed_password"] = hashed_password
        return user_repository.create(db, user_data)

    def create_token(self, user: User) -> dict:
        """Build the bearer-token response body for ``user``.

        Args:
            user: The user whose ``id`` becomes the token subject.

        Returns:
            A dict with ``access_token`` and ``token_type`` (``"bearer"``).
        """
        access_token_expires = timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
        access_token = create_access_token(
            subject=user.id, expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}

    def get_current_user(
        self,
        db: Session = Depends(get_db),
        token: str = Depends(oauth2_scheme),
    ) -> User:
        """Resolve the user identified by a bearer token.

        Args:
            db: Database session (injected via ``get_db``).
            token: Encoded JWT (injected via ``oauth2_scheme``).

        Returns:
            The ``User`` whose id matches the token's ``sub`` claim.

        Raises:
            HTTPException: 401 when the token cannot be decoded, has no
                ``sub`` claim, carries a ``sub`` that is not a valid user
                id, or refers to a user that does not exist.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

        try:
            payload = jwt.decode(
                token, settings.SECRET_KEY, algorithms=[ALGORITHM]
            )
        except JWTError as exc:
            raise credentials_exception from exc

        subject = payload.get("sub")
        if subject is None:
            raise credentials_exception

        # A correctly signed token may still carry a subject that is not a
        # valid integer id; reject it with 401 rather than letting the
        # conversion error escape as a server error.
        try:
            token_data = TokenData(user_id=int(subject))
        except (TypeError, ValueError) as exc:
            raise credentials_exception from exc

        user = user_repository.get_by_id(db, token_data.user_id)
        if user is None:
            raise credentials_exception
        return user


auth_service = AuthService()