import os
import re
from datetime import datetime
from typing import Dict, List, Optional, Union

from fastapi import UploadFile
from PIL import Image
from sqlalchemy.orm import Session

from models.user import User, UserRoles
from schemas.user import UserCreate, UserUpdate

# Validation patterns, kept at module level so they are defined once.
_EMAIL_PATTERN = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
_PHONE_PATTERN = r'^\+?1?\d{9,15}$'

# Minimum age for school, in whole years.  The "age" error message below
# spells this number out, so keep the two in sync.
_MIN_AGE_YEARS = 5

# Profile pictures are resized to this (width, height) in pixels.
_PROFILE_PICTURE_SIZE = (200, 200)


def _age_in_years(date_of_birth) -> int:
    """Return the number of completed calendar years since *date_of_birth*.

    Accepts either a ``datetime`` or a plain ``date``.  Comparing calendar
    fields (rather than dividing a day count by 365) keeps the result exact
    across leap years.
    """
    today = datetime.now().date()
    if isinstance(date_of_birth, datetime):
        born = date_of_birth.date()
    else:
        born = date_of_birth
    had_birthday_this_year = (today.month, today.day) >= (born.month, born.day)
    return today.year - born.year - (0 if had_birthday_this_year else 1)


def _safe_filename(filename: Optional[str]) -> Optional[str]:
    """Reduce a client-supplied filename to a bare name, or None if unusable.

    The filename of an upload is controlled by the client, so any directory
    components (including Windows-style backslash separators) are dropped to
    keep the saved file inside the intended upload directory.
    """
    if not filename:
        return None
    name = os.path.basename(filename.replace("\\", "/"))
    if name in ("", ".", ".."):
        return None
    return name


def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, str]:
    """
    Validate user input data.

    A field is only checked when it is present on ``user_data`` and is not
    ``None``; absent or unset fields are skipped.

    Args:
        user_data: User data to validate

    Returns:
        Dict with error messages if validation fails, empty dict if valid.
        Possible keys are 'email', 'phone' and 'age'.
    """
    errors: Dict[str, str] = {}

    # Email validation.  fullmatch() is used so that a trailing newline is
    # rejected ('$' alone would accept "a@b.co\n").
    email = getattr(user_data, 'email', None)
    if email is not None and not re.fullmatch(_EMAIL_PATTERN, email):
        errors['email'] = "Invalid email format"

    # Phone validation
    phone_number = getattr(user_data, 'phone_number', None)
    if phone_number is not None and not re.fullmatch(_PHONE_PATTERN, phone_number):
        errors['phone'] = "Invalid phone number format"

    # Age validation (a date of birth in the future yields a negative age
    # and is therefore rejected as well).
    date_of_birth = getattr(user_data, 'date_of_birth', None)
    if date_of_birth is not None and _age_in_years(date_of_birth) < _MIN_AGE_YEARS:
        errors['age'] = "User must be at least 5 years old"

    return errors


async def save_profile_picture(user_id: int, file: UploadFile) -> Optional[str]:
    """
    Save and process user profile picture.

    The upload is written to ``uploads/profile_pictures/<user_id>/`` under
    its own (sanitized) filename and then resized in place to 200x200.

    Args:
        user_id: ID of the user
        file: Uploaded image file

    Returns:
        Path to saved image or None if failed
    """
    try:
        # Never trust the client-supplied name: strip directory components
        # so the file cannot be written outside the upload directory.
        safe_name = _safe_filename(file.filename)
        if safe_name is None:
            return None

        # Create directory if not exists
        upload_dir = f"uploads/profile_pictures/{user_id}"
        os.makedirs(upload_dir, exist_ok=True)

        file_path = f"{upload_dir}/{safe_name}"

        # Read the upload before opening the target so that a failed read
        # does not leave an empty file behind.
        content = await file.read()
        with open(file_path, "wb") as buffer:
            buffer.write(content)

        # Process image - resize to standard size
        with Image.open(file_path) as img:
            resized = img.resize(_PROFILE_PICTURE_SIZE)
            resized.save(file_path)

        return file_path
    except Exception:
        # Deliberately broad: the contract is "None on any failure"
        # (filesystem errors, unreadable or unsupported image data, ...).
        return None


def get_users_by_role(db: Session, role: UserRoles, active_only: bool = True) -> List[User]:
    """
    Get all users with specific role.

    Args:
        db: Database session
        role: Role to filter by
        active_only: Whether to return only active users

    Returns:
        List of users with specified role
    """
    query = db.query(User).filter(User.role == role)
    if active_only:
        # '== True' is intentional: it builds a SQL expression on the column;
        # a plain truthiness test would not.
        query = query.filter(User.is_active == True)  # noqa: E712
    return query.all()


def update_last_login(db: Session, user_id: int) -> bool:
    """
    Update user's last login timestamp.

    Args:
        db: Database session
        user_id: ID of user to update

    Returns:
        True if update successful, False otherwise (including when no user
        with the given ID exists)
    """
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            return False
        user.last_login = datetime.now()
        db.commit()
        return True
    except Exception:
        # Roll back so the session stays usable after a failed commit.
        db.rollback()
        return False


def format_user_display_name(user: User) -> str:
    """
    Format user's display name.

    Preference order: "first last", first name only, last name only, and
    finally the part of the email address before the '@'.

    Args:
        user: User object

    Returns:
        Formatted display name (empty string if the user has neither a name
        nor an email)
    """
    if user.first_name and user.last_name:
        return f"{user.first_name} {user.last_name}"
    if user.first_name:
        return user.first_name
    if user.last_name:
        return user.last_name
    # Guard against a missing email instead of raising AttributeError.
    return (user.email or "").split('@')[0]