
- Add parent_id field to Todo model with self-referential foreign key - Add parent/children relationships and is_subtask property - Update TodoCreate/TodoUpdate schemas to include parent_id - Add subtasks list to Todo schema and create SubtaskCreate schema - Enhance get_todos CRUD function with parent_id filtering - Add subtask-specific CRUD functions: get_subtasks, create_subtask, move_subtask - Add API endpoints for subtask management - Create migration for adding parent_id column - Update imports and fix circular dependencies - Ensure proper cycle prevention and validation Features added: - GET /todos/{todo_id}/subtasks - Get all subtasks for a todo - POST /todos/{todo_id}/subtasks - Create a new subtask - PUT /subtasks/{subtask_id}/move - Move subtask or convert to main todo - Query parameter parent_id for filtering by parent - Query parameter include_subtasks for excluding subtasks from main list
156 lines
4.6 KiB
Python
156 lines
4.6 KiB
Python
from typing import List, Optional, Tuple
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.todo import Todo, Priority
|
|
from app.models.tag import Tag
|
|
from app.schemas.todo import TodoCreate, TodoUpdate, SubtaskCreate
|
|
|
|
|
|
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Fetch a single todo by its primary key.

    Returns the first ``Todo`` row whose ``id`` equals ``todo_id``, or
    ``None`` when the query yields no row.
    """
    matching = db.query(Todo).filter(Todo.id == todo_id)
    return matching.first()
|
|
|
|
|
|
def get_todos(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    completed: Optional[bool] = None,
    priority: Optional[Priority] = None,
    search: Optional[str] = None,
    category_id: Optional[int] = None,
    project_id: Optional[int] = None,
    parent_id: Optional[int] = None,
    include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
    """Return one page of todos plus the total number of matching rows.

    Args:
        db: Session used to run the queries.
        skip: Number of matching rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        completed: When not ``None``, keep only todos with this completion flag.
        priority: When not ``None``, keep only todos with this priority.
        search: When non-empty, keep only todos whose title or description
            contains this text as a literal substring.
        category_id: When not ``None``, keep only todos in this category.
        project_id: When not ``None``, keep only todos in this project.
        parent_id: When not ``None``, keep only direct subtasks of this todo.
            Takes precedence over ``include_subtasks``.
        include_subtasks: When ``False`` (and ``parent_id`` is ``None``),
            keep only todos that have no parent.

    Returns:
        A ``(todos, total)`` tuple: the requested page, newest first, and the
        count of all rows matching the filters before pagination.
    """
    query = db.query(Todo)

    # Hierarchy filter: an explicit parent wins; otherwise optionally hide
    # subtasks so that only top-level todos are listed.
    if parent_id is not None:
        query = query.filter(Todo.parent_id == parent_id)
    elif not include_subtasks:
        query = query.filter(Todo.parent_id.is_(None))

    # Simple equality filters; ``None`` means "do not filter on this field".
    if completed is not None:
        query = query.filter(Todo.completed == completed)
    if priority is not None:
        query = query.filter(Todo.priority == priority)
    if category_id is not None:
        query = query.filter(Todo.category_id == category_id)
    if project_id is not None:
        query = query.filter(Todo.project_id == project_id)

    if search:
        # autoescape=True makes "%" and "_" in the user's text match
        # literally instead of acting as LIKE wildcards.
        query = query.filter(
            Todo.title.contains(search, autoescape=True)
            | Todo.description.contains(search, autoescape=True)
        )

    # Count before pagination so callers can compute the number of pages.
    total = query.count()

    # Todo.id breaks ties between rows sharing a created_at value; without it
    # the row order is unspecified and offset/limit pages can repeat or skip
    # rows between requests.
    todos = (
        query.order_by(Todo.created_at.desc(), Todo.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    return todos, total
|
|
|
|
|
|
def create_todo(db: Session, todo: TodoCreate) -> Todo:
    """Persist a new todo built from ``todo`` and return the stored row.

    Every field of the schema except ``tag_ids`` is passed straight to the
    ``Todo`` constructor. When ``tag_ids`` is non-empty, the ``Tag`` rows with
    those ids are looked up and assigned to the todo's ``tags``.
    """
    column_values = todo.model_dump(exclude={"tag_ids"})
    new_todo = Todo(**column_values)

    # tag_ids is not a Todo constructor argument; resolve the ids to Tag rows.
    requested_tag_ids = todo.tag_ids
    if requested_tag_ids:
        tag_query = db.query(Tag).filter(Tag.id.in_(requested_tag_ids))
        new_todo.tags = tag_query.all()

    db.add(new_todo)
    db.commit()
    # Reload the row after the commit before handing it back.
    db.refresh(new_todo)
    return new_todo
|
|
|
|
|
|
def update_todo(db: Session, todo_id: int, todo_update: TodoUpdate) -> Optional[Todo]:
    """Apply a partial update to the todo with id ``todo_id``.

    Only fields explicitly set on ``todo_update`` are written. ``tag_ids`` is
    handled separately: when it is not ``None`` the todo's tags are replaced
    by the ``Tag`` rows with those ids.

    Returns the refreshed todo, or the empty lookup result (``None``) when no
    todo has that id.
    """
    existing = db.query(Todo).filter(Todo.id == todo_id).first()
    if not existing:
        # Nothing to update; hand back the empty lookup result unchanged.
        return existing

    changes = todo_update.model_dump(exclude_unset=True, exclude={"tag_ids"})

    # An explicit list (even an empty one) replaces the current tags.
    new_tag_ids = todo_update.tag_ids
    if new_tag_ids is not None:
        tag_query = db.query(Tag).filter(Tag.id.in_(new_tag_ids))
        existing.tags = tag_query.all()

    # Copy the remaining explicitly-set fields onto the row.
    for attr_name, attr_value in changes.items():
        setattr(existing, attr_name, attr_value)

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
|
|
def delete_todo(db: Session, todo_id: int) -> bool:
    """Delete the todo with id ``todo_id``.

    Returns ``True`` when a row was found, deleted and committed, and
    ``False`` when no todo has that id.
    """
    target = db.query(Todo).filter(Todo.id == todo_id).first()
    if not target:
        return False

    db.delete(target)
    db.commit()
    return True
|
|
|
|
|
|
def get_subtasks(db: Session, parent_id: int) -> List[Todo]:
    """Return the direct subtasks of ``parent_id``, newest first.

    Selects every ``Todo`` whose ``parent_id`` column equals the given id and
    orders the result by ``created_at`` descending.
    """
    children = db.query(Todo).filter(Todo.parent_id == parent_id)
    newest_first = children.order_by(Todo.created_at.desc())
    return newest_first.all()
|
|
|
|
|
|
def create_subtask(
    db: Session, parent_id: int, subtask: SubtaskCreate
) -> Optional[Todo]:
    """Create a new todo as a child of the todo with id ``parent_id``.

    Returns the stored subtask, or ``None`` when the parent does not exist.
    """
    parent = db.query(Todo).filter(Todo.id == parent_id).first()
    if not parent:
        return None

    # The path parameter decides the parent, overriding any parent_id value
    # that the schema dump might carry.
    payload = {**subtask.model_dump(), "parent_id": parent_id}
    child = Todo(**payload)

    db.add(child)
    db.commit()
    db.refresh(child)
    return child
|
|
|
|
|
|
def move_subtask(
    db: Session, subtask_id: int, new_parent_id: Optional[int]
) -> Optional[Todo]:
    """Move a todo under a different parent, or make it a top-level todo.

    Args:
        db: Session used to run the queries.
        subtask_id: Id of the todo to move.
        new_parent_id: Id of the new parent, or ``None`` to detach the todo
            and make it a main (top-level) todo.

    Returns:
        The refreshed todo after the move, or ``None`` when the move is
        rejected: the todo does not exist, the new parent does not exist, the
        new parent is the todo itself, or the new parent is a descendant of
        the todo (which would create a cycle).
    """
    subtask = db.query(Todo).filter(Todo.id == subtask_id).first()
    if not subtask:
        return None

    if new_parent_id is not None:
        if new_parent_id == subtask_id:
            return None  # Cannot make a todo a subtask of itself

        new_parent = db.query(Todo).filter(Todo.id == new_parent_id).first()
        if not new_parent:
            return None  # New parent doesn't exist

        # Walk the new parent's ancestors up to the root. If the todo being
        # moved is anywhere on that chain, the new parent is one of its
        # descendants and the move would close a cycle. Checking only the
        # direct parent would miss deeper cases such as moving A under its
        # grandchild C (A -> B -> C).
        visited = set()  # guards against looping on already-corrupt data
        ancestor_id = new_parent.parent_id
        while ancestor_id is not None and ancestor_id not in visited:
            if ancestor_id == subtask_id:
                return None
            visited.add(ancestor_id)
            ancestor = db.query(Todo).filter(Todo.id == ancestor_id).first()
            ancestor_id = ancestor.parent_id if ancestor else None

    # Update the parent_id
    subtask.parent_id = new_parent_id
    db.commit()
    db.refresh(subtask)
    return subtask
|