
- Add search parameter to get_todos CRUD function
- Add search parameter to main todos endpoint
- Create dedicated /search endpoint for title/description search
- Update README with new search functionality
- Fix datetime import issue in todo_stats function

Generated with BackendIM... (backend.im)
167 lines
4.8 KiB
Python
167 lines
4.8 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import asc, desc
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.models.todo import Todo, PriorityEnum
|
|
from app.schemas.todo import TodoCreate, TodoUpdate
|
|
|
|
|
|
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Look up a single todo by its id.

    Args:
        db: Session used to run the query.
        todo_id: Value matched against ``Todo.id``.

    Returns:
        The first matching ``Todo``, or ``None`` when no row matches.
    """
    by_id = db.query(Todo).filter(Todo.id == todo_id)
    return by_id.first()
|
|
|
|
|
|
def get_todos(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    category: Optional[str] = None,
    priority: Optional[str] = None,
    completed: Optional[bool] = None,
    due_before: Optional[datetime] = None,
    due_after: Optional[datetime] = None,
    search: Optional[str] = None,
    sort_by: str = "created_at",
    sort_desc: bool = True
) -> List[Todo]:
    """Return a filtered, sorted and paginated list of todos.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (query offset).
        limit: Maximum number of rows to return.
        category: When truthy, keep only todos whose category equals it.
        priority: Name of a ``PriorityEnum`` member (case-insensitive).
            An unknown name is ignored rather than treated as an error.
        completed: When not ``None``, keep only todos with this completed flag.
        due_before: When given, keep todos with ``due_date <= due_before``.
        due_after: When given, keep todos with ``due_date >= due_after``.
        search: When truthy, keep todos whose title or description contains
            this text (case-insensitive). ``%`` and ``_`` are matched
            literally, not as LIKE wildcards.
        sort_by: Name of a mapped column attribute of ``Todo`` to sort on.
            Any other value falls back to ``created_at`` descending.
        sort_desc: Sort descending when true, ascending otherwise (only
            applies when ``sort_by`` is a valid column attribute).

    Returns:
        The matching ``Todo`` rows.
    """
    # Function-scope import so this block does not rely on a file-level change.
    from sqlalchemy import inspect as sa_inspect

    query = db.query(Todo)

    # Apply filters if provided
    if category:
        query = query.filter(Todo.category == category)

    if priority:
        try:
            priority_enum = PriorityEnum[priority.upper()]
        except (KeyError, AttributeError):
            # Invalid priority value: deliberately best-effort, ignore filter.
            pass
        else:
            query = query.filter(Todo.priority == priority_enum)

    if completed is not None:
        query = query.filter(Todo.completed == completed)

    if due_before:
        query = query.filter(Todo.due_date <= due_before)

    if due_after:
        query = query.filter(Todo.due_date >= due_after)

    # Apply text search if provided
    if search:
        # Escape LIKE metacharacters so user input such as "50%" or
        # "file_name" is matched literally instead of acting as a wildcard.
        escape_char = "/"
        escaped = (
            search.replace(escape_char, escape_char * 2)
            .replace("%", escape_char + "%")
            .replace("_", escape_char + "_")
        )
        search_term = f"%{escaped}%"
        query = query.filter(
            Todo.title.ilike(search_term, escape=escape_char)
            | Todo.description.ilike(search_term, escape=escape_char)
        )

    # Apply sorting. Only mapped column attributes are accepted: a plain
    # hasattr() check would also pass for things like ``metadata`` or
    # methods, which cannot be used in ORDER BY and would raise.
    sortable = {attr.key for attr in sa_inspect(Todo).column_attrs}
    if sort_by in sortable:
        sort_column = getattr(Todo, sort_by)
        direction = desc if sort_desc else asc
        query = query.order_by(direction(sort_column))
    else:
        # Default sort by created_at if invalid column provided
        query = query.order_by(desc(Todo.created_at))

    # Apply pagination
    return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_todo(db: Session, todo: TodoCreate) -> Todo:
    """Persist a new todo built from the given creation schema.

    Args:
        db: Session used to add and commit the new row.
        todo: Creation payload; its ``model_dump()`` output is passed as
            keyword arguments to ``Todo``.

    Returns:
        The newly created ``Todo`` after commit and refresh.
    """
    new_row = Todo(**todo.model_dump())
    db.add(new_row)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(new_row)
    return new_row
|
|
|
|
|
|
def update_todo(db: Session, todo_id: int, todo: TodoUpdate) -> Optional[Todo]:
    """Apply a partial update to an existing todo.

    Only fields explicitly set on ``todo`` are written. The completion
    timestamp is kept in sync with the ``completed`` flag:

    * completing a todo without an explicit ``completion_date`` stamps it
      with the current time;
    * re-sending ``completed=True`` for a todo that is already completed
      and already has a completion date leaves that date untouched;
    * un-completing a previously completed todo clears the date.

    Args:
        db: Session used to load and commit the row.
        todo_id: Id of the todo to update.
        todo: Update payload; unset fields are ignored.

    Returns:
        The refreshed ``Todo``, or ``None`` if no todo has ``todo_id``.
    """
    db_todo = get_todo(db, todo_id)
    if db_todo is None:
        return None

    update_data = todo.model_dump(exclude_unset=True)
    new_completed = update_data.get('completed')

    # Stamp the completion time only on a real transition to completed (or
    # when a completed row is missing its date). Otherwise any later edit
    # that happens to include completed=True would overwrite the original
    # completion time.
    already_stamped = bool(db_todo.completed) and db_todo.completion_date is not None
    if (
        new_completed is True
        and 'completion_date' not in update_data
        and not already_stamped
    ):
        update_data['completion_date'] = datetime.now()

    # If completed status is being set to False, clear the completion_date
    if new_completed is False and db_todo.completed:
        update_data['completion_date'] = None

    for field, value in update_data.items():
        setattr(db_todo, field, value)

    db.commit()
    db.refresh(db_todo)
    return db_todo
|
|
|
|
|
|
def delete_todo(db: Session, todo_id: int) -> bool:
    """Delete the todo with the given id, if it exists.

    Args:
        db: Session used to load, delete and commit.
        todo_id: Id of the todo to remove.

    Returns:
        ``True`` when a todo was found and deleted, ``False`` when no todo
        has ``todo_id``.
    """
    target = get_todo(db, todo_id)
    found = target is not None
    if found:
        db.delete(target)
        db.commit()
    return found
|
|
|
|
|
|
def get_todo_stats(db: Session) -> Dict[str, Any]:
    """Get statistics about todos.

    Args:
        db: Session used to run the counting queries.

    Returns:
        A dict with these keys:

        * ``total``, ``completed``, ``incomplete``: overall counts;
        * ``overdue``: not-completed todos whose due date is before now;
        * ``completed_last_24h`` / ``completed_last_week``: completed todos
          whose completion date falls within the last 1 / 7 days;
        * ``by_category``: count per category (falsy category is reported
          as ``"uncategorized"``);
        * ``by_priority``: count per lower-cased priority name (falsy
          priority is reported as ``"none"``).
    """
    # A Session has no ``func`` attribute (that is a Flask-SQLAlchemy ``db``
    # idiom), so the SQL function namespace is taken from sqlalchemy itself.
    # Imported locally so this block does not rely on a file-level change.
    from sqlalchemy import func

    total = db.query(Todo).count()
    # ``== True`` / ``== False`` are intentional: they build SQL expressions.
    completed = db.query(Todo).filter(Todo.completed == True).count()  # noqa: E712
    incomplete = total - completed

    # Group by category
    categories = db.query(
        Todo.category,
        func.count(Todo.id)
    ).group_by(
        Todo.category
    ).all()

    # Group by priority
    priorities = db.query(
        Todo.priority,
        func.count(Todo.id)
    ).group_by(
        Todo.priority
    ).all()

    # Overdue todos (due date in past and not completed)
    now = datetime.now()
    overdue = db.query(Todo).filter(
        Todo.due_date < now,
        Todo.completed == False  # noqa: E712
    ).count()

    # Get completed todos in last 24 hours
    yesterday = now - timedelta(days=1)
    completed_last_24h = db.query(Todo).filter(
        Todo.completion_date > yesterday,
        Todo.completed == True  # noqa: E712
    ).count()

    # Get completed todos in last 7 days
    last_week = now - timedelta(days=7)
    completed_last_week = db.query(Todo).filter(
        Todo.completion_date > last_week,
        Todo.completed == True  # noqa: E712
    ).count()

    return {
        "total": total,
        "completed": completed,
        "incomplete": incomplete,
        "overdue": overdue,
        "completed_last_24h": completed_last_24h,
        "completed_last_week": completed_last_week,
        "by_category": {cat or "uncategorized": count for cat, count in categories},
        "by_priority": {str(pri.name).lower() if pri else "none": count for pri, count in priorities}
    }