71 lines
2.0 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.crud import user as crud_user
from app.models.user import User
from app.schemas.user import TokenPayload
# Create OAuth2 password bearer scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
async def get_current_user(
db: AsyncSession = Depends(get_db),
token: str = Depends(oauth2_scheme),
) -> User:
"""
Validate access token and return current user.
"""
try:
# Decode JWT token
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Get user from database
user = await crud_user.get_by_id(db, user_id=token_data.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Get current active user.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return current_user
async def get_current_active_superuser(
current_user: User = Depends(get_current_active_user),
) -> User:
"""
Get current active superuser.
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
return current_user