
- Set up FastAPI application structure - Implement SQLite database with SQLAlchemy - Create WhatsApp webhook endpoints - Implement message storage and analysis - Integrate Gemini 2.5 Pro for message analysis - Add email delivery of insights - Configure APScheduler for weekend analysis - Add linting with Ruff
145 lines
3.8 KiB
Python
145 lines
3.8 KiB
Python
"""
|
|
Service for analyzing WhatsApp messages.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.analysis import Analysis
|
|
from app.schemas.analysis import AnalysisCreate
|
|
from app.services.email_service import send_analysis_email
|
|
from app.services.gemini_service import analyze_messages
|
|
from app.services.message_service import (
|
|
get_messages_by_date_range,
|
|
mark_messages_as_analyzed,
|
|
)
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_analysis(db: Session, analysis: AnalysisCreate) -> Analysis:
|
|
"""
|
|
Create a new analysis.
|
|
|
|
Args:
|
|
db: Database session
|
|
analysis: Analysis data
|
|
|
|
Returns:
|
|
The created analysis
|
|
"""
|
|
db_analysis = Analysis(
|
|
analysis_text=analysis.analysis_text,
|
|
start_date=analysis.start_date,
|
|
end_date=analysis.end_date,
|
|
)
|
|
db.add(db_analysis)
|
|
db.commit()
|
|
db.refresh(db_analysis)
|
|
return db_analysis
|
|
|
|
|
|
def get_analysis(db: Session, analysis_id: int) -> Analysis | None:
|
|
"""
|
|
Get an analysis by ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
analysis_id: ID of the analysis to retrieve
|
|
|
|
Returns:
|
|
The analysis if found, None otherwise
|
|
"""
|
|
return db.query(Analysis).filter(Analysis.id == analysis_id).first()
|
|
|
|
|
|
def get_all_analyses(db: Session, skip: int = 0, limit: int = 100) -> list[Analysis]:
|
|
"""
|
|
Get all analyses with pagination.
|
|
|
|
Args:
|
|
db: Database session
|
|
skip: Number of analyses to skip
|
|
limit: Maximum number of analyses to return
|
|
|
|
Returns:
|
|
List of analyses
|
|
"""
|
|
return (
|
|
db.query(Analysis)
|
|
.order_by(Analysis.created_at.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
|
|
async def analyze_messages_and_send_insights() -> None:
|
|
"""
|
|
Analyze messages and send insights via email.
|
|
|
|
This function is intended to be run as a scheduled task.
|
|
"""
|
|
# Create a new database session
|
|
from app.db.session import SessionLocal
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Get the date range for analysis (last 7 days)
|
|
end_date = datetime.now()
|
|
start_date = end_date - timedelta(days=7)
|
|
|
|
# Get messages within the date range
|
|
messages = get_messages_by_date_range(db, start_date, end_date)
|
|
|
|
if not messages:
|
|
logger.info("No messages found for analysis")
|
|
return
|
|
|
|
# Extract message texts for analysis
|
|
message_texts = [
|
|
f"From: {msg.sender_name} ({msg.sender_phone})\nTime: {msg.timestamp}\nMessage: {msg.message_body}"
|
|
for msg in messages
|
|
]
|
|
|
|
# Analyze messages using Gemini
|
|
analysis_text = await analyze_messages(message_texts)
|
|
|
|
if not analysis_text:
|
|
logger.error("Failed to analyze messages")
|
|
return
|
|
|
|
# Create analysis record
|
|
analysis_data = AnalysisCreate(
|
|
analysis_text=analysis_text,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
)
|
|
create_analysis(db, analysis_data)
|
|
|
|
# Mark messages as analyzed
|
|
message_ids = [msg.id for msg in messages]
|
|
mark_messages_as_analyzed(db, message_ids)
|
|
|
|
# Send email with insights
|
|
email_sent = await send_analysis_email(
|
|
subject=f"WhatsApp Message Analysis: {start_date.date()} to {end_date.date()}",
|
|
analysis_text=analysis_text,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
)
|
|
|
|
if email_sent:
|
|
logger.info("Analysis email sent successfully")
|
|
else:
|
|
logger.error("Failed to send analysis email")
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Error in analyze_messages_and_send_insights: {str(e)}")
|
|
|
|
finally:
|
|
db.close()
|