import re
from typing import Optional, Dict, Union

from email_validator import validate_email, EmailNotValidError
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from models.user import User
from schemas.user import UserCreate

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Allowed username characters: ASCII letters, digits, underscore and hyphen.
# Used with ``fullmatch`` so that the *entire* string must consist of these
# characters (a ``$`` anchor would also accept a trailing newline).
_USERNAME_PATTERN = re.compile(r"[a-zA-Z0-9_-]+")


def validate_username(username: str) -> bool:
    """
    Validate username format and length.

    A valid username is 3 to 32 characters long and contains only ASCII
    letters, digits, underscores and hyphens.

    Args:
        username: Username to validate

    Returns:
        bool: True if valid, False otherwise
    """
    if not 3 <= len(username) <= 32:
        return False

    # fullmatch (not match + "$") so that "name\n" is rejected.
    return _USERNAME_PATTERN.fullmatch(username) is not None


def validate_user_email(email: str) -> bool:
    """
    Validate an email address using email-validator.

    NOTE(review): ``validate_email`` is called with its default options;
    depending on the installed email-validator version this may include a
    DNS deliverability check in addition to syntax checks - confirm.

    Args:
        email: Email to validate

    Returns:
        bool: True if ``validate_email`` accepts the address, False if it
        raises ``EmailNotValidError``
    """
    try:
        validate_email(email)
    except EmailNotValidError:
        return False
    return True


def hash_password(password: str) -> str:
    """
    Hash password using the module-level passlib context (bcrypt scheme).

    Args:
        password: Plain text password

    Returns:
        str: Hashed password
    """
    return pwd_context.hash(password)


def check_user_exists(db: Session, username: str, email: str) -> Optional[User]:
    """
    Check if a user already exists by username or email.

    Args:
        db: Database session
        username: Username to check
        email: Email to check

    Returns:
        Optional[User]: The first user whose username OR email matches,
        or None if there is no such user
    """
    return db.query(User).filter(
        (User.username == username) | (User.email == email)
    ).first()


def create_new_user(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
    """
    Create a new user with validation and error handling.

    Validation runs in this order: username format, email format, then a
    lookup for an existing user with the same username or email. The first
    failing check short-circuits and is reported as an error dict.

    Args:
        db: Database session
        user_data: User creation data

    Returns:
        Union[User, Dict[str, str]]: The created (committed and refreshed)
        user, or a dict of the form ``{"error": "<message>"}`` when
        validation fails

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised
        unchanged after the session has been rolled back.
    """
    # Validate username
    if not validate_username(user_data.username):
        return {"error": "Invalid username format"}

    # Validate email
    if not validate_user_email(user_data.email):
        return {"error": "Invalid email format"}

    # Check existing user
    existing_user = check_user_exists(db, user_data.username, user_data.email)
    if existing_user:
        return {"error": "Username or email already registered"}

    # Create user; only the hash of the password is stored.
    hashed_password = hash_password(user_data.password)
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        password=hashed_password,
        first_name=user_data.first_name,
        last_name=user_data.last_name,
        is_active=True,
        is_verified=False
    )

    db.add(db_user)
    try:
        db.commit()
    except Exception:
        # A failed commit (e.g. a constraint violation from a concurrent
        # signup) leaves the session in a state that must be rolled back
        # before it can be used again; clean up, then let the caller see
        # the original error.
        db.rollback()
        raise
    db.refresh(db_user)

    return db_user


def format_user_response(user: User) -> Dict:
    """
    Format user object for API response.

    The password field is intentionally not part of the response.

    Args:
        user: User object

    Returns:
        Dict: Formatted user data with the keys ``id``, ``username``,
        ``email``, ``first_name``, ``last_name``, ``is_active`` and
        ``is_verified``
    """
    return {
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "first_name": user.first_name,
        "last_name": user.last_name,
        "is_active": user.is_active,
        "is_verified": user.is_verified
    }