146 lines
4.8 KiB
Python

from typing import Dict
from sqlalchemy.orm import Session
from fastapi import HTTPException
from email_validator import validate_email, EmailNotValidError
from models.contact import Contact
from schemas.contact import ContactCreate, ContactSchema
def validate_contact_data(contact_data: ContactCreate) -> Dict[str, str]:
    """
    Check a contact form submission and collect per-field problems.

    Args:
        contact_data (ContactCreate): The submitted contact form data.

    Returns:
        Dict[str, str]: Field name mapped to an error message. The dict is
            empty when every check passes.
    """
    problems = {}

    # Name: must contain something other than whitespace; the length limit
    # is applied to the value exactly as submitted.
    name = contact_data.name
    if not (name and name.strip()):
        problems["name"] = "Name is required"
    elif len(name) > 255:
        problems["name"] = "Name must not exceed 255 characters"

    # Email: presence first, then the email-validator check. When the
    # address is rejected, the validator's own message is reported.
    email = contact_data.email
    if email:
        try:
            validate_email(email)
        except EmailNotValidError as exc:
            problems["email"] = str(exc)
    else:
        problems["email"] = "Email is required"

    # Message: must contain something other than whitespace.
    message = contact_data.message
    if not (message and message.strip()):
        problems["message"] = "Message is required"

    # Phone number (optional): only the digit characters are counted, so
    # any other characters in the value do not affect the result.
    phone = contact_data.phone_number
    if phone:
        digit_count = sum(1 for ch in phone if ch.isdigit())
        if digit_count < 10 or digit_count > 15:
            problems["phone_number"] = "Phone number must be between 10 and 15 digits"

    # Country (optional): the length limit is applied after trimming
    # surrounding whitespace.
    country = contact_data.country
    if country and len(country.strip()) > 255:
        problems["country"] = "Country name must not exceed 255 characters"

    return problems
def create_contact(db: Session, contact_data: ContactCreate) -> Contact:
    """
    Validate a contact submission and store it as a new Contact.

    Args:
        db (Session): The database session used for the insert.
        contact_data (ContactCreate): The contact form data to validate
            and store.

    Returns:
        Contact: The newly created contact, refreshed from the database.

    Raises:
        HTTPException: 400 carrying the per-field errors when validation
            fails; 500 when building or saving the contact fails. The 500
            is chained to the underlying exception.
    """
    # Reject invalid submissions before touching the database.
    validation_errors = validate_contact_data(contact_data)
    if validation_errors:
        raise HTTPException(
            status_code=400,
            detail={"message": "Validation failed", "errors": validation_errors},
        )

    try:
        db_contact = Contact(**contact_data.dict())
        db.add(db_contact)
        db.commit()
        db.refresh(db_contact)
    except Exception as e:
        # Roll back the failed transaction before reporting the error.
        db.rollback()
        # NOTE(review): str(e) is sent back to the client and may expose
        # internal details; kept as-is because callers may rely on the
        # "error" key. Explicit chaining preserves the original cause for
        # server-side tracebacks.
        raise HTTPException(
            status_code=500,
            detail={"message": "Failed to create contact", "error": str(e)},
        ) from e

    return db_contact
def sanitize_contact_data(contact_data: ContactCreate) -> ContactCreate:
    """
    Build a cleaned copy of the contact form data.

    The email is trimmed, lower-cased and checked with email-validator.
    Name, message and country are trimmed of surrounding whitespace. The
    phone number is reduced to its digit characters, keeping a single
    leading '+' when the trimmed value starts with one.

    Args:
        contact_data (ContactCreate): The raw contact form data.

    Returns:
        ContactCreate: A new object holding the sanitized values. The
            phone number and country are None when not provided.

    Raises:
        HTTPException: 400 when the trimmed, lower-cased email is rejected
            by email-validator. The exception is chained to the
            underlying EmailNotValidError.
    """
    # A missing email is treated as an empty string so it goes through the
    # validator and is reported as a 400 instead of an AttributeError.
    email = (contact_data.email or "").strip().lower()

    # Only the validator call is guarded: it is the only step expected to
    # raise EmailNotValidError.
    try:
        validate_email(email)
    except EmailNotValidError as e:
        raise HTTPException(
            status_code=400,
            detail={"message": "Invalid email format", "error": str(e)},
        ) from e

    # Keep only digits; a leading '+' is not a digit, so it is re-added
    # explicitly when the trimmed input started with one.
    phone_number = None
    if contact_data.phone_number:
        raw_phone = contact_data.phone_number.strip()
        digits = "".join(filter(str.isdigit, raw_phone))
        phone_number = "+" + digits if raw_phone.startswith("+") else digits

    country = contact_data.country.strip() if contact_data.country else None

    # Missing name/message are treated as empty strings rather than
    # failing on None.strip().
    return ContactCreate(
        name=(contact_data.name or "").strip(),
        email=email,
        phone_number=phone_number,
        message=(contact_data.message or "").strip(),
        country=country,
    )
def format_contact_response(contact: Contact) -> ContactSchema:
    """
    Convert a contact database object into its response schema.

    Args:
        contact (Contact): The contact database object to convert.

    Returns:
        ContactSchema: The schema built from the contact via
            ContactSchema.from_orm.
    """
    response = ContactSchema.from_orm(contact)
    return response