Automated Action d993db2f17 Implement subtasks functionality for todo system
- Add parent_id field to Todo model with self-referential foreign key
- Add parent/children relationships and is_subtask property
- Update TodoCreate/TodoUpdate schemas to include parent_id
- Add subtasks list to Todo schema and create SubtaskCreate schema
- Enhance get_todos CRUD function with parent_id filtering
- Add subtask-specific CRUD functions: get_subtasks, create_subtask, move_subtask
- Add API endpoints for subtask management
- Create migration for adding parent_id column
- Update imports and fix circular dependencies
- Ensure proper cycle prevention and validation

Features added:
- GET /todos/{todo_id}/subtasks - Get all subtasks for a todo
- POST /todos/{todo_id}/subtasks - Create a new subtask
- PUT /subtasks/{subtask_id}/move - Move subtask or convert to main todo
- Query parameter parent_id for filtering by parent
- Query parameter include_subtasks for excluding subtasks from main list
2025-06-19 00:04:18 +00:00

93 lines
2.6 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.tag import Tag
from app.models.todo import Todo
from app.schemas.tag import TagCreate, TagUpdate
def get_tag(db: Session, tag_id: int) -> Optional[Tag]:
"""Get a single tag by ID."""
return db.query(Tag).filter(Tag.id == tag_id).first()
def get_tag_by_name(db: Session, name: str) -> Optional[Tag]:
"""Get a tag by name."""
return db.query(Tag).filter(Tag.name == name).first()
def get_tags(db: Session, skip: int = 0, limit: int = 100) -> List[Tag]:
"""Get multiple tags with pagination."""
return db.query(Tag).offset(skip).limit(limit).all()
def get_tags_count(db: Session) -> int:
"""Get total count of tags."""
return db.query(func.count(Tag.id)).scalar()
def create_tag(db: Session, tag: TagCreate) -> Tag:
"""Create a new tag."""
db_tag = Tag(name=tag.name, color=tag.color)
db.add(db_tag)
db.commit()
db.refresh(db_tag)
return db_tag
def update_tag(db: Session, tag_id: int, tag: TagUpdate) -> Optional[Tag]:
"""Update an existing tag."""
db_tag = db.query(Tag).filter(Tag.id == tag_id).first()
if db_tag:
update_data = tag.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_tag, field, value)
db.commit()
db.refresh(db_tag)
return db_tag
def delete_tag(db: Session, tag_id: int) -> bool:
"""Delete a tag."""
db_tag = db.query(Tag).filter(Tag.id == tag_id).first()
if db_tag:
db.delete(db_tag)
db.commit()
return True
return False
def add_tag_to_todo(db: Session, todo_id: int, tag_id: int) -> bool:
"""Add a tag to a todo."""
todo = db.query(Todo).filter(Todo.id == todo_id).first()
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if todo and tag and tag not in todo.tags:
todo.tags.append(tag)
db.commit()
return True
return False
def remove_tag_from_todo(db: Session, todo_id: int, tag_id: int) -> bool:
"""Remove a tag from a todo."""
todo = db.query(Todo).filter(Todo.id == todo_id).first()
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if todo and tag and tag in todo.tags:
todo.tags.remove(tag)
db.commit()
return True
return False
def get_todos_by_tag(
db: Session, tag_id: int, skip: int = 0, limit: int = 100
) -> List[Todo]:
"""Get all todos that have a specific tag."""
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if tag:
return tag.todos[skip : skip + limit]
return []