from datetime import datetime, timedelta from typing import Any, Optional, Union from jose import jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from app.core.config import settings from app.models.user import User # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token( subject: Union[str, Any], expires_delta: Optional[timedelta] = None ) -> str: """ Create a JWT access token. Args: subject: Token subject (usually user ID) expires_delta: Token expiration time Returns: JWT token string """ if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode = {"exp": expire, "sub": str(subject)} encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) return encoded_jwt def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify password against hash. Args: plain_password: Plain text password hashed_password: Hashed password Returns: True if password matches hash """ return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """ Hash a password. Args: password: Plain text password Returns: Hashed password """ return pwd_context.hash(password) def authenticate_user(db: Session, email: str, password: str) -> Optional[User]: """ Authenticate a user. Args: db: Database session email: User email password: User password Returns: User object if authentication successful, None otherwise """ user = db.query(User).filter(User.email == email).first() if not user: return None if not verify_password(password, user.hashed_password): return None return user