"""Authentication helpers: password hashing, JWT access tokens, and the
FastAPI dependencies that resolve the current user from a bearer token."""

from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

from app.core.config import settings
from app.db.database import get_db
from app.models.user import User
from app.api.schemas.user import UserInDB

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 password bearer token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/login")

# JWT constants
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return a hash of ``password`` produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def get_user(db: Session, username: str) -> Optional[UserInDB]:
    """Look up a user by username.

    Args:
        db: Open SQLAlchemy session.
        username: Exact username to match against ``User.username``.

    Returns:
        The matching user converted to ``UserInDB``, or ``None`` if no row
        matches.
    """
    user = db.query(User).filter(User.username == username).first()
    if user is None:
        return None
    # Read fields through attribute access rather than ``user.__dict__``:
    # the raw instance dict carries SQLAlchemy's ``_sa_instance_state`` and
    # omits attributes that are expired or not yet loaded.
    return UserInDB.model_validate(user, from_attributes=True)


def authenticate_user(
    db: Session, username: str, password: str
) -> Optional[UserInDB]:
    """Authenticate a user with username and password.

    Args:
        db: Open SQLAlchemy session.
        username: Username supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The user as ``UserInDB`` when the credentials are valid, otherwise
        ``None``.
    """
    user = get_user(db, username)
    if user is None:
        # Spend roughly the time a real verification would take so that
        # response timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


def create_access_token(
    data: dict, expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token; the dict is copied, not mutated.
        expires_delta: Token lifetime. When omitted (or falsy), the lifetime
            is ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    lifetime = (
        expires_delta
        if expires_delta
        else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    # Timezone-aware "now": ``datetime.utcnow()`` is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
) -> UserInDB:
    """Get the current authenticated user from the bearer token.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The user named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = get_user(db, username=username)
    if user is None:
        raise credentials_exception
    return user


def get_current_active_user(
    current_user: UserInDB = Depends(get_current_user),
) -> UserInDB:
    """Get the current user, requiring that the account is active.

    Raises:
        HTTPException: 400 if ``current_user.is_active`` is falsy.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


def get_current_admin_user(
    current_user: UserInDB = Depends(get_current_active_user),
) -> UserInDB:
    """Get the current active user, requiring admin privileges.

    Raises:
        HTTPException: 403 if ``current_user.is_admin`` is falsy.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )
    return current_user