from typing import Any, Dict, List, Optional, Union
from uuid import UUID

from email_validator import EmailNotValidError, validate_email
from fastapi import HTTPException
from sqlalchemy.orm import Session

from models.contact_form import ContactForm
from schemas.contact_form import ContactFormCreate

# Required fields, in the order they are checked, paired with the error
# message reported when the field is missing.
_REQUIRED_FIELDS = (
    ('name', 'Name is required'),
    ('email', 'Email is required'),
    ('message', 'Message is required'),
)


def _is_missing(value: Any) -> bool:
    """Return True when a form value should count as 'not provided'.

    Strings consisting only of whitespace are treated as missing; any other
    value is missing when it is falsy (None, empty string, 0, ...).
    """
    if isinstance(value, str):
        return not value.strip()
    return not value


def create_contact_form(db: Session, contact_data: ContactFormCreate) -> ContactForm:
    """
    Creates a new contact form submission in the database.

    Args:
        db (Session): The database session.
        contact_data (ContactFormCreate): The data for the contact form submission.

    Returns:
        ContactForm: The newly created contact form object.

    Raises:
        Exception: Whatever ``db.commit()`` raises is re-raised after the
            session has been rolled back.
    """
    db_contact_form = ContactForm(**contact_data.dict())
    db.add(db_contact_form)
    try:
        db.commit()
    except Exception:
        # Roll back so the session stays usable for the caller after a
        # failed commit, then let the original error propagate.
        db.rollback()
        raise
    db.refresh(db_contact_form)
    return db_contact_form


def validate_contact_form_data(
    form_data: Dict[str, Any]
) -> Dict[str, Union[bool, Optional[str]]]:
    """
    Validates contact form data, checking for required fields and email format.

    Args:
        form_data (Dict[str, Any]): The form data to validate.

    Returns:
        Dict[str, Union[bool, Optional[str]]]: A dictionary with validation results:
            - 'valid': Boolean indicating if the data is valid
            - 'error': Error message if validation fails, None otherwise
    """
    # Check for required fields (whitespace-only strings count as missing).
    for field_name, error_message in _REQUIRED_FIELDS:
        if _is_missing(form_data.get(field_name)):
            return {'valid': False, 'error': error_message}

    # Validate email format. A non-string value can never be a valid address,
    # and handing it to validate_email could raise something other than
    # EmailNotValidError, so reject it up front.
    email = form_data['email']
    if not isinstance(email, str):
        return {'valid': False, 'error': 'Invalid email format'}

    # NOTE(review): depending on the installed email_validator version,
    # validate_email may also perform a DNS deliverability lookup by default
    # - confirm whether that is intended for this handler.
    try:
        validate_email(email)
    except EmailNotValidError:
        return {'valid': False, 'error': 'Invalid email format'}

    return {'valid': True, 'error': None}


def get_contact_form_by_id(db: Session, contact_form_id: UUID) -> Optional[ContactForm]:
    """
    Retrieves a single contact form submission by its ID.

    Args:
        db (Session): The database session.
        contact_form_id (UUID): The ID of the contact form to retrieve.

    Returns:
        Optional[ContactForm]: The contact form object if found, otherwise None.
    """
    return db.query(ContactForm).filter(ContactForm.id == contact_form_id).first()


def get_contact_forms(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    email: Optional[str] = None
) -> List[ContactForm]:
    """
    Retrieves contact form submissions with optional filtering by email.

    Args:
        db (Session): The database session.
        skip (int, optional): Number of records to skip. Defaults to 0.
        limit (int, optional): Maximum number of records to return. Defaults to 100.
        email (Optional[str], optional): Filter by email address. Defaults to None.

    Returns:
        List[ContactForm]: A list of contact form objects.
    """
    query = db.query(ContactForm)
    # An empty-string email is treated the same as None: no filter applied.
    if email:
        query = query.filter(ContactForm.email == email)
    # NOTE(review): no ORDER BY is applied, so which rows land on which page
    # depends on the database's default row order.
    return query.offset(skip).limit(limit).all()


def format_contact_form_error(error_message: str) -> Dict[str, str]:
    """
    Formats a contact form validation error message for API response.

    Args:
        error_message (str): The error message to format.

    Returns:
        Dict[str, str]: A dictionary with the error message under the "detail" key.
    """
    return {"detail": error_message}


def handle_contact_form_submission(
    db: Session,
    form_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Processes a contact form submission with validation.

    Args:
        db (Session): The database session.
        form_data (Dict[str, Any]): The form data to process.

    Returns:
        Dict[str, Any]: Response data with status, message, and the new record's
            ID as a string.

    Raises:
        HTTPException: If validation fails with a 400 status code.
    """
    # Validate form data
    validation_result = validate_contact_form_data(form_data)
    if not validation_result['valid']:
        raise HTTPException(
            status_code=400,
            detail=validation_result['error']
        )

    # Create contact form submission
    contact_form_data = ContactFormCreate(
        name=form_data['name'],
        email=form_data['email'],
        message=form_data['message']
    )
    new_contact_form = create_contact_form(db, contact_form_data)

    return {
        "success": True,
        "message": "Contact form submitted successfully",
        "id": str(new_contact_form.id)
    }