"""User validation, password hashing and persistence helpers."""

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from sqlalchemy.orm import Session
from passlib.context import CryptContext

from models.user import User
from schemas.user import UserCreate, UserUpdate

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Compiled once at import time. They are applied with ``fullmatch`` so the
# whole string must match; ``re.match`` with a ``$`` anchor would also accept
# a value that ends in a newline.
_EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
_PHONE_PATTERN = re.compile(r"\+?1?\d{9,15}")

_MIN_PASSWORD_LENGTH = 8
_PASSWORD_ERROR = f"Password must be at least {_MIN_PASSWORD_LENGTH} characters"


def _password_too_short(password: Optional[str]) -> bool:
    """Return True when the password is missing or below the minimum length."""
    return not password or len(password) < _MIN_PASSWORD_LENGTH


def _commit_and_refresh(db: Session, instance: User) -> None:
    """Commit the session and reload ``instance``; roll back if commit fails.

    The original exception is re-raised after the rollback, so callers see
    the same error types as before.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise
    db.refresh(instance)


def validate_user_data(user_data: UserCreate) -> Dict[str, str]:
    """
    Validate user input data.

    Args:
        user_data: User data to validate

    Returns:
        Dict mapping field name to error message for every failed check,
        empty dict if valid
    """
    errors: Dict[str, str] = {}

    if not user_data.first_name or len(user_data.first_name) < 2:
        errors["first_name"] = "First name must be at least 2 characters"

    if not user_data.last_name or len(user_data.last_name) < 2:
        errors["last_name"] = "Last name must be at least 2 characters"

    if not validate_email(user_data.email):
        errors["email"] = "Invalid email format"

    if not validate_phone(user_data.phone):
        errors["phone"] = "Invalid phone number format"

    if _password_too_short(user_data.password):
        errors["password"] = _PASSWORD_ERROR

    return errors


def validate_email(email: str) -> bool:
    """
    Validate email format.

    Args:
        email: Email to validate

    Returns:
        bool: True if the whole value matches the email pattern, False
        otherwise (including when the value is not a string)
    """
    if not isinstance(email, str):
        return False
    return _EMAIL_PATTERN.fullmatch(email) is not None


def validate_phone(phone: str) -> bool:
    """
    Validate phone number format.

    Args:
        phone: Phone number to validate

    Returns:
        bool: True if the whole value matches the phone pattern, False
        otherwise (including when the value is not a string)
    """
    if not isinstance(phone, str):
        return False
    return _PHONE_PATTERN.fullmatch(phone) is not None


def hash_password(password: str) -> str:
    """
    Hash a password using the module's bcrypt ``CryptContext``.

    Args:
        password: Plain text password

    Returns:
        str: Hashed password
    """
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a password against its hash.

    Args:
        plain_password: Password to verify
        hashed_password: Stored hash to check against

    Returns:
        bool: True if password matches, False otherwise
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """
    Get user by email.

    Args:
        db: Database session
        email: Email to search for

    Returns:
        User if found, None otherwise
    """
    return db.query(User).filter(User.email == email).first()


def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, Any]]:
    """
    Create new user with validation.

    Args:
        db: Database session
        user_data: User data

    Returns:
        The created user, or an error dict: ``{"errors": {field: message}}``
        when validation fails, ``{"error": message}`` when the email is
        already registered
    """
    validation_errors = validate_user_data(user_data)
    if validation_errors:
        return {"errors": validation_errors}

    if get_user_by_email(db, user_data.email):
        return {"error": "Email already registered"}

    db_user = User(
        first_name=user_data.first_name,
        last_name=user_data.last_name,
        email=user_data.email,
        phone=user_data.phone,
        # Only the hash is stored, never the plain text password.
        password=hash_password(user_data.password),
        address=user_data.address,
        city=user_data.city,
        country=user_data.country,
    )
    db.add(db_user)
    _commit_and_refresh(db, db_user)
    return db_user


def update_user_safely(db: Session, user_id: int, user_data: UserUpdate) -> Union[User, Dict[str, str]]:
    """
    Update user data with validation.

    Every check runs before any attribute is changed, so a rejected update
    leaves the loaded user untouched.

    Args:
        db: Database session
        user_id: ID of user to update
        user_data: New user data; only fields that were explicitly set
            are applied

    Returns:
        Updated user or ``{"error": message}`` dict
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        return {"error": "User not found"}

    # Format and uniqueness are only checked when the email actually changes.
    if user_data.email and user_data.email != db_user.email:
        if not validate_email(user_data.email):
            return {"error": "Invalid email format"}
        if get_user_by_email(db, user_data.email):
            return {"error": "Email already registered"}

    if user_data.phone and not validate_phone(user_data.phone):
        return {"error": "Invalid phone number format"}

    changes = user_data.dict(exclude_unset=True)
    if "password" in changes:
        # Apply the same minimum-length rule as user creation; this also
        # rejects an explicit None, which hash_password cannot handle.
        if _password_too_short(changes["password"]):
            return {"error": _PASSWORD_ERROR}
        changes["password"] = hash_password(changes["password"])

    for field, value in changes.items():
        setattr(db_user, field, value)

    _commit_and_refresh(db, db_user)
    return db_user