diff --git a/README.md b/README.md
index e8acfba..46ed945 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,146 @@
-# FastAPI Application
+# WhatsApp Message Analytics Service
 
-This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
+A FastAPI application that receives webhooks from the WhatsApp Business API, stores messages in a SQLite database, and provides insights using the Gemini 2.5 Pro LLM.
+
+## Features
+
+- Webhook integration with the WhatsApp Business API
+- Message storage in a SQLite database
+- Weekly message analysis using the Gemini 2.5 Pro LLM
+- Email delivery of insights
+- RESTful API for accessing messages
+
+## Tech Stack
+
+- **Framework**: FastAPI
+- **Database**: SQLite with SQLAlchemy ORM
+- **Migrations**: Alembic
+- **AI Integration**: Google Gemini 2.5 Pro
+- **Task Scheduling**: APScheduler
+- **Email**: SMTP with HTML templates
+
+## Setup and Installation
+
+### Prerequisites
+
+- Python 3.10+ (required for the `X | Y` union syntax used throughout the code base)
+- Access to the WhatsApp Business API
+- Google AI Gemini API key
+- SMTP server for sending emails
+
+### Installation
+
+1. Clone the repository:
+   ```bash
+   git clone https://github.com/yourusername/whatsapp-message-analytics-service.git
+   cd whatsapp-message-analytics-service
+   ```
+
+2. Create a virtual environment and activate it:
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows: venv\Scripts\activate
+   ```
+
+3. Install dependencies:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+4. Create a `.env` file in the project root with the following content:
+   ```
+   # WhatsApp API configuration
+   WHATSAPP_VERIFY_TOKEN=your_verify_token
+
+   # Email configuration
+   SMTP_TLS=True
+   SMTP_PORT=587
+   SMTP_HOST=smtp.gmail.com
+   SMTP_USER=your_email@gmail.com
+   SMTP_PASSWORD=your_app_password
+   EMAILS_FROM_EMAIL=your_email@gmail.com
+   EMAILS_FROM_NAME=WhatsApp Message Analytics
+   EMAIL_RECIPIENTS=["recipient1@example.com", "recipient2@example.com"]
+
+   # Gemini API configuration
+   GEMINI_API_KEY=your_gemini_api_key
+
+   # CORS configuration (optional)
+   BACKEND_CORS_ORIGINS=["http://localhost", "http://localhost:8080"]
+   ```
+
+5. Initialize the database:
+   ```bash
+   alembic upgrade head
+   ```
+
+### Running the Application
+
+Start the application with Uvicorn:
+
+```bash
+uvicorn main:app --reload
+```
+
+The API will be available at http://localhost:8000, and the interactive documentation at http://localhost:8000/docs.
+
+## WhatsApp Webhook Setup
+
+1. In your WhatsApp Business API dashboard, set up a webhook with the following URL:
+   ```
+   https://your-domain.com/api/v1/webhooks/whatsapp
+   ```
+
+2. Set the Verify Token to match the `WHATSAPP_VERIFY_TOKEN` in your `.env` file.
+
+3. Subscribe to the relevant webhook events (typically `messages`).
+
+## API Endpoints
+
+- `GET /health`: Health check endpoint
+- `GET /api/v1/webhooks/whatsapp`: Webhook verification endpoint for WhatsApp
+- `POST /api/v1/webhooks/whatsapp`: Webhook endpoint for receiving WhatsApp messages
+- `GET /api/v1/messages/`: Get all messages with pagination
+- `GET /api/v1/messages/{message_id}`: Get a specific message by ID
+
+## Message Analysis
+
+The service is configured to run message analysis every Sunday at 6:00 AM by default.
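+
+For reference, `app/core/scheduler.py` registers this as an APScheduler cron job; the default schedule corresponds to the trigger below (an illustrative sketch, not extra configuration):
+
+```python
+from apscheduler.triggers.cron import CronTrigger
+
+# Defaults from app/core/config.py: Sundays at 06:00
+CronTrigger(day_of_week="sun", hour=6, minute=0)
+```
+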
This can be configured through the following environment variables: + +- `ANALYSIS_CRON_DAY_OF_WEEK`: Day of the week to run analysis (default: "sun") +- `ANALYSIS_CRON_HOUR`: Hour to run analysis (default: 6) +- `ANALYSIS_CRON_MINUTE`: Minute to run analysis (default: 0) + +## Development + +### Database Migrations + +Create a new migration: + +```bash +alembic revision --autogenerate -m "Description of changes" +``` + +Apply migrations: + +```bash +alembic upgrade head +``` + +### Code Linting + +Lint code with Ruff: + +```bash +ruff check . +``` + +Fix linting issues: + +```bash +ruff check --fix . +``` + +## License + +[MIT License](LICENSE) \ No newline at end of file diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..ae24797 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,102 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = migrations + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to migrations/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# SQLite URL example +sqlalchemy.url = sqlite:////app/storage/db/db.sqlite + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..66a004c --- /dev/null +++ b/app/api/__init__.py @@ -0,0 +1,3 @@ +""" +API package. +""" diff --git a/app/api/api_v1/__init__.py b/app/api/api_v1/__init__.py new file mode 100644 index 0000000..250e042 --- /dev/null +++ b/app/api/api_v1/__init__.py @@ -0,0 +1,3 @@ +""" +API v1 package. +""" diff --git a/app/api/api_v1/api.py b/app/api/api_v1/api.py new file mode 100644 index 0000000..53f09e9 --- /dev/null +++ b/app/api/api_v1/api.py @@ -0,0 +1,10 @@ +""" +API v1 router. +""" +from fastapi import APIRouter + +from app.api.api_v1.endpoints import messages, webhooks + +api_router = APIRouter() +api_router.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"]) +api_router.include_router(messages.router, prefix="/messages", tags=["messages"]) diff --git a/app/api/api_v1/endpoints/__init__.py b/app/api/api_v1/endpoints/__init__.py new file mode 100644 index 0000000..1bfd1d4 --- /dev/null +++ b/app/api/api_v1/endpoints/__init__.py @@ -0,0 +1,3 @@ +""" +API v1 endpoints. +""" diff --git a/app/api/api_v1/endpoints/messages.py b/app/api/api_v1/endpoints/messages.py new file mode 100644 index 0000000..3b447dc --- /dev/null +++ b/app/api/api_v1/endpoints/messages.py @@ -0,0 +1,60 @@ +""" +Messages API endpoints. +""" + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.db.session import get_db +from app.schemas.message import Message +from app.services.message_service import get_all_messages, get_message + +router = APIRouter() + + +@router.get("/", response_model=list[Message]) +async def read_messages( + skip: int = 0, + limit: int = 100, + db: Session = Depends(get_db), # noqa: B008 +): + """ + Retrieve all messages. + + Args: + skip: Number of messages to skip + limit: Maximum number of messages to return + db: Database session + + Returns: + List of messages + """ + messages = get_all_messages(db, skip=skip, limit=limit) + return messages + + +@router.get("/{message_id}", response_model=Message) +async def read_message( + message_id: int, + db: Session = Depends(get_db), # noqa: B008 +): + """ + Retrieve a specific message by ID. 
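+
+    Example (hypothetical ID):
+        GET /api/v1/messages/42 -> 200 with the message JSON, or 404 if absent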
+ + Args: + message_id: ID of the message to retrieve + db: Database session + + Returns: + The requested message + + Raises: + HTTPException: If the message is not found + """ + message = get_message(db, message_id=message_id) + if message is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Message not found", + ) + return message diff --git a/app/api/api_v1/endpoints/webhooks.py b/app/api/api_v1/endpoints/webhooks.py new file mode 100644 index 0000000..9f84ec1 --- /dev/null +++ b/app/api/api_v1/endpoints/webhooks.py @@ -0,0 +1,106 @@ +""" +WhatsApp webhook endpoints. +""" +import logging + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.orm import Session + +from app.core.config import settings +from app.db.session import get_db +from app.schemas.message import MessageCreate, WhatsAppWebhook +from app.services.message_service import create_message + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("/whatsapp") +async def verify_webhook( + hub_mode: str = Query(..., alias="hub.mode"), + hub_challenge: int = Query(..., alias="hub.challenge"), + hub_verify_token: str = Query(..., alias="hub.verify_token"), +): + """ + Verify WhatsApp webhook endpoint. + + This endpoint is used by WhatsApp to verify the webhook URL. + """ + # Check if the verify token matches our configured token + if hub_mode == "subscribe" and hub_verify_token == settings.WHATSAPP_VERIFY_TOKEN: + logger.info("Webhook verified successfully") + return hub_challenge + else: + logger.error("Webhook verification failed: invalid verification token") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Verification failed", + ) + + +@router.post("/whatsapp") +async def receive_webhook( + webhook: WhatsAppWebhook, + db: Session = Depends(get_db), # noqa: B008 +): + """ + Receive WhatsApp webhook notifications. + + This endpoint receives message notifications from WhatsApp Business API. 
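+
+    The handler below relies only on the following subset of the payload
+    (field names are taken from the parsing code; values are illustrative):
+
+        {
+          "object": "whatsapp_business_account",
+          "entry": [{
+            "changes": [{
+              "value": {
+                "contacts": [{"wa_id": "15551234567", "profile": {"name": "Ada"}}],
+                "messages": [{"from": "15551234567", "id": "wamid.X",
+                              "type": "text", "text": {"body": "Hello"}}]
+              }
+            }]
+          }]
+        }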
+ """ + try: + # Verify it's a WhatsApp webhook + if webhook.object != "whatsapp_business_account": + logger.warning(f"Received non-WhatsApp webhook: {webhook.object}") + return {"status": "ignored"} + + # Process each entry in the webhook + for entry in webhook.entry: + for change in entry.changes: + # Extract value from change + value = change.get("value", {}) + if not value: + continue + + # Extract messages from value + messages = value.get("messages", []) + contacts = value.get("contacts", []) + + # Process each message + for message in messages: + # Only process text messages + if message.get("type") != "text": + continue + + # Get message data + phone_number = message.get("from") + message.get("id") + message_body = message.get("text", {}).get("body", "") + + # Find contact info + sender_name = "Unknown" + for contact in contacts: + if contact.get("wa_id") == phone_number: + profile = contact.get("profile", {}) + sender_name = profile.get("name", "Unknown") + + # Create message in database + message_data = MessageCreate( + sender_phone=phone_number, + sender_name=sender_name, + message_body=message_body, + ) + create_message(db=db, message=message_data) + logger.info(f"Saved message from {sender_name} ({phone_number})") + + return {"status": "success"} + + except Exception as e: + logger.exception(f"Error processing webhook: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing webhook: {str(e)}", + ) from e diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..51cf348 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,67 @@ +""" +Configuration settings for the application. +""" +from pathlib import Path + +from pydantic import AnyHttpUrl, EmailStr, validator +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """ + Application settings. + """ + API_V1_STR: str = "/api/v1" + PROJECT_NAME: str = "WhatsApp Message Analytics Service" + PROJECT_DESCRIPTION: str = "A service that processes WhatsApp messages and provides insights" + PROJECT_VERSION: str = "0.1.0" + + # CORS configuration + BACKEND_CORS_ORIGINS: list[str | AnyHttpUrl] = [] + + @validator("BACKEND_CORS_ORIGINS", pre=True) + def assemble_cors_origins(self, v: str | list[str]) -> list[str] | str: + """ + Parse and validate CORS origins. + """ + if isinstance(v, str) and not v.startswith("["): + return [i.strip() for i in v.split(",")] + elif isinstance(v, list | str): + return v + raise ValueError(v) + + # Database configuration + DB_DIR: Path = Path("/app") / "storage" / "db" + SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite" + + # WhatsApp API configuration + WHATSAPP_VERIFY_TOKEN: str = "your_verify_token" + + # Email configuration + SMTP_TLS: bool = True + SMTP_PORT: int | None = 587 + SMTP_HOST: str | None = "smtp.gmail.com" + SMTP_USER: str | None = "" + SMTP_PASSWORD: str | None = "" + EMAILS_FROM_EMAIL: EmailStr | None = "your-email@example.com" + EMAILS_FROM_NAME: str | None = "WhatsApp Message Analytics" + EMAIL_RECIPIENTS: list[EmailStr] = [] + + # Gemini API configuration + GEMINI_API_KEY: str = "your_gemini_api_key" + + # Scheduler configuration + ENABLE_SCHEDULER: bool = True + ANALYSIS_CRON_DAY_OF_WEEK: str = "sun" # Run on Sundays + ANALYSIS_CRON_HOUR: int = 6 # Run at 6 AM + ANALYSIS_CRON_MINUTE: int = 0 # Run at 0 minutes + + class Config: + """ + Configuration for the settings class. 
+ """ + case_sensitive = True + env_file = ".env" + + +settings = Settings() diff --git a/app/core/scheduler.py b/app/core/scheduler.py new file mode 100644 index 0000000..9dff094 --- /dev/null +++ b/app/core/scheduler.py @@ -0,0 +1,52 @@ +""" +Scheduler module for running periodic tasks. +""" +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +from app.core.config import settings +from app.services.analysis_service import analyze_messages_and_send_insights + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Create scheduler +scheduler = AsyncIOScheduler() + + +def start_scheduler(): + """ + Start the scheduler with the message analysis cron job. + """ + if not scheduler.running: + # Schedule the message analysis job to run on weekends (Sunday by default) + scheduler.add_job( + analyze_messages_and_send_insights, + trigger=CronTrigger( + day_of_week=settings.ANALYSIS_CRON_DAY_OF_WEEK, + hour=settings.ANALYSIS_CRON_HOUR, + minute=settings.ANALYSIS_CRON_MINUTE, + ), + id="message_analysis_job", + name="Analyze WhatsApp messages and send insights via email", + replace_existing=True, + ) + + # Log the next run time + job = scheduler.get_job("message_analysis_job") + if job: + next_run_time = job.next_run_time + if next_run_time: + logger.info( + "Message analysis job scheduled. Next run: %s", + next_run_time.strftime("%Y-%m-%d %H:%M:%S"), + ) + + # Start the scheduler + scheduler.start() + logger.info("Scheduler started successfully") + else: + logger.info("Scheduler is already running") diff --git a/app/db/base.py b/app/db/base.py new file mode 100644 index 0000000..82c9685 --- /dev/null +++ b/app/db/base.py @@ -0,0 +1,8 @@ +""" +Base module for database models. +""" +# Import all the models, so that Base has them before being +# imported by Alembic +from app.db.base_class import Base # noqa +from app.models.analysis import Analysis # noqa +from app.models.message import Message # noqa \ No newline at end of file diff --git a/app/db/base_class.py b/app/db/base_class.py new file mode 100644 index 0000000..59aa179 --- /dev/null +++ b/app/db/base_class.py @@ -0,0 +1,20 @@ +""" +Base class for SQLAlchemy models. +""" +from typing import Any + +from sqlalchemy.ext.declarative import as_declarative, declared_attr + + +@as_declarative() +class Base: + """ + Base class for all SQLAlchemy models. + """ + id: Any + __name__: str + + # Generate tablename automatically based on class name + @declared_attr + def __tablename__(self) -> str: + return self.__name__.lower() diff --git a/app/db/session.py b/app/db/session.py new file mode 100644 index 0000000..698ece9 --- /dev/null +++ b/app/db/session.py @@ -0,0 +1,34 @@ +""" +Database session module. +""" + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.config import settings + +# Ensure the database directory exists +settings.DB_DIR.mkdir(parents=True, exist_ok=True) + +# Create SQLAlchemy engine +engine = create_engine( + settings.SQLALCHEMY_DATABASE_URL, + connect_args={"check_same_thread": False}, +) + +# Create sessionmaker +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +def get_db(): + """ + Get a database session. + + Yields: + Session: A SQLAlchemy session. 
+ """ + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/app/models/analysis.py b/app/models/analysis.py new file mode 100644 index 0000000..f96b337 --- /dev/null +++ b/app/models/analysis.py @@ -0,0 +1,29 @@ +""" +SQLAlchemy models for message analyses. +""" + +from sqlalchemy import Column, DateTime, Integer, Text +from sqlalchemy.sql import func + +from app.db.base_class import Base + + +class Analysis(Base): + """ + Model for storing message analysis results. + """ + id = Column(Integer, primary_key=True, index=True) + + # Analysis details + analysis_text = Column(Text, nullable=False) + + # Metadata + created_at = Column(DateTime, default=func.now(), nullable=False, index=True) + start_date = Column(DateTime, nullable=False) + end_date = Column(DateTime, nullable=False) + + def __repr__(self) -> str: + """ + String representation of the analysis. + """ + return f"" diff --git a/app/models/message.py b/app/models/message.py new file mode 100644 index 0000000..4062265 --- /dev/null +++ b/app/models/message.py @@ -0,0 +1,33 @@ +""" +SQLAlchemy models for WhatsApp messages. +""" + +from sqlalchemy import Boolean, Column, DateTime, Integer, String, Text +from sqlalchemy.sql import func + +from app.db.base_class import Base + + +class Message(Base): + """ + Model for storing WhatsApp messages. + """ + id = Column(Integer, primary_key=True, index=True) + + # WhatsApp message details + sender_phone = Column(String(20), index=True, nullable=False) + sender_name = Column(String(255), index=True, nullable=False) + message_body = Column(Text, nullable=False) + + # Metadata + timestamp = Column(DateTime, default=func.now(), nullable=False, index=True) + + # Tracking fields + analyzed = Column(Boolean, default=False, index=True) + last_analyzed_at = Column(DateTime, nullable=True) + + def __repr__(self) -> str: + """ + String representation of the message. + """ + return f"" diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py new file mode 100644 index 0000000..58c4281 --- /dev/null +++ b/app/schemas/__init__.py @@ -0,0 +1,3 @@ +""" +Pydantic schemas for the application. +""" diff --git a/app/schemas/analysis.py b/app/schemas/analysis.py new file mode 100644 index 0000000..2946a2d --- /dev/null +++ b/app/schemas/analysis.py @@ -0,0 +1,36 @@ +""" +Pydantic schemas for message analyses. +""" +from datetime import datetime + +from pydantic import BaseModel, Field + + +class AnalysisBase(BaseModel): + """ + Base schema for message analyses. + """ + analysis_text: str = Field(..., description="Text of the analysis") + start_date: datetime = Field(..., description="Start date of the analysis period") + end_date: datetime = Field(..., description="End date of the analysis period") + + +class AnalysisCreate(AnalysisBase): + """ + Schema for creating a new analysis. + """ + pass + + +class Analysis(AnalysisBase): + """ + Schema for an analysis. + """ + id: int + created_at: datetime + + class Config: + """ + Pydantic config for the Analysis schema. + """ + from_attributes = True diff --git a/app/schemas/message.py b/app/schemas/message.py new file mode 100644 index 0000000..5547fa7 --- /dev/null +++ b/app/schemas/message.py @@ -0,0 +1,76 @@ +""" +Pydantic schemas for WhatsApp messages. +""" +from datetime import datetime + +from pydantic import BaseModel, Field + + +class MessageBase(BaseModel): + """ + Base schema for WhatsApp messages. 
+ """ + sender_phone: str = Field(..., description="Phone number of the message sender") + sender_name: str = Field(..., description="Name of the message sender") + message_body: str = Field(..., description="Content of the message") + + +class MessageCreate(MessageBase): + """ + Schema for creating a new message. + """ + pass + + +class Message(MessageBase): + """ + Schema for a message. + """ + id: int + timestamp: datetime + analyzed: bool + last_analyzed_at: datetime | None = None + + class Config: + """ + Pydantic config for the Message schema. + """ + from_attributes = True + + +# WhatsApp webhook specific schemas +class WhatsAppTextMessage(BaseModel): + """ + Schema for a WhatsApp text message. + """ + from_: str = Field(..., alias="from") + id: str + timestamp: str + text: dict + type: str + + +class WhatsAppValue(BaseModel): + """ + Schema for WhatsApp webhook value object. + """ + messaging_product: str + metadata: dict + contacts: list + messages: list[WhatsAppTextMessage] + + +class WhatsAppWebhookEntry(BaseModel): + """ + Schema for a WhatsApp webhook entry. + """ + id: str + changes: list[dict] + + +class WhatsAppWebhook(BaseModel): + """ + Schema for a WhatsApp webhook. + """ + object: str + entry: list[WhatsAppWebhookEntry] diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..627a579 --- /dev/null +++ b/app/services/__init__.py @@ -0,0 +1,3 @@ +""" +Services package. +""" diff --git a/app/services/analysis_service.py b/app/services/analysis_service.py new file mode 100644 index 0000000..dff8d0b --- /dev/null +++ b/app/services/analysis_service.py @@ -0,0 +1,144 @@ +""" +Service for analyzing WhatsApp messages. +""" +import logging +from datetime import datetime, timedelta + +from sqlalchemy.orm import Session + +from app.models.analysis import Analysis +from app.schemas.analysis import AnalysisCreate +from app.services.email_service import send_analysis_email +from app.services.gemini_service import analyze_messages +from app.services.message_service import ( + get_messages_by_date_range, + mark_messages_as_analyzed, +) + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_analysis(db: Session, analysis: AnalysisCreate) -> Analysis: + """ + Create a new analysis. + + Args: + db: Database session + analysis: Analysis data + + Returns: + The created analysis + """ + db_analysis = Analysis( + analysis_text=analysis.analysis_text, + start_date=analysis.start_date, + end_date=analysis.end_date, + ) + db.add(db_analysis) + db.commit() + db.refresh(db_analysis) + return db_analysis + + +def get_analysis(db: Session, analysis_id: int) -> Analysis | None: + """ + Get an analysis by ID. + + Args: + db: Database session + analysis_id: ID of the analysis to retrieve + + Returns: + The analysis if found, None otherwise + """ + return db.query(Analysis).filter(Analysis.id == analysis_id).first() + + +def get_all_analyses(db: Session, skip: int = 0, limit: int = 100) -> list[Analysis]: + """ + Get all analyses with pagination. + + Args: + db: Database session + skip: Number of analyses to skip + limit: Maximum number of analyses to return + + Returns: + List of analyses + """ + return ( + db.query(Analysis) + .order_by(Analysis.created_at.desc()) + .offset(skip) + .limit(limit) + .all() + ) + + +async def analyze_messages_and_send_insights() -> None: + """ + Analyze messages and send insights via email. + + This function is intended to be run as a scheduled task. 
+ """ + # Create a new database session + from app.db.session import SessionLocal + + db = SessionLocal() + try: + # Get the date range for analysis (last 7 days) + end_date = datetime.now() + start_date = end_date - timedelta(days=7) + + # Get messages within the date range + messages = get_messages_by_date_range(db, start_date, end_date) + + if not messages: + logger.info("No messages found for analysis") + return + + # Extract message texts for analysis + message_texts = [ + f"From: {msg.sender_name} ({msg.sender_phone})\nTime: {msg.timestamp}\nMessage: {msg.message_body}" + for msg in messages + ] + + # Analyze messages using Gemini + analysis_text = await analyze_messages(message_texts) + + if not analysis_text: + logger.error("Failed to analyze messages") + return + + # Create analysis record + analysis_data = AnalysisCreate( + analysis_text=analysis_text, + start_date=start_date, + end_date=end_date, + ) + create_analysis(db, analysis_data) + + # Mark messages as analyzed + message_ids = [msg.id for msg in messages] + mark_messages_as_analyzed(db, message_ids) + + # Send email with insights + email_sent = await send_analysis_email( + subject=f"WhatsApp Message Analysis: {start_date.date()} to {end_date.date()}", + analysis_text=analysis_text, + start_date=start_date, + end_date=end_date, + ) + + if email_sent: + logger.info("Analysis email sent successfully") + else: + logger.error("Failed to send analysis email") + + except Exception as e: + logger.exception(f"Error in analyze_messages_and_send_insights: {str(e)}") + + finally: + db.close() diff --git a/app/services/email_service.py b/app/services/email_service.py new file mode 100644 index 0000000..4e5d380 --- /dev/null +++ b/app/services/email_service.py @@ -0,0 +1,184 @@ +""" +Service for sending emails. +""" +import logging +import smtplib +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from jinja2 import Template + +from app.core.config import settings + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Email template for analysis results +ANALYSIS_EMAIL_TEMPLATE = """ + + + + + WhatsApp Message Analysis + + + +
+

WhatsApp Message Analysis

+
+
+
+

Analysis for messages from {{ start_date.strftime('%Y-%m-%d') }} to {{ end_date.strftime('%Y-%m-%d') }}

+
+ +
+ {{ analysis_text | safe }} +
+
+ + + +""" + + +async def send_email( + recipients: list[str], + subject: str, + html_content: str, + from_email: str | None = None, + from_name: str | None = None, +) -> bool: + """ + Send an email. + + Args: + recipients: List of email addresses to send to + subject: Email subject + html_content: HTML content of the email + from_email: Sender email address (defaults to settings.EMAILS_FROM_EMAIL) + from_name: Sender name (defaults to settings.EMAILS_FROM_NAME) + + Returns: + True if the email was sent successfully, False otherwise + """ + try: + # Use default sender details if not provided + sender_email = from_email or settings.EMAILS_FROM_EMAIL + sender_name = from_name or settings.EMAILS_FROM_NAME + + # Create message + message = MIMEMultipart() + message["From"] = f"{sender_name} <{sender_email}>" + message["To"] = ", ".join(recipients) + message["Subject"] = subject + + # Attach HTML content + message.attach(MIMEText(html_content, "html")) + + # Connect to SMTP server and send email + with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) as server: + if settings.SMTP_TLS: + server.starttls() + if settings.SMTP_USER and settings.SMTP_PASSWORD: + server.login(settings.SMTP_USER, settings.SMTP_PASSWORD) + server.send_message(message) + + logger.info(f"Email sent successfully to {len(recipients)} recipients") + return True + + except Exception as e: + logger.exception(f"Failed to send email: {str(e)}") + return False + + +async def send_analysis_email( + subject: str, + analysis_text: str, + start_date: datetime, + end_date: datetime, +) -> bool: + """ + Send an email with message analysis. + + Args: + subject: Email subject + analysis_text: Text of the analysis + start_date: Start date of the analysis period + end_date: End date of the analysis period + + Returns: + True if the email was sent successfully, False otherwise + """ + try: + # Get recipients from settings + recipients = settings.EMAIL_RECIPIENTS + if not recipients: + logger.error("No email recipients configured") + return False + + # Format HTML content using template + template = Template(ANALYSIS_EMAIL_TEMPLATE) + html_content = template.render( + analysis_text=analysis_text, + start_date=start_date, + end_date=end_date, + current_year=datetime.now().year, + ) + + # Send email + return await send_email( + recipients=recipients, + subject=subject, + html_content=html_content, + ) + + except Exception as e: + logger.exception(f"Error sending analysis email: {str(e)}") + return False diff --git a/app/services/gemini_service.py b/app/services/gemini_service.py new file mode 100644 index 0000000..54f9c02 --- /dev/null +++ b/app/services/gemini_service.py @@ -0,0 +1,62 @@ +""" +Service for interacting with Google's Gemini API. +""" +import logging + +import google.generativeai as genai + +from app.core.config import settings + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configure Gemini API +genai.configure(api_key=settings.GEMINI_API_KEY) + + +async def analyze_messages(messages: list[str]) -> str | None: + """ + Analyze a list of WhatsApp messages using Gemini 2.5 Pro. + + Args: + messages: List of message texts to analyze + + Returns: + Analysis text or None if analysis failed + """ + try: + # Join messages into a single string with line breaks + messages_text = "\n\n".join(messages) + + # Prepare prompt for Gemini + prompt = f""" + You are an expert in analyzing WhatsApp conversations. I will provide you with a set of WhatsApp messages. 
+
+        Please analyze these messages and provide the following insights:
+
+        1. Key topics and themes discussed
+        2. Sentiment analysis (overall mood of the conversations)
+        3. Frequent participants and their engagement levels
+        4. Any action items or follow-ups mentioned
+        5. Suggestions or recommendations based on the conversations
+
+        Format your response in a well-organized manner with clear headings and bullet points.
+
+        Here are the messages to analyze:
+
+        {messages_text}
+        """
+
+        # Get Gemini model
+        model = genai.GenerativeModel(model_name="gemini-2.5-pro")
+
+        # Generate response
+        response = await model.generate_content_async(prompt)
+
+        # Return the analysis text
+        return response.text
+
+    except Exception as e:
+        logger.exception(f"Error analyzing messages with Gemini: {str(e)}")
+        return None
diff --git a/app/services/message_service.py b/app/services/message_service.py
new file mode 100644
index 0000000..a7fb8ba
--- /dev/null
+++ b/app/services/message_service.py
@@ -0,0 +1,133 @@
+"""
+Service for managing WhatsApp messages.
+"""
+from datetime import datetime
+
+from sqlalchemy.orm import Session
+
+from app.models.message import Message
+from app.schemas.message import MessageCreate
+
+
+def get_message(db: Session, message_id: int) -> Message | None:
+    """
+    Get a message by ID.
+
+    Args:
+        db: Database session
+        message_id: ID of the message to retrieve
+
+    Returns:
+        The message if found, None otherwise
+    """
+    return db.query(Message).filter(Message.id == message_id).first()
+
+
+def get_all_messages(db: Session, skip: int = 0, limit: int = 100) -> list[Message]:
+    """
+    Get all messages with pagination.
+
+    Args:
+        db: Database session
+        skip: Number of messages to skip
+        limit: Maximum number of messages to return
+
+    Returns:
+        List of messages
+    """
+    return db.query(Message).order_by(Message.timestamp.desc()).offset(skip).limit(limit).all()
+
+
+def get_unanalyzed_messages(db: Session) -> list[Message]:
+    """
+    Get all messages that have not been analyzed yet.
+
+    Args:
+        db: Database session
+
+    Returns:
+        List of unanalyzed messages
+    """
+    # Use a SQL boolean expression; Python's `not` cannot negate a column
+    return db.query(Message).filter(Message.analyzed.is_(False)).all()
+
+
+def get_messages_by_date_range(
+    db: Session, start_date: datetime, end_date: datetime,
+) -> list[Message]:
+    """
+    Get messages within a date range.
+
+    Args:
+        db: Database session
+        start_date: Start date for filtering messages
+        end_date: End date for filtering messages
+
+    Returns:
+        List of messages within the date range
+    """
+    return (
+        db.query(Message)
+        .filter(Message.timestamp >= start_date, Message.timestamp <= end_date)
+        .order_by(Message.timestamp)
+        .all()
+    )
+
+
+def create_message(db: Session, message: MessageCreate) -> Message:
+    """
+    Create a new message.
+
+    Args:
+        db: Database session
+        message: Message data
+
+    Returns:
+        The created message
+    """
+    db_message = Message(
+        sender_phone=message.sender_phone,
+        sender_name=message.sender_name,
+        message_body=message.message_body,
+    )
+    db.add(db_message)
+    db.commit()
+    db.refresh(db_message)
+    return db_message
+
+
+def mark_message_as_analyzed(db: Session, message_id: int) -> Message | None:
+    """
+    Mark a message as analyzed.
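+
+    Sets `analyzed` to True and stamps `last_analyzed_at` with the current time.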
+
+    Args:
+        db: Database session
+        message_id: ID of the message to mark
+
+    Returns:
+        The updated message if found, None otherwise
+    """
+    db_message = get_message(db, message_id)
+    if db_message:
+        db_message.analyzed = True
+        db_message.last_analyzed_at = datetime.now()
+        db.commit()
+        db.refresh(db_message)
+    return db_message
+
+
+def mark_messages_as_analyzed(db: Session, message_ids: list[int]) -> None:
+    """
+    Mark multiple messages as analyzed.
+
+    Args:
+        db: Database session
+        message_ids: List of message IDs to mark
+    """
+    db.query(Message).filter(Message.id.in_(message_ids)).update(
+        {
+            Message.analyzed: True,
+            Message.last_analyzed_at: datetime.now(),
+        },
+        synchronize_session=False,
+    )
+    db.commit()
diff --git a/app/utils/__init__.py b/app/utils/__init__.py
new file mode 100644
index 0000000..2243616
--- /dev/null
+++ b/app/utils/__init__.py
@@ -0,0 +1,3 @@
+"""
+Utility functions for the application.
+"""
diff --git a/app/utils/date_utils.py b/app/utils/date_utils.py
new file mode 100644
index 0000000..f05ee7f
--- /dev/null
+++ b/app/utils/date_utils.py
@@ -0,0 +1,47 @@
+"""
+Date and time utility functions.
+"""
+from datetime import datetime, timedelta
+
+
+def get_start_of_week(dt: datetime | None = None) -> datetime:
+    """
+    Get the start of the week (Monday) for a given date.
+
+    Args:
+        dt: The date to get the start of the week for. Defaults to current date.
+
+    Returns:
+        Datetime representing the start of the week
+    """
+    dt = dt or datetime.now()
+    return dt - timedelta(days=dt.weekday())
+
+
+def get_end_of_week(dt: datetime | None = None) -> datetime:
+    """
+    Get the end of the week (Sunday) for a given date.
+
+    Args:
+        dt: The date to get the end of the week for. Defaults to current date.
+
+    Returns:
+        Datetime representing the end of the week
+    """
+    dt = dt or datetime.now()
+    return get_start_of_week(dt) + timedelta(days=6)
+
+
+def get_previous_week_range() -> tuple[datetime, datetime]:
+    """
+    Get the date range for the previous week.
+
+    Returns:
+        Tuple of (start_date, end_date) for the previous week
+    """
+    today = datetime.now()
+    # Go back to the previous week
+    previous_week = today - timedelta(days=7)
+    start_date = get_start_of_week(previous_week)
+    end_date = get_end_of_week(previous_week)
+    return start_date, end_date
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..6412972
--- /dev/null
+++ b/main.py
@@ -0,0 +1,54 @@
+"""
+WhatsApp Message Analytics Service main application module.
+"""
+import uvicorn
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+
+from app.api.api_v1.api import api_router
+from app.core.config import settings
+from app.core.scheduler import start_scheduler
+
+app = FastAPI(
+    title=settings.PROJECT_NAME,
+    description=settings.PROJECT_DESCRIPTION,
+    version=settings.PROJECT_VERSION,
+    openapi_url=f"{settings.API_V1_STR}/openapi.json",
+    docs_url="/docs",
+    redoc_url="/redoc",
+)
+
+# Set all CORS enabled origins
+if settings.BACKEND_CORS_ORIGINS:
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+
+# Include API router
+app.include_router(api_router, prefix=settings.API_V1_STR)
+
+
+@app.get("/health", tags=["health"])
+async def health_check():
+    """
+    Health check endpoint.
+    """
+    return {"status": "healthy"}
+
+
+@app.on_event("startup")
+async def startup_event():
+    """
+    Actions to run on application startup.
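+
+    Starts the APScheduler cron job for weekly message analysis when
+    ENABLE_SCHEDULER is true.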
+ """ + # Start the scheduler for weekend message analysis + if settings.ENABLE_SCHEDULER: + start_scheduler() + + +if __name__ == "__main__": + uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/migrations/README b/migrations/README new file mode 100644 index 0000000..e0d0858 --- /dev/null +++ b/migrations/README @@ -0,0 +1 @@ +Generic single-database configuration with an async dbapi. \ No newline at end of file diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..2aa227f --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,84 @@ +""" +Alembic environment configuration. +""" +from logging.config import fileConfig + +from alembic import context +from sqlalchemy import engine_from_config, pool + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Import needed for autogenerate support +from app.db.base import Base # noqa: E402 + +# add your model's MetaData object here +# for 'autogenerate' support +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """ + Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + render_as_batch=True, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """ + Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + is_sqlite = connection.dialect.name == 'sqlite' + context.configure( + connection=connection, + target_metadata=target_metadata, + render_as_batch=is_sqlite, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 0000000..1e4564e --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. 
+revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} \ No newline at end of file diff --git a/migrations/versions/f97bceaaef6c_initial_migration.py b/migrations/versions/f97bceaaef6c_initial_migration.py new file mode 100644 index 0000000..1969386 --- /dev/null +++ b/migrations/versions/f97bceaaef6c_initial_migration.py @@ -0,0 +1,61 @@ +"""Initial migration + +Revision ID: f97bceaaef6c +Revises: +Create Date: 2023-09-14 12:00:00.000000 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'f97bceaaef6c' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # Create message table + op.create_table( + 'message', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('sender_phone', sa.String(length=20), nullable=False), + sa.Column('sender_name', sa.String(length=255), nullable=False), + sa.Column('message_body', sa.Text(), nullable=False), + sa.Column('timestamp', sa.DateTime(), nullable=False), + sa.Column('analyzed', sa.Boolean(), nullable=True, default=False), + sa.Column('last_analyzed_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index(op.f('ix_message_id'), 'message', ['id'], unique=False) + op.create_index(op.f('ix_message_sender_phone'), 'message', ['sender_phone'], unique=False) + op.create_index(op.f('ix_message_sender_name'), 'message', ['sender_name'], unique=False) + op.create_index(op.f('ix_message_timestamp'), 'message', ['timestamp'], unique=False) + op.create_index(op.f('ix_message_analyzed'), 'message', ['analyzed'], unique=False) + + # Create analysis table + op.create_table( + 'analysis', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('analysis_text', sa.Text(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('start_date', sa.DateTime(), nullable=False), + sa.Column('end_date', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index(op.f('ix_analysis_id'), 'analysis', ['id'], unique=False) + op.create_index(op.f('ix_analysis_created_at'), 'analysis', ['created_at'], unique=False) + + +def downgrade(): + op.drop_index(op.f('ix_analysis_created_at'), table_name='analysis') + op.drop_index(op.f('ix_analysis_id'), table_name='analysis') + op.drop_table('analysis') + + op.drop_index(op.f('ix_message_analyzed'), table_name='message') + op.drop_index(op.f('ix_message_timestamp'), table_name='message') + op.drop_index(op.f('ix_message_sender_name'), table_name='message') + op.drop_index(op.f('ix_message_sender_phone'), table_name='message') + op.drop_index(op.f('ix_message_id'), table_name='message') + op.drop_table('message') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..51dc412 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,50 @@ +[tool.ruff] +# Exclude a variety of commonly ignored directories +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", +] + +# Same as Black +line-length = 88 + +# Assume Python 3.10 +target-version = "py310" + +[tool.ruff.lint] +# Enable pycodestyle (E), Pyflakes 
(F), isort (I), and various other linters +select = ["E", "F", "I", "N", "B", "A", "COM", "C4", "UP", "W"] +ignore = ["E501"] # Ignore line too long errors + +# Allow autofix for all enabled rules +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[tool.ruff.lint.mccabe] +# Unlike Flake8, default to a complexity level of 10 +max-complexity = 10 + +[tool.ruff.lint.isort] +known-third-party = ["fastapi", "pydantic", "sqlalchemy", "alembic", "google"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8961bba --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +fastapi>=0.100.0 +uvicorn>=0.23.0 +sqlalchemy>=2.0.0 +alembic>=1.11.0 +pydantic>=2.1.0 +pydantic-settings>=2.0.0 +python-dotenv>=1.0.0 +google-generativeai>=0.3.0 +jinja2>=3.1.0 +python-multipart>=0.0.6 +email-validator>=2.0.0 +apscheduler>=3.10.0 +httpx>=0.24.0 +ruff>=0.1.0 +pytest>=7.4.0 \ No newline at end of file