"""FastAPI dependencies for database sessions and user authentication."""

from typing import Generator

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app.core.config import settings
from app.db.session import SessionLocal
from app.models.user import User
from app.schemas.token import TokenPayload

# Extracts the bearer token from the request; tokenUrl points at the login
# route under the configured API prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")


def get_db() -> Generator[Session, None, None]:
    """Yield a database session and close it once the caller is done.

    Yields:
        A session created by ``SessionLocal()``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs whether the consumer finished normally or raised, so the
        # session is always closed.
        db.close()


def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
    """Resolve the bearer token to the user it identifies.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``, its payload is validated through
    ``TokenPayload``, and the user whose ``id`` equals the payload's ``sub``
    is loaded from the database.

    Args:
        db: Database session used for the user lookup.
        token: Encoded JWT supplied by ``oauth2_scheme``.

    Returns:
        The matching ``User`` row.

    Raises:
        HTTPException: 401 if the token cannot be decoded or its payload
            fails validation; 404 if no user matches the token subject.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (JWTError, ValidationError) as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    user = db.query(User).filter(User.id == token_data.sub).first()
    # ``first()`` returns None when there is no row; compare against None
    # explicitly instead of relying on the truthiness of a model instance.
    if user is None:
        # NOTE(review): a valid token whose subject has no matching row is
        # reported as 404 rather than 401; kept unchanged for client
        # compatibility.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    return user


def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user, rejecting inactive accounts.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 if the user is not active.
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return current_user


def get_current_admin_user(
    current_user: User = Depends(get_current_active_user),
) -> User:
    """Return the authenticated active user, rejecting non-admins.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same user when ``current_user.is_admin`` is truthy.

    Raises:
        HTTPException: 403 if the user is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )
    return current_user