from __future__ import annotations from datetime import datetime, timezone from typing import TYPE_CHECKING from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from pydantic import ValidationError from app import models, schemas from app.core.config import settings from app.core import security from app.db.session import get_db if TYPE_CHECKING: from sqlalchemy.orm import Session oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login") def get_current_user( db: Session = Depends(get_db), token: str = Depends(oauth2_scheme) ) -> models.User: """ Validate access token and return current user. """ try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) token_data = schemas.TokenPayload(**payload) if datetime.fromtimestamp(token_data.exp, tz=timezone.utc) < datetime.now(tz=timezone.utc): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired", headers={"WWW-Authenticate": "Bearer"}, ) except (JWTError, ValidationError) as e: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) from e user = db.query(models.User).filter(models.User.id == token_data.sub).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") return user def get_current_active_user( current_user: models.User = Depends(get_current_user), ) -> models.User: """ Get current active user. """ if not current_user.is_active: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user def get_current_active_superuser( current_user: models.User = Depends(get_current_user), ) -> models.User: """ Get current active superuser. """ if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges" ) return current_user