from datetime import datetime, timedelta from typing import Optional from jose import jwt from passlib.context import CryptContext from pydantic import EmailStr from sqlalchemy.orm import Session from app.models.user import User # Password hashing setup pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT settings SECRET_KEY = "your-secret-key-here" # Should be stored securely in env variables ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 def verify_password(plain_password, hashed_password): """Verify a password against a hashed password.""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): """Hash a password.""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Create a JWT access token.""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_user(db: Session, user_id: int): """Get a user by ID.""" return db.query(User).filter(User.id == user_id).first() def get_user_by_email(db: Session, email: EmailStr): """Get a user by email.""" return db.query(User).filter(User.email == email).first() def get_user_by_username(db: Session, username: str): """Get a user by username.""" return db.query(User).filter(User.username == username).first() def create_user(db: Session, email: EmailStr, username: str, password: str): """Create a new user.""" hashed_password = get_password_hash(password) db_user = User(email=email, username=username, hashed_password=hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user