from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Union

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.security import verify_password
from app.crud.crud_user import get_user_by_email
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import TokenPayload

# FastAPI dependency that pulls the bearer token out of the request; the
# tokenUrl points clients (and the OpenAPI docs) at the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")


def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """
    Authenticate user by email and password.

    Args:
        db: Database session passed through to the user lookup.
        email: Email address used to look the user up.
        password: Plain-text password checked against the stored hash.

    Returns:
        The matching user, or ``None`` when no user has that email or the
        password does not verify.
    """
    user = get_user_by_email(db, email=email)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


def create_access_token(
    subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create JWT access token.

    Args:
        subject: Value stored (stringified) in the token's ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicit zero lifetime must not silently fall back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {"exp": expire, "sub": str(subject)}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
    """
    Get current user from JWT token.

    Args:
        db: Database session (injected by FastAPI via ``get_db``).
        token: Bearer token (injected by FastAPI via ``oauth2_scheme``).

    Returns:
        The active user identified by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, its payload is
            invalid, it carries no subject, or no user matches the subject;
            403 when the matching user is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (JWTError, ValueError) as exc:
        # ValueError covers a payload that fails schema validation, so a
        # malformed token yields 401 instead of an unhandled 500.
        # NOTE(review): assumes TokenPayload is a pydantic model, whose
        # ValidationError subclasses ValueError - confirm.
        raise credentials_exception from exc

    if token_data.sub is None:
        raise credentials_exception

    # The subject claim is used as the email key for the user lookup.
    user = get_user_by_email(db, email=token_data.sub)
    if user is None:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
        )
    return user