Automated Action ef749e4878 Implement WhatsApp Message Analytics Service
- Set up FastAPI application structure
- Implement SQLite database with SQLAlchemy
- Create WhatsApp webhook endpoints
- Implement message storage and analysis
- Integrate Gemini 2.5 Pro for message analysis
- Add email delivery of insights
- Configure APScheduler for weekend analysis
- Add linting with Ruff
2025-05-22 13:29:12 +00:00

145 lines
3.8 KiB
Python

"""
Service for analyzing WhatsApp messages.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from app.models.analysis import Analysis
from app.schemas.analysis import AnalysisCreate
from app.services.email_service import send_analysis_email
from app.services.gemini_service import analyze_messages
from app.services.message_service import (
get_messages_by_date_range,
mark_messages_as_analyzed,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_analysis(db: Session, analysis: AnalysisCreate) -> Analysis:
"""
Create a new analysis.
Args:
db: Database session
analysis: Analysis data
Returns:
The created analysis
"""
db_analysis = Analysis(
analysis_text=analysis.analysis_text,
start_date=analysis.start_date,
end_date=analysis.end_date,
)
db.add(db_analysis)
db.commit()
db.refresh(db_analysis)
return db_analysis
def get_analysis(db: Session, analysis_id: int) -> Analysis | None:
"""
Get an analysis by ID.
Args:
db: Database session
analysis_id: ID of the analysis to retrieve
Returns:
The analysis if found, None otherwise
"""
return db.query(Analysis).filter(Analysis.id == analysis_id).first()
def get_all_analyses(db: Session, skip: int = 0, limit: int = 100) -> list[Analysis]:
"""
Get all analyses with pagination.
Args:
db: Database session
skip: Number of analyses to skip
limit: Maximum number of analyses to return
Returns:
List of analyses
"""
return (
db.query(Analysis)
.order_by(Analysis.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
async def analyze_messages_and_send_insights() -> None:
"""
Analyze messages and send insights via email.
This function is intended to be run as a scheduled task.
"""
# Create a new database session
from app.db.session import SessionLocal
db = SessionLocal()
try:
# Get the date range for analysis (last 7 days)
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
# Get messages within the date range
messages = get_messages_by_date_range(db, start_date, end_date)
if not messages:
logger.info("No messages found for analysis")
return
# Extract message texts for analysis
message_texts = [
f"From: {msg.sender_name} ({msg.sender_phone})\nTime: {msg.timestamp}\nMessage: {msg.message_body}"
for msg in messages
]
# Analyze messages using Gemini
analysis_text = await analyze_messages(message_texts)
if not analysis_text:
logger.error("Failed to analyze messages")
return
# Create analysis record
analysis_data = AnalysisCreate(
analysis_text=analysis_text,
start_date=start_date,
end_date=end_date,
)
create_analysis(db, analysis_data)
# Mark messages as analyzed
message_ids = [msg.id for msg in messages]
mark_messages_as_analyzed(db, message_ids)
# Send email with insights
email_sent = await send_analysis_email(
subject=f"WhatsApp Message Analysis: {start_date.date()} to {end_date.date()}",
analysis_text=analysis_text,
start_date=start_date,
end_date=end_date,
)
if email_sent:
logger.info("Analysis email sent successfully")
else:
logger.error("Failed to send analysis email")
except Exception as e:
logger.exception(f"Error in analyze_messages_and_send_insights: {str(e)}")
finally:
db.close()