Automated Action 66410bdab5 Implement comprehensive due date functionality for todos
Added due_date field to TodoBase, TodoCreate, TodoUpdate schemas with proper validation
and timezone handling. Included computed fields is_overdue and days_until_due
for enhanced todo management capabilities.
2025-06-19 13:20:33 +00:00

359 lines
11 KiB
Python

from typing import List, Optional, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.models.todo import Todo, Priority
from app.models.tag import Tag
from app.schemas.todo import TodoCreate, TodoUpdate, SubtaskCreate
from app.utils.date_utils import (
get_timezone_aware_now,
get_date_range_today,
get_date_range_this_week,
get_date_range_next_week,
get_date_range_next_days,
get_overdue_cutoff,
get_due_soon_cutoff,
is_overdue,
)
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
return db.query(Todo).filter(Todo.id == todo_id).first()
def get_todos(
db: Session,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[Priority] = None,
search: Optional[str] = None,
category_id: Optional[int] = None,
project_id: Optional[int] = None,
parent_id: Optional[int] = None,
include_subtasks: bool = True,
due_date_filter: Optional[str] = None,
overdue: Optional[bool] = None,
sort_by_due_date: bool = False,
due_date_start: Optional[datetime] = None,
due_date_end: Optional[datetime] = None,
) -> Tuple[List[Todo], int]:
query = db.query(Todo)
# Filter by parent_id to get only main todos or subtasks
if parent_id is not None:
query = query.filter(Todo.parent_id == parent_id)
elif not include_subtasks:
# Only get main todos (no subtasks) if parent_id is None and include_subtasks is False
query = query.filter(Todo.parent_id.is_(None))
# Apply filters
if completed is not None:
query = query.filter(Todo.completed == completed)
if priority is not None:
query = query.filter(Todo.priority == priority)
if category_id is not None:
query = query.filter(Todo.category_id == category_id)
if project_id is not None:
query = query.filter(Todo.project_id == project_id)
if search:
query = query.filter(
Todo.title.contains(search) | Todo.description.contains(search)
)
# Due date filtering
if due_date_filter:
if due_date_filter == "today":
start_date, end_date = get_date_range_today()
query = query.filter(
and_(Todo.due_date >= start_date, Todo.due_date <= end_date)
)
elif due_date_filter == "this_week":
start_date, end_date = get_date_range_this_week()
query = query.filter(
and_(Todo.due_date >= start_date, Todo.due_date <= end_date)
)
elif due_date_filter == "next_week":
start_date, end_date = get_date_range_next_week()
query = query.filter(
and_(Todo.due_date >= start_date, Todo.due_date <= end_date)
)
# Custom date range filtering
if due_date_start:
query = query.filter(Todo.due_date >= due_date_start)
if due_date_end:
query = query.filter(Todo.due_date <= due_date_end)
# Overdue filtering
if overdue is not None:
current_time = get_overdue_cutoff()
if overdue:
query = query.filter(
and_(Todo.due_date.isnot(None), Todo.due_date < current_time)
)
else:
query = query.filter(
or_(Todo.due_date.is_(None), Todo.due_date >= current_time)
)
# Get total count before pagination
total = query.count()
# Apply pagination and ordering
if sort_by_due_date:
# Sort by due_date ascending (earliest first), with None values at the end
todos = (
query.order_by(Todo.due_date.asc().nullslast(), Todo.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
else:
todos = query.order_by(Todo.created_at.desc()).offset(skip).limit(limit).all()
return todos, total
def get_overdue_todos(
db: Session,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[Priority] = None,
category_id: Optional[int] = None,
project_id: Optional[int] = None,
parent_id: Optional[int] = None,
include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
"""Get all overdue todos."""
current_time = get_overdue_cutoff()
query = db.query(Todo).filter(
and_(Todo.due_date.isnot(None), Todo.due_date < current_time)
)
# Apply same filtering logic as get_todos
if parent_id is not None:
query = query.filter(Todo.parent_id == parent_id)
elif not include_subtasks:
query = query.filter(Todo.parent_id.is_(None))
if completed is not None:
query = query.filter(Todo.completed == completed)
if priority is not None:
query = query.filter(Todo.priority == priority)
if category_id is not None:
query = query.filter(Todo.category_id == category_id)
if project_id is not None:
query = query.filter(Todo.project_id == project_id)
total = query.count()
todos = (
query.order_by(Todo.due_date.asc(), Todo.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return todos, total
def get_todos_due_soon(
db: Session,
days: int = 7,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[Priority] = None,
category_id: Optional[int] = None,
project_id: Optional[int] = None,
parent_id: Optional[int] = None,
include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
"""Get todos due within the next N days (default 7 days)."""
current_time = get_timezone_aware_now()
future_cutoff = get_due_soon_cutoff(days)
query = db.query(Todo).filter(
and_(
Todo.due_date.isnot(None),
Todo.due_date >= current_time,
Todo.due_date <= future_cutoff,
)
)
# Apply same filtering logic as get_todos
if parent_id is not None:
query = query.filter(Todo.parent_id == parent_id)
elif not include_subtasks:
query = query.filter(Todo.parent_id.is_(None))
if completed is not None:
query = query.filter(Todo.completed == completed)
if priority is not None:
query = query.filter(Todo.priority == priority)
if category_id is not None:
query = query.filter(Todo.category_id == category_id)
if project_id is not None:
query = query.filter(Todo.project_id == project_id)
total = query.count()
todos = (
query.order_by(Todo.due_date.asc(), Todo.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return todos, total
def get_todos_by_date_range(
db: Session,
start_date: datetime,
end_date: datetime,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[Priority] = None,
category_id: Optional[int] = None,
project_id: Optional[int] = None,
parent_id: Optional[int] = None,
include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
"""Get todos within a specific date range."""
query = db.query(Todo).filter(
and_(
Todo.due_date.isnot(None),
Todo.due_date >= start_date,
Todo.due_date <= end_date,
)
)
# Apply same filtering logic as get_todos
if parent_id is not None:
query = query.filter(Todo.parent_id == parent_id)
elif not include_subtasks:
query = query.filter(Todo.parent_id.is_(None))
if completed is not None:
query = query.filter(Todo.completed == completed)
if priority is not None:
query = query.filter(Todo.priority == priority)
if category_id is not None:
query = query.filter(Todo.category_id == category_id)
if project_id is not None:
query = query.filter(Todo.project_id == project_id)
total = query.count()
todos = (
query.order_by(Todo.due_date.asc(), Todo.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return todos, total
def create_todo(db: Session, todo: TodoCreate) -> Todo:
todo_data = todo.model_dump(exclude={"tag_ids"})
db_todo = Todo(**todo_data)
# Handle tags if provided
if todo.tag_ids:
tags = db.query(Tag).filter(Tag.id.in_(todo.tag_ids)).all()
db_todo.tags = tags
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
def update_todo(db: Session, todo_id: int, todo_update: TodoUpdate) -> Optional[Todo]:
db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
if db_todo:
update_data = todo_update.model_dump(exclude_unset=True, exclude={"tag_ids"})
# Handle tags if provided
if todo_update.tag_ids is not None:
tags = db.query(Tag).filter(Tag.id.in_(todo_update.tag_ids)).all()
db_todo.tags = tags
# Update other fields
for field, value in update_data.items():
setattr(db_todo, field, value)
db.commit()
db.refresh(db_todo)
return db_todo
def delete_todo(db: Session, todo_id: int) -> bool:
db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
if db_todo:
db.delete(db_todo)
db.commit()
return True
return False
def get_subtasks(db: Session, parent_id: int) -> List[Todo]:
"""Get all subtasks for a given parent todo."""
return (
db.query(Todo)
.filter(Todo.parent_id == parent_id)
.order_by(Todo.created_at.desc())
.all()
)
def create_subtask(
db: Session, parent_id: int, subtask: SubtaskCreate
) -> Optional[Todo]:
"""Create a subtask for a given parent todo."""
# Check if parent exists
parent_todo = db.query(Todo).filter(Todo.id == parent_id).first()
if not parent_todo:
return None
# Create subtask with parent_id
subtask_data = subtask.model_dump()
subtask_data["parent_id"] = parent_id
db_subtask = Todo(**subtask_data)
db.add(db_subtask)
db.commit()
db.refresh(db_subtask)
return db_subtask
def move_subtask(
db: Session, subtask_id: int, new_parent_id: Optional[int]
) -> Optional[Todo]:
"""Move a subtask to a different parent or make it a main todo."""
subtask = db.query(Todo).filter(Todo.id == subtask_id).first()
if not subtask:
return None
# If new_parent_id is provided, check if it exists and is not the subtask itself
if new_parent_id is not None:
if new_parent_id == subtask_id:
return None # Cannot make a todo a subtask of itself
new_parent = db.query(Todo).filter(Todo.id == new_parent_id).first()
if not new_parent:
return None # New parent doesn't exist
# Prevent creating cycles (subtask cannot become parent of its current parent)
if new_parent.parent_id == subtask_id:
return None
# Update the parent_id
subtask.parent_id = new_parent_id
db.commit()
db.refresh(subtask)
return subtask