114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
from typing import Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from models.contact import Contact
|
|
from schemas.contact import ContactCreate, ContactSchema
|
|
from fastapi import HTTPException, status
|
|
import email_validator
|
|
|
|
def validate_contact_data(contact_data: Dict[str, Any]) -> Dict[str, str]:
|
|
"""
|
|
Validates contact form submission data with enhanced validation.
|
|
|
|
Args:
|
|
contact_data (Dict[str, Any]): The contact form data to validate.
|
|
|
|
Returns:
|
|
Dict[str, str]: Dictionary of validation errors, empty if valid.
|
|
"""
|
|
errors = {}
|
|
|
|
# Validate name
|
|
if not contact_data.get("name"):
|
|
errors["name"] = "Name is required"
|
|
elif not contact_data["name"].strip():
|
|
errors["name"] = "Name cannot consist of only whitespace"
|
|
|
|
# Validate email with enhanced validation
|
|
if not contact_data.get("email"):
|
|
errors["email"] = "Email is required"
|
|
else:
|
|
try:
|
|
email_validator.validate_email(contact_data["email"])
|
|
except email_validator.EmailNotValidError:
|
|
errors["email"] = "Invalid email format - please provide a valid email address"
|
|
|
|
# Validate message
|
|
if not contact_data.get("message"):
|
|
errors["message"] = "Message is required"
|
|
elif not contact_data["message"].strip():
|
|
errors["message"] = "Message cannot consist of only whitespace"
|
|
|
|
return errors
|
|
|
|
def create_contact(db: Session, contact_data: ContactCreate) -> Contact:
|
|
"""
|
|
Creates a new contact submission in the database.
|
|
|
|
Args:
|
|
db (Session): The database session.
|
|
contact_data (ContactCreate): The validated contact form data.
|
|
|
|
Returns:
|
|
Contact: The newly created contact object.
|
|
|
|
Raises:
|
|
HTTPException: If there are validation errors.
|
|
"""
|
|
# Validate data
|
|
validation_errors = validate_contact_data(contact_data.dict())
|
|
if validation_errors:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=validation_errors
|
|
)
|
|
|
|
# Create new contact
|
|
db_contact = Contact(**contact_data.dict())
|
|
db.add(db_contact)
|
|
db.commit()
|
|
db.refresh(db_contact)
|
|
return db_contact
|
|
|
|
def format_contact_response(contact: Contact) -> ContactSchema:
|
|
"""
|
|
Formats a contact database object into the response schema.
|
|
|
|
Args:
|
|
contact (Contact): The contact database object.
|
|
|
|
Returns:
|
|
ContactSchema: The formatted contact response.
|
|
"""
|
|
return ContactSchema.from_orm(contact)
|
|
|
|
def sanitize_contact_input(contact_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Sanitizes contact form input data with enhanced validation.
|
|
|
|
Args:
|
|
contact_data (Dict[str, Any]): Raw contact form data.
|
|
|
|
Returns:
|
|
Dict[str, Any]: Sanitized contact form data.
|
|
"""
|
|
sanitized = {}
|
|
|
|
if "name" in contact_data:
|
|
sanitized["name"] = contact_data["name"].strip()
|
|
|
|
if "email" in contact_data:
|
|
sanitized["email"] = contact_data["email"].lower().strip()
|
|
|
|
if "message" in contact_data:
|
|
sanitized["message"] = contact_data["message"].strip()
|
|
|
|
# Ensure all required fields are present
|
|
required_fields = ["name", "email", "message"]
|
|
for field in required_fields:
|
|
if field not in sanitized or not sanitized[field]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"{field.capitalize()} is required"
|
|
)
|
|
|
|
return sanitized |