"""FastAPI dependencies that resolve the authenticated user from a bearer JWT."""

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from jose.exceptions import JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.security import ALGORITHM
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import TokenPayload

# Supplies the bearer token to dependencies; `tokenUrl` is the path declared
# as the token-issuing endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")


def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
    """
    Validate and decode the JWT token to get the current user.

    Args:
        db: Database session used to look the user up by primary key.
        token: Encoded JWT supplied by ``oauth2_scheme``.

    Returns:
        The ``User`` row whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, its payload does
            not validate as ``TokenPayload``, ``sub`` is missing, or no user
            has that id; 403 when the user exists but is not active.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode and schema validation can raise the exceptions caught
    # here, so the try body is limited to those two calls.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
        token_data = TokenPayload(**payload)
    except (JWTError, ValidationError) as exc:
        # Chain explicitly so the underlying failure stays visible in logs;
        # the client still receives only the generic 401.
        raise credentials_exception from exc

    if token_data.sub is None:
        raise credentials_exception
    user_id = token_data.sub

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        # Unknown id gets the same 401 as a bad token.
        raise credentials_exception

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
        )
    return user


def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Get the current active user.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        ``current_user`` unchanged.

    Raises:
        HTTPException: 403 when ``current_user.is_active`` is falsy.
    """
    # get_current_user already rejects inactive users; this repeats the check
    # so the guarantee also holds if that dependency is replaced.
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
        )
    return current_user


def get_current_active_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Get the current active superuser.

    The "active" part is enforced by ``get_current_user``; this function only
    adds the superuser check.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        ``current_user`` unchanged.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )
    return current_user