
- Set up FastAPI application structure - Implement SQLite database with SQLAlchemy - Create WhatsApp webhook endpoints - Implement message storage and analysis - Integrate Gemini 2.5 Pro for message analysis - Add email delivery of insights - Configure APScheduler for weekend analysis - Add linting with Ruff
134 lines
3.1 KiB
Python
134 lines
3.1 KiB
Python
"""
|
|
Service for managing WhatsApp messages.
|
|
"""
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.message import Message
|
|
from app.schemas.message import MessageCreate
|
|
|
|
|
|
def get_message(db: Session, message_id: int) -> Message | None:
    """
    Look up a single message by its ID.

    Args:
        db: Database session
        message_id: ID of the message to retrieve

    Returns:
        The first message whose ID matches, or None when there is no match
    """
    matching = db.query(Message).filter(Message.id == message_id)
    return matching.first()
|
|
|
|
|
|
def get_all_messages(db: Session, skip: int = 0, limit: int = 100) -> list[Message]:
    """
    Return one page of messages, newest first.

    Args:
        db: Database session
        skip: Number of messages to skip before the page starts
        limit: Maximum number of messages in the page

    Returns:
        Messages ordered by descending timestamp, after applying skip/limit
    """
    newest_first = db.query(Message).order_by(Message.timestamp.desc())
    page = newest_first.offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
def get_unanalyzed_messages(db: Session) -> list[Message]:
    """
    Get all messages that have not been analyzed yet.

    Args:
        db: Database session

    Returns:
        List of messages whose ``analyzed`` flag is false
    """
    # Python's ``not`` cannot be overloaded, so ``not Message.analyzed`` is
    # evaluated by the interpreter before SQLAlchemy ever sees it: the filter
    # would receive a constant (or the expression would raise) rather than a
    # predicate on the column. ``.is_(False)`` builds the comparison in SQL.
    return db.query(Message).filter(Message.analyzed.is_(False)).all()
|
|
|
|
|
|
def get_messages_by_date_range(
    db: Session, start_date: datetime, end_date: datetime,
) -> list[Message]:
    """
    Fetch the messages whose timestamp lies between two dates, inclusive.

    Args:
        db: Database session
        start_date: Lower bound (inclusive) on the message timestamp
        end_date: Upper bound (inclusive) on the message timestamp

    Returns:
        Matching messages ordered by ascending timestamp
    """
    not_before_start = Message.timestamp >= start_date
    not_after_end = Message.timestamp <= end_date

    in_range = db.query(Message).filter(not_before_start, not_after_end)
    return in_range.order_by(Message.timestamp).all()
|
|
|
|
|
|
def create_message(db: Session, message: MessageCreate) -> Message:
    """
    Persist a new message built from the incoming payload.

    Args:
        db: Database session
        message: Payload providing sender_phone, sender_name and message_body

    Returns:
        The stored message, refreshed from the database after the commit
    """
    fields = {
        "sender_phone": message.sender_phone,
        "sender_name": message.sender_name,
        "message_body": message.message_body,
    }
    record = Message(**fields)

    db.add(record)
    db.commit()
    # Reload so the returned object reflects the row as stored.
    db.refresh(record)
    return record
|
|
|
|
|
|
def mark_message_as_analyzed(db: Session, message_id: int) -> Message | None:
    """
    Flag one message as analyzed and record when that happened.

    Args:
        db: Database session
        message_id: ID of the message to mark

    Returns:
        The updated message if found, None otherwise
    """
    target = get_message(db, message_id)
    if not target:
        # Nothing to update; hand back the lookup result unchanged.
        return target

    target.analyzed = True
    target.last_analyzed_at = datetime.now()
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def mark_messages_as_analyzed(db: Session, message_ids: list[int]) -> None:
    """
    Flag a batch of messages as analyzed in one bulk update.

    Every message whose ID is in ``message_ids`` gets ``analyzed`` set to
    True and ``last_analyzed_at`` set to the current time, then the
    transaction is committed. The update runs with
    ``synchronize_session=False``.

    Args:
        db: Database session
        message_ids: List of message IDs to mark
    """
    selected = db.query(Message).filter(Message.id.in_(message_ids))
    new_values = {
        Message.analyzed: True,
        Message.last_analyzed_at: datetime.now(),
    }
    selected.update(new_values, synchronize_session=False)
    db.commit()
|