118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
from typing import Dict, Any, Optional
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException
|
|
from email_validator import validate_email, EmailNotValidError
|
|
from models.contact_form import ContactForm
|
|
from schemas.contact_form import ContactFormCreate
|
|
|
|
def validate_contact_form_data(form_data: Dict[str, Any]) -> Dict[str, str]:
|
|
"""
|
|
Validates contact form data with detailed error messages.
|
|
|
|
Args:
|
|
form_data (Dict[str, Any]): The data from the contact form submission
|
|
|
|
Returns:
|
|
Dict[str, str]: Dictionary with field names as keys and error messages as values.
|
|
Empty dictionary if validation passes.
|
|
"""
|
|
errors = {}
|
|
|
|
# Check required fields
|
|
if not form_data.get("name"):
|
|
errors["name"] = "Name is required"
|
|
elif not isinstance(form_data["name"], str) or len(form_data["name"].strip()) == 0:
|
|
errors["name"] = "Name must be a non-empty string"
|
|
|
|
if not form_data.get("message"):
|
|
errors["message"] = "Message is required"
|
|
elif not isinstance(form_data["message"], str) or len(form_data["message"].strip()) == 0:
|
|
errors["message"] = "Message must be a non-empty string"
|
|
|
|
# Validate email format
|
|
if not form_data.get("email"):
|
|
errors["email"] = "Email is required"
|
|
else:
|
|
try:
|
|
# Use email_validator to validate the email
|
|
validate_email(form_data["email"])
|
|
except EmailNotValidError as e:
|
|
errors["email"] = f"Invalid email format: {str(e)}"
|
|
|
|
return errors
|
|
|
|
def create_contact_form(db: Session, form_data: ContactFormCreate) -> ContactForm:
|
|
"""
|
|
Creates a new contact form entry in the database.
|
|
|
|
Args:
|
|
db (Session): The database session
|
|
form_data (ContactFormCreate): The validated contact form data
|
|
|
|
Returns:
|
|
ContactForm: The newly created contact form entry
|
|
"""
|
|
# Create new ContactForm object
|
|
db_contact_form = ContactForm(
|
|
name=form_data.name,
|
|
email=form_data.email,
|
|
message=form_data.message
|
|
)
|
|
|
|
# Add to database
|
|
db.add(db_contact_form)
|
|
db.commit()
|
|
db.refresh(db_contact_form)
|
|
|
|
return db_contact_form
|
|
|
|
def process_contact_form_submission(db: Session, form_data: Dict[str, Any]) -> ContactForm:
|
|
"""
|
|
Processes a contact form submission, validating the data and creating a database entry.
|
|
|
|
Args:
|
|
db (Session): The database session
|
|
form_data (Dict[str, Any]): The raw form data
|
|
|
|
Returns:
|
|
ContactForm: The created contact form entry
|
|
|
|
Raises:
|
|
HTTPException: If validation fails, with status_code 400 and detail containing error messages
|
|
"""
|
|
# Validate the form data
|
|
validation_errors = validate_contact_form_data(form_data)
|
|
|
|
if validation_errors:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"message": "Invalid form submission",
|
|
"errors": validation_errors
|
|
}
|
|
)
|
|
|
|
# Convert to Pydantic model for additional validation
|
|
try:
|
|
form_schema = ContactFormCreate(**form_data)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={"message": f"Data validation error: {str(e)}"}
|
|
)
|
|
|
|
# Create the contact form entry
|
|
return create_contact_form(db, form_schema)
|
|
|
|
def get_contact_form_by_id(db: Session, form_id: str) -> Optional[ContactForm]:
|
|
"""
|
|
Retrieves a contact form entry by its ID.
|
|
|
|
Args:
|
|
db (Session): The database session
|
|
form_id (str): The UUID of the contact form to retrieve
|
|
|
|
Returns:
|
|
Optional[ContactForm]: The contact form if found, None otherwise
|
|
"""
|
|
return db.query(ContactForm).filter(ContactForm.id == form_id).first() |