from typing import Optional

from sqlalchemy.orm import Session

from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate


def _commit_and_refresh(db: Session, instance: User) -> User:
    """Commit the session, then reload *instance* from the database.

    If the commit raises, the session is rolled back before the exception
    is re-raised so the caller's session is not left in a failed
    transaction state.
    """
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session unusable until rollback();
        # clean up here and let the caller decide how to report the error.
        db.rollback()
        raise
    db.refresh(instance)
    return instance


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the first user whose email equals *email*, or ``None``."""
    return db.query(User).filter(User.email == email).first()


def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Return the first user whose id equals *user_id*, or ``None``."""
    return db.query(User).filter(User.id == user_id).first()


def create_user(db: Session, user: UserCreate) -> User:
    """Persist a new user built from *user* and return the refreshed row.

    The plain-text ``user.password`` is passed through
    ``get_password_hash`` and only the result is stored. Any exception
    raised by the commit propagates after the session is rolled back.
    """
    hashed_password = get_password_hash(user.password)
    db_user = User(
        email=user.email,
        hashed_password=hashed_password,
        full_name=user.full_name,
    )
    db.add(db_user)
    return _commit_and_refresh(db, db_user)


def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """Return the user matching *email* if *password* verifies, else ``None``.

    ``None`` is returned both when no user has that email and when
    ``verify_password`` rejects the password.
    """
    user = get_user_by_email(db, email)
    if not user:
        # NOTE(review): this path returns without calling verify_password,
        # so unknown emails may answer faster than known ones -- confirm
        # whether that timing difference matters for this deployment.
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


def update_user(db: Session, user_id: int, user_update: UserUpdate) -> Optional[User]:
    """Apply the explicitly-set fields of *user_update* to a user.

    Returns the refreshed user, or ``None`` when no user has *user_id*.
    A ``password`` field is never written as-is: it is replaced by
    ``hashed_password`` computed with ``get_password_hash``. A password
    explicitly set to ``None`` is ignored rather than hashed. Any
    exception raised by the commit propagates after the session is
    rolled back.
    """
    db_user = get_user_by_id(db, user_id)
    if not db_user:
        return None

    update_data = user_update.dict(exclude_unset=True)
    if "password" in update_data:
        # exclude_unset keeps fields that were explicitly set to None, so
        # drop the key unconditionally and hash only a real value.
        new_password = update_data.pop("password")
        if new_password is not None:
            update_data["hashed_password"] = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(db_user, field, value)

    return _commit_and_refresh(db, db_user)