"""CRUD helpers and password utilities for ``User`` records."""

from typing import List, Optional

from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate

# Shared hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def _commit(db: Session) -> None:
    """Commit the session, rolling back and re-raising if the commit fails.

    Without the rollback a failed commit leaves the session's transaction
    unusable for any later work by the caller.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


def get_user(db: Session, user_id: int) -> Optional[User]:
    """Return the user whose ``id`` equals *user_id*, or ``None``."""
    return db.query(User).filter(User.id == user_id).first()


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the first user whose ``email`` equals *email*, or ``None``."""
    return db.query(User).filter(User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return up to *limit* users after skipping the first *skip*.

    Rows are ordered by ``User.id`` so that successive pages are stable;
    OFFSET/LIMIT without an ORDER BY has no guaranteed row order.
    """
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()


def create_user(db: Session, user: UserCreate) -> User:
    """Create, persist and return a new user built from *user*.

    The plaintext ``user.password`` is hashed before storage; only the hash
    is written to the model. Any exception raised by the commit propagates
    after the session has been rolled back.
    """
    db_user = User(
        email=user.email,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
        is_active=user.is_active,
    )
    db.add(db_user)
    _commit(db)
    db.refresh(db_user)
    return db_user


def update_user(
    db: Session, user_id: int, user_update: UserUpdate
) -> Optional[User]:
    """Apply the explicitly-set fields of *user_update* to a user.

    Returns the refreshed user, or ``None`` when no user has *user_id*.
    Any exception raised by the commit propagates after a rollback.
    """
    db_user = get_user(db, user_id)
    if db_user is None:
        return None

    update_data = user_update.dict(exclude_unset=True)

    # NOTE(review): UserUpdate is defined elsewhere; if it carries a
    # plaintext ``password`` field it must be hashed into ``hashed_password``
    # rather than copied onto the model verbatim. No-op when the field is
    # absent from the payload.
    if "password" in update_data:
        new_password = update_data.pop("password")
        if new_password is not None:
            db_user.hashed_password = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(db_user, field, value)

    _commit(db)
    db.refresh(db_user)
    return db_user


def delete_user(db: Session, user_id: int) -> bool:
    """Delete the user with *user_id*.

    Returns ``True`` when a user was found and deleted, ``False`` when no
    such user exists. Any exception raised by the commit propagates after a
    rollback.
    """
    db_user = get_user(db, user_id)
    if db_user is None:
        return False

    db.delete(db_user)
    _commit(db)
    return True