"""Helpers for validating, creating, updating and inspecting users."""

import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Union

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from models.user import User
from schemas.user import UserCreate, UserUpdate

# Compiled once at import time. No ^/$ anchors are needed because the pattern
# is applied with ``fullmatch``, which (unlike ``match`` + ``$``) also rejects
# a trailing newline.
_EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]+")


def validate_email(email: str) -> bool:
    """
    Validate an email address format.

    The whole string must match ``local@domain.tld``, where each part is
    limited to the characters allowed by ``_EMAIL_PATTERN``.

    Args:
        email: The email address to validate

    Returns:
        bool: True if email format is valid, False otherwise (including when
        ``email`` is not a string)
    """
    if not isinstance(email, str):
        return False
    return _EMAIL_PATTERN.fullmatch(email) is not None


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """
    Get a user by their email address.

    Args:
        db: Database session
        email: Email to search for

    Returns:
        User object if found, None otherwise
    """
    return db.query(User).filter(User.email == email).first()


def _commit_and_refresh(db: Session, instance: User) -> User:
    """Commit the session and refresh ``instance``, rolling back on failure.

    A failed commit is rolled back so the session is not left in a failed
    transaction; the original exception is then re-raised unchanged.
    """
    try:
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        raise
    db.refresh(instance)
    return instance


def create_user_safely(
    db: Session, user_data: UserCreate
) -> Union[User, Dict[str, str]]:
    """
    Create a new user with validation and error handling.

    Args:
        db: Database session
        user_data: User data for creation

    Returns:
        User object if created successfully, error dict otherwise

    Raises:
        SQLAlchemyError: If the commit fails; the session is rolled back
            before the exception propagates.
    """
    # Check if email is valid
    if not validate_email(user_data.email):
        return {"error": "Invalid email format"}

    # Check if user already exists
    if get_user_by_email(db, user_data.email):
        return {"error": "Email already registered"}

    # Create the user
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        # SECURITY: the password is stored exactly as received.
        # In a real app, hash this password
        password=user_data.password,
    )
    db.add(db_user)
    return _commit_and_refresh(db, db_user)


def update_user(
    db: Session, user_id: int, user_data: UserUpdate
) -> Union[User, Dict[str, str]]:
    """
    Update an existing user's information.

    Only truthy fields of ``user_data`` are applied. All validation happens
    before the user object is touched, so an error return leaves the loaded
    user unmodified.

    Args:
        db: Database session
        user_id: ID of the user to update
        user_data: Updated user data

    Returns:
        User object if updated successfully, error dict otherwise

    Raises:
        SQLAlchemyError: If the commit fails; the session is rolled back
            before the exception propagates.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        return {"error": "User not found"}

    # Validate first: mutating the ORM object before these checks would leave
    # a half-applied change pending in the session when an error is returned.
    if user_data.email:
        # Validate email format
        if not validate_email(user_data.email):
            return {"error": "Invalid email format"}

        # Check for duplicate email (the user's own current row is allowed)
        existing_user = get_user_by_email(db, user_data.email)
        if existing_user and existing_user.id != user_id:
            return {"error": "Email already registered"}

    # Update user fields
    if user_data.username:
        user.username = user_data.username
    if user_data.email:
        user.email = user_data.email
    if user_data.password:
        # SECURITY: the password is stored exactly as received.
        # In a real app, hash this password
        user.password = user_data.password

    return _commit_and_refresh(db, user)


def get_user_age(user: User) -> int:
    """
    Calculate the age of a user based on their date of birth.

    The age is the number of full years between ``user.date_of_birth`` and
    the current UTC date.

    Args:
        user: User object

    Returns:
        int: Age of the user, or 0 when no date of birth is set
    """
    born = user.date_of_birth
    if not born:
        return 0

    today = datetime.now(timezone.utc).date()
    age = today.year - born.year
    # Subtract one year if this year's birthday has not been reached yet.
    if (today.month, today.day) < (born.month, born.day):
        age -= 1
    return age