79 lines
2.1 KiB
Python

from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.session import SessionLocal
# OAuth2 password-bearer scheme injected as the ``token`` dependency of
# get_current_user; its tokenUrl is the auth/login route under the
# configured API v1 prefix.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_db() -> Generator:
    """
    Provide a database session for the duration of one consumer.

    A session is created from ``SessionLocal`` and yielded; it is closed
    once the consumer finishes, whether or not an exception was raised
    while the session was in use.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs on normal completion and on error, so the session is
        # always closed.
        session.close()
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
    """
    Resolve the bearer token to the user it identifies.

    The token is decoded with the configured secret key and algorithm,
    its claims are validated through ``schemas.TokenPayload``, and the
    user whose id equals the ``sub`` claim is loaded via ``crud.user.get``.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header)
            when decoding raises ``JWTError`` or the payload fails
            validation; 404 when the user lookup returns a falsy result.
    """
    try:
        claims = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = schemas.TokenPayload(**claims)
    except (JWTError, ValidationError) as exc:
        # Chain the original error so the cause stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    found_user = crud.user.get(db, id=token_data.sub)
    if found_user:
        return found_user

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
def get_current_active_user(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """
    Return the authenticated user only if the account is active.

    Raises:
        HTTPException: 400 when ``crud.user.is_active`` reports the user
            as not active.
    """
    if crud.user.is_active(current_user):
        return current_user

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user"
    )
def get_current_admin_user(
    current_user: models.User = Depends(get_current_active_user),
) -> models.User:
    """
    Return the active user only if they hold admin rights.

    Raises:
        HTTPException: 403 when ``crud.user.is_admin`` reports the user
            as not an admin.
    """
    if crud.user.is_admin(current_user):
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions"
    )