from typing import Generator, Optional from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import jwt, JWTError from pydantic import ValidationError from sqlalchemy.orm import Session from app.db.session import SessionLocal from app.models.user import User from app.core.config import settings from app.core.security import ALGORITHM from app.crud.user import user from app.schemas.user import TokenPayload oauth2_scheme = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login" ) def get_db() -> Generator: try: db = SessionLocal() yield db finally: db.close() def get_current_user( db: Session = Depends(get_db), token: str = Depends(oauth2_scheme) ) -> User: try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[ALGORITHM] ) token_data = TokenPayload(**payload) except (JWTError, ValidationError): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", ) db_user = user.get(db, id=token_data.sub) if not db_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return db_user def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: if not user.is_active(current_user): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user", ) return current_user def get_current_admin_user( current_user: User = Depends(get_current_active_user), ) -> User: if not user.is_admin(current_user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges", ) return current_user