62 lines
2.1 KiB
Python

from typing import List
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.session import get_db
# OAuth2 bearer-token scheme used as the token dependency of
# get_current_user. The token URL is the access-token route under the
# configured API prefix (settings.API_V1_STR).
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
    """Resolve the bearer token of the current request to a user record.

    The token is decoded with ``settings.JWT_SECRET_KEY`` and
    ``settings.JWT_ALGORITHM``, its claims are validated against
    ``schemas.token.TokenPayload``, and the user whose id equals the ``sub``
    claim is loaded through ``crud.user.get``.

    Args:
        db: Database session, injected via ``get_db``.
        token: Encoded JWT, injected via ``oauth2_scheme``.

    Returns:
        The user returned by ``crud.user.get`` for the token's ``sub``.

    Raises:
        HTTPException: 403 if the token cannot be decoded or its payload
            fails validation; 404 if no user is found for ``sub``.
    """
    try:
        payload = jwt.decode(
            token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
        )
        token_data = schemas.token.TokenPayload(**payload)
    except (jwt.JWTError, ValidationError) as exc:
        # Chain explicitly so the decode/validation failure stays attached
        # as the cause of the HTTP error in logs and tracebacks.
        # NOTE(review): 401 with a WWW-Authenticate header is the usual
        # response for bad credentials; 403 is kept here because clients
        # may rely on it -- confirm before changing.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        ) from exc

    user = crud.user.get(db, id=token_data.sub)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    return user
def get_current_active_user(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """Return the authenticated user, rejecting inactive accounts.

    Args:
        current_user: User injected via ``get_current_user``.

    Returns:
        ``current_user`` unchanged when ``crud.user.is_active`` accepts it.

    Raises:
        HTTPException: 400 ("Inactive user") when ``crud.user.is_active``
            reports the user as not active.
    """
    if crud.user.is_active(current_user):
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
def check_role(
    allowed_roles: List[str],
    user: models.User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
) -> models.User:
    """Ensure the user's role is one of ``allowed_roles``.

    The role row is looked up by ``user.role_id`` and its ``name`` is
    compared against ``allowed_roles``.

    Args:
        allowed_roles: Role names that are permitted to proceed.
        user: The user whose role is checked.
        db: Database session used to query ``models.Role``.

    Returns:
        ``user`` unchanged when its role name is in ``allowed_roles``.

    Raises:
        HTTPException: 403 when no role row is found for ``user.role_id``
            or the role name is not in ``allowed_roles``.
    """
    role_query = db.query(models.Role).filter(models.Role.id == user.role_id)
    user_role = role_query.first()

    # Success path first: a role exists and its name is permitted.
    if user_role and user_role.name in allowed_roles:
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Operation not permitted. Required roles: {', '.join(allowed_roles)}",
    )
def get_admin_user(
    db: Session = Depends(get_db),
    user: models.User = Depends(get_current_active_user),
) -> models.User:
    """Return the current active user if their role is ``admin``.

    Delegates to ``check_role`` with ``["admin"]`` as the allowed roles;
    any ``HTTPException`` raised there propagates to the caller.
    """
    return check_role(allowed_roles=["admin"], user=user, db=db)
def get_teacher_user(
    db: Session = Depends(get_db),
    user: models.User = Depends(get_current_active_user),
) -> models.User:
    """Return the current active user if their role is ``admin`` or ``teacher``.

    Delegates to ``check_role`` with ``["admin", "teacher"]`` as the allowed
    roles; any ``HTTPException`` raised there propagates to the caller.
    """
    return check_role(allowed_roles=["admin", "teacher"], user=user, db=db)