Automated Action ef749e4878 Implement WhatsApp Message Analytics Service
- Set up FastAPI application structure
- Implement SQLite database with SQLAlchemy
- Create WhatsApp webhook endpoints
- Implement message storage and analysis
- Integrate Gemini 2.5 Pro for message analysis
- Add email delivery of insights
- Configure APScheduler for weekend analysis
- Add linting with Ruff
2025-05-22 13:29:12 +00:00

107 lines
3.6 KiB
Python

"""
WhatsApp webhook endpoints.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.schemas.message import MessageCreate, WhatsAppWebhook
from app.services.message_service import create_message
# Router that the webhook route decorators in this module register on.
router = APIRouter()

# Set up root logging at INFO level and obtain this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@router.get("/whatsapp")
async def verify_webhook(
    hub_mode: str = Query(..., alias="hub.mode"),
    hub_challenge: int = Query(..., alias="hub.challenge"),
    hub_verify_token: str = Query(..., alias="hub.verify_token"),
):
    """
    Verify WhatsApp webhook endpoint.

    This endpoint is used by WhatsApp to verify the webhook URL. The request
    is accepted only when ``hub.mode`` is ``"subscribe"`` and
    ``hub.verify_token`` equals ``settings.WHATSAPP_VERIFY_TOKEN``.

    Args:
        hub_mode: Value of the ``hub.mode`` query parameter.
        hub_challenge: Value of the ``hub.challenge`` query parameter, echoed
            back on success.
        hub_verify_token: Value of the ``hub.verify_token`` query parameter.

    Returns:
        The ``hub_challenge`` value when verification succeeds.

    Raises:
        HTTPException: 403 Forbidden when the mode or the token does not match.
    """
    import hmac

    expected_token = settings.WHATSAPP_VERIFY_TOKEN

    # Compare in constant time so response timing does not reveal how much of
    # a guessed token is correct. Both sides are encoded to bytes because
    # compare_digest() rejects non-ASCII str arguments. A configured token
    # that is not a string (e.g. None) can never match, as with plain "==".
    token_matches = isinstance(expected_token, str) and hmac.compare_digest(
        hub_verify_token.encode("utf-8"),
        expected_token.encode("utf-8"),
    )

    if hub_mode == "subscribe" and token_matches:
        logger.info("Webhook verified successfully")
        return hub_challenge

    logger.error("Webhook verification failed: invalid verification token")
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Verification failed",
    )
@router.post("/whatsapp")
async def receive_webhook(
    webhook: WhatsAppWebhook,
    db: Session = Depends(get_db),  # noqa: B008
):
    """
    Receive WhatsApp webhook notifications.

    This endpoint receives message notifications from WhatsApp Business API.
    Every message of type ``"text"`` found in the payload is stored through
    ``create_message``; all other message types are skipped.

    Args:
        webhook: Parsed webhook payload.
        db: Database session passed on to ``create_message``.

    Returns:
        ``{"status": "ignored"}`` when ``webhook.object`` is not
        ``"whatsapp_business_account"``, otherwise ``{"status": "success"}``.

    Raises:
        HTTPException: 500 Internal Server Error when processing fails.
    """
    try:
        # Verify it's a WhatsApp webhook
        if webhook.object != "whatsapp_business_account":
            logger.warning("Received non-WhatsApp webhook: %s", webhook.object)
            return {"status": "ignored"}

        # Process each entry in the webhook
        for entry in webhook.entry:
            for change in entry.changes:
                # "or" (rather than a .get() default) also covers keys that
                # are present but explicitly null in the JSON payload.
                value = change.get("value") or {}
                if not value:
                    continue

                messages = value.get("messages") or []
                contacts = value.get("contacts") or []

                for message in messages:
                    # Only process text messages
                    if message.get("type") != "text":
                        continue

                    phone_number = message.get("from")
                    message_body = (message.get("text") or {}).get("body", "")

                    # Find contact info; when several contacts share the
                    # sender's wa_id the last one wins.
                    sender_name = "Unknown"
                    for contact in contacts:
                        if contact.get("wa_id") == phone_number:
                            profile = contact.get("profile") or {}
                            sender_name = profile.get("name", "Unknown")

                    # Create message in database
                    message_data = MessageCreate(
                        sender_phone=phone_number,
                        sender_name=sender_name,
                        message_body=message_body,
                    )
                    create_message(db=db, message=message_data)
                    logger.info(
                        "Saved message from %s (%s)", sender_name, phone_number
                    )

        return {"status": "success"}
    except Exception as e:
        # Log the full traceback server-side, but do not echo internal
        # exception text back to the remote caller.
        logger.exception("Error processing webhook: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error processing webhook",
        ) from e