from typing import Dict

from email_validator import EmailNotValidError, validate_email
from fastapi import HTTPException
from sqlalchemy.orm import Session

from models.contact import Contact
from schemas.contact import ContactCreate, ContactSchema


def validate_contact_data(contact_data: ContactCreate) -> Dict[str, str]:
    """
    Validates contact form submission data with enhanced email validation.

    Checks that name, email and message are present, that name and country
    stay within 255 characters, that the email passes ``validate_email``,
    and that a supplied phone number contains 10 to 15 digits.

    Args:
        contact_data (ContactCreate): The contact form data to validate.

    Returns:
        Dict[str, str]: Mapping of field name to error message. Empty when
            the data is valid.
    """
    errors: Dict[str, str] = {}

    # Name: required (whitespace-only counts as missing) and length-limited.
    if not contact_data.name or not contact_data.name.strip():
        errors["name"] = "Name is required"
    elif len(contact_data.name) > 255:
        errors["name"] = "Name must not exceed 255 characters"

    # Email: required and must be accepted by email-validator.
    # NOTE(review): depending on the email-validator defaults in use,
    # validate_email may also perform a DNS deliverability lookup -- confirm
    # that a network call is acceptable on this path.
    if not contact_data.email:
        errors["email"] = "Email is required"
    else:
        try:
            validate_email(contact_data.email)
        except EmailNotValidError as e:
            errors["email"] = str(e)

    # Message: required (whitespace-only counts as missing).
    if not contact_data.message or not contact_data.message.strip():
        errors["message"] = "Message is required"

    # Phone number is optional; when present only its digits are counted, so
    # spaces, dashes, parentheses and a leading '+' are ignored.
    if contact_data.phone_number:
        digits = "".join(filter(str.isdigit, contact_data.phone_number))
        if not (10 <= len(digits) <= 15):
            errors["phone_number"] = "Phone number must be between 10 and 15 digits"

    # Country is optional. Measure the raw value, exactly like the name check
    # above: create_contact() persists the data as submitted, so measuring a
    # stripped copy would let whitespace-padded values exceed the limit.
    if contact_data.country and len(contact_data.country) > 255:
        errors["country"] = "Country name must not exceed 255 characters"

    return errors


def create_contact(db: Session, contact_data: ContactCreate) -> Contact:
    """
    Creates a new contact submission in the database after validation.

    Args:
        db (Session): The database session.
        contact_data (ContactCreate): The contact form data to store.

    Returns:
        Contact: The newly created contact object, refreshed from the
            database after commit.

    Raises:
        HTTPException: With status 400 when validation fails (the detail
            carries the per-field errors), or with status 500 when creating
            or persisting the contact fails (the session is rolled back
            first).
    """
    validation_errors = validate_contact_data(contact_data)
    if validation_errors:
        raise HTTPException(
            status_code=400,
            detail={"message": "Validation failed", "errors": validation_errors},
        )

    try:
        db_contact = Contact(**contact_data.dict())
        db.add(db_contact)
        db.commit()
        db.refresh(db_contact)
        return db_contact
    except Exception as e:
        # Undo the partial transaction before reporting the failure.
        db.rollback()
        # NOTE(review): the detail echoes str(e) back to the client, which may
        # expose internal database details -- consider logging it instead.
        raise HTTPException(
            status_code=500,
            detail={"message": "Failed to create contact", "error": str(e)},
        ) from e


def sanitize_contact_data(contact_data: ContactCreate) -> ContactCreate:
    """
    Sanitizes contact form input data and ensures email format.

    The email is stripped and lower-cased, the phone number is reduced to
    its digits (keeping a leading '+'), and name, message and country are
    stripped of surrounding whitespace.

    Args:
        contact_data (ContactCreate): The raw contact form data.

    Returns:
        ContactCreate: A new object holding the sanitized contact form data.

    Raises:
        HTTPException: With status 400 if the email format is invalid after
            sanitization.
    """
    email = contact_data.email.strip().lower()
    # Only validate_email can raise EmailNotValidError, so the try block is
    # limited to that call.
    try:
        validate_email(email)
    except EmailNotValidError as e:
        raise HTTPException(
            status_code=400,
            detail={"message": "Invalid email format", "error": str(e)},
        ) from e

    # Keep digits only, preserving a single leading '+' when one was given.
    phone_number = None
    if contact_data.phone_number:
        raw_phone = contact_data.phone_number.strip()
        if raw_phone.startswith("+"):
            phone_number = "+" + "".join(filter(str.isdigit, raw_phone[1:]))
        else:
            phone_number = "".join(filter(str.isdigit, raw_phone))

    # An empty or missing country is normalized to None.
    country = contact_data.country.strip() if contact_data.country else None

    return ContactCreate(
        name=contact_data.name.strip(),
        email=email,
        phone_number=phone_number,
        message=contact_data.message.strip(),
        country=country,
    )


def format_contact_response(contact: Contact) -> ContactSchema:
    """
    Formats a contact database object into a response schema.

    Args:
        contact (Contact): The contact database object.

    Returns:
        ContactSchema: The response schema built from the object via
            ``ContactSchema.from_orm``.
    """
    return ContactSchema.from_orm(contact)