
- Added category, priority and due_date fields to Todo model - Created Enum for priority levels (low, medium, high) - Added advanced filtering in CRUD and API routes - Added statistics endpoint for todo analytics - Created Alembic migration for new fields - Updated README with new feature documentation generated with BackendIM... (backend.im)
132 lines
3.5 KiB
Python
132 lines
3.5 KiB
Python
from typing import List, Optional, Dict, Any
from datetime import datetime

from sqlalchemy import asc, desc, func, inspect
from sqlalchemy.orm import Session

from app.models.todo import Todo, PriorityEnum
from app.schemas.todo import TodoCreate, TodoUpdate
|
|
|
|
|
|
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Look up a single todo by its ``id``.

    Args:
        db: Session used to run the query.
        todo_id: Value compared against ``Todo.id``.

    Returns:
        The first matching ``Todo``, or ``None`` when no row matches.
    """
    matching = db.query(Todo).filter(Todo.id == todo_id)
    return matching.first()
|
|
|
|
|
|
def get_todos(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    category: Optional[str] = None,
    priority: Optional[str] = None,
    completed: Optional[bool] = None,
    due_before: Optional[datetime] = None,
    due_after: Optional[datetime] = None,
    sort_by: str = "created_at",
    sort_desc: bool = True
) -> List[Todo]:
    """Return a filtered, sorted and paginated list of todos.

    Every filter is optional; the ones that are supplied are combined with
    AND.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        category: Exact category to match. ``None`` or an empty string
            disables the filter.
        priority: Name of a ``PriorityEnum`` member, case-insensitive.
            An unknown name is ignored rather than rejected.
        completed: When not ``None``, match todos with this completion flag.
        due_before: Keep todos whose ``due_date`` is at or before this value.
        due_after: Keep todos whose ``due_date`` is at or after this value.
        sort_by: Name of a mapped column attribute of ``Todo`` to sort on.
            Anything that is not a mapped column falls back to
            ``created_at`` descending.
        sort_desc: Sort descending when true, ascending otherwise. Only
            applies when ``sort_by`` is valid.

    Returns:
        The matching ``Todo`` rows for the requested page.
    """
    query = db.query(Todo)

    if category:
        query = query.filter(Todo.category == category)

    if priority:
        try:
            priority_enum = PriorityEnum[priority.upper()]
        except (KeyError, AttributeError):
            # Deliberate best-effort: an unrecognised priority disables the
            # filter instead of failing the whole request.
            pass
        else:
            query = query.filter(Todo.priority == priority_enum)

    if completed is not None:
        query = query.filter(Todo.completed == completed)

    if due_before is not None:
        query = query.filter(Todo.due_date <= due_before)

    if due_after is not None:
        query = query.filter(Todo.due_date >= due_after)

    # Only mapped column attributes are valid sort keys. A plain
    # ``hasattr(Todo, sort_by)`` would also accept methods, relationships and
    # class-level metadata, which fail once they reach ``order_by`` instead of
    # taking the fallback below.
    sortable = {attr.key for attr in inspect(Todo).column_attrs}
    if sort_by in sortable:
        direction = desc if sort_desc else asc
        ordering = [direction(getattr(Todo, sort_by))]
    else:
        # Invalid column requested: default to newest first.
        ordering = [desc(Todo.created_at)]

    # Secondary sort so rows with equal sort values keep a stable order
    # between pages. Assumes ``Todo.id`` is unique - TODO confirm.
    if sort_by != "id":
        ordering.append(asc(Todo.id))

    query = query.order_by(*ordering)

    return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_todo(db: Session, todo: TodoCreate) -> Todo:
    """Insert a new todo built from the fields of ``todo``.

    Args:
        db: Session used to add and commit the new row.
        todo: Schema object whose ``model_dump()`` output is passed as
            keyword arguments to ``Todo``.

    Returns:
        The new ``Todo`` after it has been committed and refreshed.
    """
    new_row = Todo(**todo.model_dump())
    db.add(new_row)
    db.commit()
    # Reload the instance from the session after the commit.
    db.refresh(new_row)
    return new_row
|
|
|
|
|
|
def update_todo(db: Session, todo_id: int, todo: TodoUpdate) -> Optional[Todo]:
    """Apply a partial update to an existing todo.

    Only the fields explicitly set on ``todo`` are written
    (``model_dump(exclude_unset=True)``); all others are left untouched.

    Args:
        db: Session used to look up and commit the row.
        todo_id: Identifier of the todo to update.
        todo: Schema object carrying the new field values.

    Returns:
        The updated and refreshed ``Todo``, or ``None`` when no todo with
        ``todo_id`` exists.
    """
    existing = get_todo(db, todo_id)
    if existing is not None:
        changes = todo.model_dump(exclude_unset=True)
        for name in changes:
            setattr(existing, name, changes[name])

        db.commit()
        db.refresh(existing)
    return existing
|
|
|
|
|
|
def delete_todo(db: Session, todo_id: int) -> bool:
    """Delete the todo identified by ``todo_id``.

    Args:
        db: Session used to look up, delete and commit.
        todo_id: Identifier of the todo to remove.

    Returns:
        ``True`` when a todo was found and deleted, ``False`` when no todo
        with that id exists (nothing is committed in that case).
    """
    target = get_todo(db, todo_id)
    found = target is not None
    if found:
        db.delete(target)
        db.commit()
    return found
|
|
|
|
|
|
def get_todo_stats(db: Session) -> Dict[str, Any]:
    """Compute aggregate statistics about todos.

    Args:
        db: Session used to run the counting queries.

    Returns:
        A dict with the keys:

        * ``"total"``: number of todos.
        * ``"completed"``: number of todos whose ``completed`` flag is true.
        * ``"incomplete"``: ``total - completed``.
        * ``"overdue"``: todos with ``due_date`` earlier than now and
          ``completed`` false.
        * ``"by_category"``: count per category; a NULL or empty category is
          reported under ``"uncategorized"``.
        * ``"by_priority"``: count per lower-cased priority name; a NULL
          priority is reported under ``"none"``.
    """
    total = db.query(Todo).count()
    completed = db.query(Todo).filter(Todo.completed == True).count()  # noqa: E712
    incomplete = total - completed

    # ``func`` is SQLAlchemy's SQL function generator. It lives on the
    # ``sqlalchemy`` package, not on the Session, so ``db.func`` would raise
    # AttributeError.
    categories = (
        db.query(Todo.category, func.count(Todo.id))
        .group_by(Todo.category)
        .all()
    )

    priorities = (
        db.query(Todo.priority, func.count(Todo.id))
        .group_by(Todo.priority)
        .all()
    )

    # Overdue todos (due date in the past and not completed).
    # NOTE(review): datetime.now() is naive local time - confirm that this
    # matches how ``due_date`` values are stored.
    now = datetime.now()
    overdue = db.query(Todo).filter(
        Todo.due_date < now,
        Todo.completed == False  # noqa: E712
    ).count()

    # NULL and "" both map to "uncategorized"; sum them so one group does not
    # silently overwrite the other.
    by_category: Dict[str, int] = {}
    for cat, count in categories:
        label = cat or "uncategorized"
        by_category[label] = by_category.get(label, 0) + count

    # Compare against None explicitly so an enum member that happens to be
    # falsy is still reported under its own name.
    by_priority = {
        (str(pri.name).lower() if pri is not None else "none"): count
        for pri, count in priorities
    }

    return {
        "total": total,
        "completed": completed,
        "incomplete": incomplete,
        "overdue": overdue,
        "by_category": by_category,
        "by_priority": by_priority
    }