from typing import Generator from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import jwt from pydantic import ValidationError from sqlalchemy.orm import Session from app import crud, models, schemas from app.core.config import settings from app.db.session import SessionLocal reusable_oauth2 = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login" ) def get_db() -> Generator: try: db = SessionLocal() yield db finally: db.close() def get_current_user( db: Session = Depends(get_db), token: str = Depends(reusable_oauth2) ) -> models.User: try: payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.ALGORITHM] ) token_data = schemas.TokenPayload(**payload) except (jwt.JWTError, ValidationError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) user = crud.user.get(db, id=token_data.sub) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) if not crud.user.is_active(user): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user", ) return user def get_current_active_verified_user( current_user: models.User = Depends(get_current_user), ) -> models.User: if not current_user.is_verified: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Email not verified", ) return current_user def get_current_admin( current_user: models.User = Depends(get_current_user), ) -> models.User: if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) return current_user