from functools import wraps from typing import Generator from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import jwt from pydantic import ValidationError from sqlalchemy.orm import Session from app.core.config import settings from app.crud.crud_user import get_user from app.db.session import SessionLocal from app.models.user import User, UserRole from app.schemas.token import TokenPayload oauth2_scheme = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login" ) def get_db() -> Generator: db = SessionLocal() try: yield db finally: db.close() def get_current_user( db: Session = Depends(get_db), token: str = Depends(oauth2_scheme) ) -> User: try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=["HS256"] ) token_data = TokenPayload(**payload) except (jwt.JWTError, ValidationError): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", ) user = get_user(db, user_id=token_data.sub) if not user: raise HTTPException(status_code=404, detail="User not found") return user def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user def get_current_admin_user( current_user: User = Depends(get_current_active_user), ) -> User: if current_user.role != UserRole.ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges", ) return current_user def check_role(required_role: UserRole): def decorator(func): @wraps(func) async def wrapper(*args, current_user: User = Depends(get_current_active_user), **kwargs): if current_user.role != required_role and current_user.role != UserRole.ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges", ) return await func(*args, current_user=current_user, **kwargs) return wrapper return decorator