""" WhatsApp webhook endpoints. """ import logging from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app.core.config import settings from app.db.session import get_db from app.schemas.message import MessageCreate, WhatsAppWebhook from app.services.message_service import create_message # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) router = APIRouter() @router.get("/whatsapp") async def verify_webhook( hub_mode: str = Query(..., alias="hub.mode"), hub_challenge: int = Query(..., alias="hub.challenge"), hub_verify_token: str = Query(..., alias="hub.verify_token"), ): """ Verify WhatsApp webhook endpoint. This endpoint is used by WhatsApp to verify the webhook URL. """ # Check if the verify token matches our configured token if hub_mode == "subscribe" and hub_verify_token == settings.WHATSAPP_VERIFY_TOKEN: logger.info("Webhook verified successfully") return hub_challenge else: logger.error("Webhook verification failed: invalid verification token") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed", ) @router.post("/whatsapp") async def receive_webhook( webhook: WhatsAppWebhook, db: Session = Depends(get_db), # noqa: B008 ): """ Receive WhatsApp webhook notifications. This endpoint receives message notifications from WhatsApp Business API. """ try: # Verify it's a WhatsApp webhook if webhook.object != "whatsapp_business_account": logger.warning(f"Received non-WhatsApp webhook: {webhook.object}") return {"status": "ignored"} # Process each entry in the webhook for entry in webhook.entry: for change in entry.changes: # Extract value from change value = change.get("value", {}) if not value: continue # Extract messages from value messages = value.get("messages", []) contacts = value.get("contacts", []) # Process each message for message in messages: # Only process text messages if message.get("type") != "text": continue # Get message data phone_number = message.get("from") message.get("id") message_body = message.get("text", {}).get("body", "") # Find contact info sender_name = "Unknown" for contact in contacts: if contact.get("wa_id") == phone_number: profile = contact.get("profile", {}) sender_name = profile.get("name", "Unknown") # Create message in database message_data = MessageCreate( sender_phone=phone_number, sender_name=sender_name, message_body=message_body, ) create_message(db=db, message=message_data) logger.info(f"Saved message from {sender_name} ({phone_number})") return {"status": "success"} except Exception as e: logger.exception(f"Error processing webhook: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing webhook: {str(e)}", ) from e