96 lines
2.8 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.core.security import verify_password
from app.models.user import User, UserRole
from app.schemas.user import TokenPayload
# OAuth2 password bearer flow for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
"""
Validate the access token and return the current user.
"""
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.id == token_data.sub).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Check if the user is active.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
)
return current_user
def get_current_seller(
current_user: User = Depends(get_current_active_user),
) -> User:
"""
Check if the user is a seller.
"""
if current_user.role != UserRole.SELLER and current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Operation requires seller privileges"
)
return current_user
def get_current_admin(
current_user: User = Depends(get_current_active_user),
) -> User:
"""
Check if the user is an admin.
"""
if current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Operation requires admin privileges"
)
return current_user
def authenticate_user(db: Session, email: str, password: str) -> User:
"""
Authenticate a user by email and password.
"""
user = db.query(User).filter(User.email == email).first()
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user