146 lines
4.8 KiB
Python
146 lines
4.8 KiB
Python
from typing import Dict
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException
|
|
from email_validator import validate_email, EmailNotValidError
|
|
from models.contact import Contact
|
|
from schemas.contact import ContactCreate, ContactSchema
|
|
|
|
def validate_contact_data(contact_data: ContactCreate) -> Dict[str, str]:
    """
    Validate contact form submission data and collect per-field errors.

    Nothing is raised for invalid input; every failing field is reported in
    the returned mapping so the caller can surface all problems at once.

    Args:
        contact_data (ContactCreate): The contact form data to validate.

    Returns:
        Dict[str, str]: Mapping of field name to error message. Empty when
            every check passes.
    """
    errors: Dict[str, str] = {}

    # Name: required, and limited to 255 characters. The length is measured
    # on the stripped value so surrounding whitespace does not count against
    # the limit (same treatment the country check below applies).
    name = contact_data.name.strip() if contact_data.name else ""
    if not name:
        errors["name"] = "Name is required"
    elif len(name) > 255:
        errors["name"] = "Name must not exceed 255 characters"

    # Email: required, format checked by email-validator. The library's own
    # explanation is used as the error message.
    if not contact_data.email:
        errors["email"] = "Email is required"
    else:
        try:
            validate_email(contact_data.email)
        except EmailNotValidError as e:
            errors["email"] = str(e)

    # Message: required and must contain something other than whitespace.
    if not contact_data.message or not contact_data.message.strip():
        errors["message"] = "Message is required"

    # Phone number (optional): only the digits are counted, so spaces,
    # dashes, parentheses and a leading '+' do not affect the length check.
    if contact_data.phone_number:
        digits = "".join(filter(str.isdigit, contact_data.phone_number))
        if not 10 <= len(digits) <= 15:
            errors["phone_number"] = "Phone number must be between 10 and 15 digits"

    # Country (optional): limited to 255 characters after stripping.
    if contact_data.country and len(contact_data.country.strip()) > 255:
        errors["country"] = "Country name must not exceed 255 characters"

    return errors
|
|
|
|
def create_contact(db: Session, contact_data: ContactCreate) -> Contact:
    """
    Validate a contact submission and persist it as a new Contact row.

    Args:
        db (Session): The database session.
        contact_data (ContactCreate): The contact form data to store.

    Returns:
        Contact: The newly created contact object, refreshed from the
            database after commit.

    Raises:
        HTTPException: 400 with the per-field errors when validation fails;
            500 when building or persisting the contact raises.
    """
    # Reject invalid submissions before touching the database.
    validation_errors = validate_contact_data(contact_data)
    if validation_errors:
        raise HTTPException(
            status_code=400,
            detail={"message": "Validation failed", "errors": validation_errors},
        )

    try:
        db_contact = Contact(**contact_data.dict())
        db.add(db_contact)
        db.commit()
        db.refresh(db_contact)
    except Exception as e:
        # Undo the pending transaction so the session stays usable.
        db.rollback()
        # NOTE(review): str(e) is returned to the client and may expose
        # internal database details; consider logging it server-side and
        # returning a generic message instead.
        raise HTTPException(
            status_code=500,
            detail={"message": "Failed to create contact", "error": str(e)},
        ) from e

    return db_contact
|
|
|
|
def sanitize_contact_data(contact_data: ContactCreate) -> ContactCreate:
    """
    Return a cleaned copy of the contact form data.

    The email is stripped and lower-cased, then checked with email-validator.
    Name and message are stripped. The phone number is reduced to its digits
    (keeping a leading '+'), and the country is stripped. Optional fields
    that end up empty after cleaning are returned as None.

    Args:
        contact_data (ContactCreate): The raw contact form data.

    Returns:
        ContactCreate: A new object holding the sanitized values.

    Raises:
        HTTPException: 400 if the email is invalid after sanitization.
    """
    # Only the email check can raise EmailNotValidError, so the try block is
    # limited to it. A missing email is treated as empty and rejected here
    # instead of failing with AttributeError.
    # NOTE(review): validate_email's return value is discarded; the stored
    # address is the stripped, lower-cased input.
    email = (contact_data.email or "").strip().lower()
    try:
        validate_email(email)
    except EmailNotValidError as e:
        raise HTTPException(
            status_code=400,
            detail={"message": "Invalid email format", "error": str(e)},
        ) from e

    # Phone number (optional): drop every non-digit character, preserving a
    # leading '+' when the stripped input starts with one.
    phone_number = None
    if contact_data.phone_number:
        raw_phone = contact_data.phone_number.strip()
        prefix = "+" if raw_phone.startswith("+") else ""
        digits = "".join(filter(str.isdigit, raw_phone))
        # An empty result means nothing usable was supplied.
        phone_number = (prefix + digits) or None

    # Country (optional): whitespace-only input counts as not provided.
    country = None
    if contact_data.country:
        country = contact_data.country.strip() or None

    return ContactCreate(
        name=(contact_data.name or "").strip(),
        email=email,
        phone_number=phone_number,
        message=(contact_data.message or "").strip(),
        country=country,
    )
|
|
|
|
def format_contact_response(contact: Contact) -> ContactSchema:
    """
    Build the response schema for a stored contact.

    Args:
        contact (Contact): The contact database object.

    Returns:
        ContactSchema: The schema produced by ContactSchema.from_orm for
            the given contact.
    """
    response = ContactSchema.from_orm(contact)
    return response