# File-viewer metadata captured with this source (not code): 97 lines, 2.6 KiB, Python

from typing import Generator, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import verify_password
from app.db.utils import get_db
from app.models.user import User
from app.schemas.token import TokenPayload
# OAuth2 configuration.
# Password-bearer scheme whose token URL is the login route under
# settings.API_V1_STR; get_current_user receives its token through
# Depends(oauth2_scheme).
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
    """
    Resolve the request's bearer token to the matching ``User`` record.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``, the decoded claims are validated by building a
    ``TokenPayload`` from them, and the ``sub`` claim is used as the user id
    for the database lookup.

    Args:
        db: Database session, injected through ``get_db``.
        token: Token string, injected through ``oauth2_scheme``.

    Returns:
        The user whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 403 if the token cannot be decoded or its claims fail
            validation, 404 if no user matches, 400 if the user is inactive.
    """
    try:
        claims = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_payload = TokenPayload(**claims)
    except (jwt.JWTError, ValidationError) as exc:
        # Decode failures and malformed claims share one generic response.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        ) from exc

    matching_users = db.query(User).filter(User.id == token_payload.sub)
    account = matching_users.first()
    if not account:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if account.is_active:
        return account

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user",
    )
def get_current_active_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Allow the request through only when the current user is a superuser.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user, unchanged, when ``is_superuser`` is truthy.

    Raises:
        HTTPException: 403 when ``is_superuser`` is falsy.
    """
    if current_user.is_superuser:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="The user doesn't have enough privileges",
    )
def get_current_verified_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Allow the request through only when the current user is verified.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user, unchanged, when ``is_verified`` is truthy.

    Raises:
        HTTPException: 403 when ``is_verified`` is falsy.
    """
    if current_user.is_verified:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Email not verified. Please check your email for verification instructions.",
    )
def authenticate_user(
    db: Session, email: str, password: str
) -> Optional[User]:
    """
    Look up a user by email and check the supplied password.

    Args:
        db: Database session used for the lookup.
        email: Email address to match against ``User.email``.
        password: Password passed to ``verify_password`` together with the
            user's ``hashed_password``.

    Returns:
        The matching user when one exists and ``verify_password`` succeeds,
        otherwise ``None``.
    """
    candidate = db.query(User).filter(User.email == email).first()
    # Short-circuit: the password is only checked when a user was found.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None