2025-06-11 17:21:30 +00:00

107 lines
3.2 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.core.config import settings
from app.models.user import User
from app.schemas.token import TokenPayload
# Password hashing: shared passlib context with bcrypt as its only scheme.
# Used by verify_password() and get_password_hash() below.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 bearer-token scheme used as a dependency by get_current_user();
# tokenUrl is the path clients are pointed at to obtain a token
# (NOTE(review): the route itself is not defined in this module -- confirm it exists).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check a candidate plaintext password against a stored hash.

    Delegates to the module-level passlib context and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """
    Produce the hash of a password so it can be stored instead of the plaintext.

    Hashing is performed by the module-level passlib context.
    """
    hashed = pwd_context.hash(password)
    return hashed
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """
    Try to authenticate the user.

    Looks the user up by email and checks the supplied password against the
    stored hash.

    Args:
        db: Database session used for the lookup.
        email: Email address identifying the account.
        password: Plaintext password supplied by the client.

    Returns:
        The matching user when the credentials are valid, otherwise None.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import with app.crud.user -- confirm).
    from app.crud.user import get_user_by_email

    user = get_user_by_email(db, email=email)
    if not user:
        # Spend roughly the time a real verification would take, so response
        # timing does not reveal whether the email is registered.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. When None, the token expires
            15 minutes after creation.

    Returns:
        The encoded JWT, signed with settings.SECRET_KEY using
        settings.ALGORITHM.
    """
    # Local import: the module-level datetime import does not bring
    # `timezone` into scope.
    from datetime import timezone

    to_encode = data.copy()
    # Compare against None: timedelta(0) is falsy, so a truthiness test would
    # silently replace an explicit zero lifetime with the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # Timezone-aware "now": datetime.utcnow() is deprecated and yields a naive
    # value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """
    Resolve the bearer token to the user it identifies.

    Every failure along the way -- a token that cannot be decoded, a missing
    "sub" claim, or no user stored under that email -- is reported with the
    same 401 response.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Decode the token; any JWT-level failure becomes a 401.
    try:
        claims = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError:
        raise credentials_exception

    # The "sub" claim carries the user's email.
    subject = claims.get("sub")
    if subject is None:
        raise credentials_exception
    token_data = TokenPayload(email=subject)

    from app.crud.user import get_user_by_email

    account = get_user_by_email(db, email=token_data.email)
    if account is None:
        raise credentials_exception
    return account
async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Pass the authenticated user through only when the account is active.

    Inactive accounts are rejected with a 400 "Inactive user" error.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
async def get_current_active_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Get the current active superuser.

    Returns:
        The authenticated user, once confirmed to be both active and a
        superuser.

    Raises:
        HTTPException: 400 if the account is inactive; 403 if the user is not
            a superuser.
    """
    # The get_current_user dependency only authenticates; it does not look at
    # is_active. Without this guard a deactivated superuser would keep
    # privileged access, contradicting the function's name. Status code and
    # message mirror get_current_active_user.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403, detail="The user doesn't have enough privileges"
        )
    return current_user