Implement subtasks functionality for todo system

- Add parent_id field to Todo model with self-referential foreign key
- Add parent/children relationships and is_subtask property
- Update TodoCreate/TodoUpdate schemas to include parent_id
- Add subtasks list to Todo schema and create SubtaskCreate schema
- Enhance get_todos CRUD function with parent_id filtering
- Add subtask-specific CRUD functions: get_subtasks, create_subtask, move_subtask
- Add API endpoints for subtask management
- Create migration for adding parent_id column
- Update imports and fix circular dependencies
- Ensure proper cycle prevention and validation

Features added:
- GET /todos/{todo_id}/subtasks - Get all subtasks for a todo
- POST /todos/{todo_id}/subtasks - Create a new subtask
- PUT /subtasks/{subtask_id}/move - Move subtask or convert to main todo
- Query parameter parent_id for filtering by parent
- Query parameter include_subtasks for excluding subtasks from main list
This commit is contained in:
Automated Action 2025-06-19 00:04:18 +00:00
parent 99faaaeaf8
commit d993db2f17
25 changed files with 1173 additions and 11 deletions

View File

@ -0,0 +1,54 @@
"""Add categories table and category_id to todos
Revision ID: 003
Revises: 002
Create Date: 2025-06-18 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "003"
down_revision = "002"
branch_labels = None
depends_on = None
def upgrade():
# Create categories table
op.create_table(
"categories",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("description", sa.String(500), nullable=True),
sa.Column("color", sa.String(7), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("name"),
)
op.create_index(op.f("ix_categories_id"), "categories", ["id"], unique=False)
# Add category_id column to todos table
op.add_column("todos", sa.Column("category_id", sa.Integer(), nullable=True))
op.create_foreign_key(
"fk_todos_category_id", "todos", "categories", ["category_id"], ["id"]
)
def downgrade():
# Remove foreign key and category_id column from todos
op.drop_constraint("fk_todos_category_id", "todos", type_="foreignkey")
op.drop_column("todos", "category_id")
# Drop categories table
op.drop_index(op.f("ix_categories_id"), table_name="categories")
op.drop_table("categories")

View File

@ -0,0 +1,43 @@
"""Add subtasks support with parent_id field
Revision ID: 003
Revises: 002
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "003"
down_revision = "002"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"todos",
sa.Column("parent_id", sa.Integer(), nullable=True),
)
# Add foreign key constraint
op.create_foreign_key(
"fk_todos_parent_id",
"todos",
"todos",
["parent_id"],
["id"],
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Drop foreign key constraint first
op.drop_constraint("fk_todos_parent_id", "todos", type_="foreignkey")
# Drop the column
op.drop_column("todos", "parent_id")
# ### end Alembic commands ###

View File

@ -0,0 +1,57 @@
"""Add tags and todo_tags tables
Revision ID: 004_add_tags
Revises: 003_add_project_category
Create Date: 2025-06-18 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "004_add_tags"
down_revision: Union[str, None] = "003_add_project_category"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create tags table
op.create_table(
"tags",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=50), nullable=False),
sa.Column("color", sa.String(length=7), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("name"),
)
op.create_index(op.f("ix_tags_id"), "tags", ["id"], unique=False)
# Create todo_tags association table
op.create_table(
"todo_tags",
sa.Column("todo_id", sa.Integer(), nullable=False),
sa.Column("tag_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(["tag_id"], ["tags.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["todo_id"], ["todos.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("todo_id", "tag_id"),
)
def downgrade() -> None:
# Drop todo_tags association table
op.drop_table("todo_tags")
# Drop tags table
op.drop_index(op.f("ix_tags_id"), table_name="tags")
op.drop_table("tags")

View File

@ -0,0 +1,57 @@
"""Add projects table and project_id to todos
Revision ID: 005_add_projects
Revises: 004_add_tags
Create Date: 2025-06-18 15:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "005_add_projects"
down_revision: Union[str, None] = "004_add_tags"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create projects table
op.create_table(
"projects",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=200), nullable=False),
sa.Column("description", sa.String(length=500), nullable=True),
sa.Column(
"status", sa.Enum("ACTIVE", "ARCHIVED", name="projectstatus"), nullable=True
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_projects_id"), "projects", ["id"], unique=False)
# Add project_id column to todos table
op.add_column("todos", sa.Column("project_id", sa.Integer(), nullable=True))
op.create_foreign_key(
"fk_todos_project_id", "todos", "projects", ["project_id"], ["id"]
)
def downgrade() -> None:
# Remove foreign key and project_id column from todos
op.drop_constraint("fk_todos_project_id", "todos", type_="foreignkey")
op.drop_column("todos", "project_id")
# Drop projects table
op.drop_index(op.f("ix_projects_id"), table_name="projects")
op.drop_table("projects")

View File

@ -1,7 +1,9 @@
from fastapi import APIRouter
from .todos import router as todos_router
from .categories import router as categories_router
api_router = APIRouter()
api_router.include_router(todos_router)
api_router.include_router(todos_router, prefix="/todos", tags=["todos"])
api_router.include_router(categories_router, prefix="/categories", tags=["categories"])
__all__ = ["api_router"]

87
app/api/v1/categories.py Normal file
View File

@ -0,0 +1,87 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.crud import category as category_crud
from app.schemas.category import Category, CategoryCreate, CategoryUpdate
router = APIRouter()
@router.get("/", response_model=List[Category])
def read_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""
Retrieve categories.
"""
categories = category_crud.get_categories(db, skip=skip, limit=limit)
return categories
@router.post("/", response_model=Category, status_code=status.HTTP_201_CREATED)
def create_category(category: CategoryCreate, db: Session = Depends(get_db)):
"""
Create new category.
"""
# Check if category with same name already exists
existing_category = category_crud.get_category_by_name(db, name=category.name)
if existing_category:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Category with this name already exists",
)
return category_crud.create_category(db=db, category=category)
@router.get("/{category_id}", response_model=Category)
def read_category(category_id: int, db: Session = Depends(get_db)):
"""
Get category by ID.
"""
db_category = category_crud.get_category(db, category_id=category_id)
if db_category is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Category not found"
)
return db_category
@router.put("/{category_id}", response_model=Category)
def update_category(
category_id: int, category: CategoryUpdate, db: Session = Depends(get_db)
):
"""
Update category.
"""
# Check if category exists
db_category = category_crud.get_category(db, category_id=category_id)
if db_category is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Category not found"
)
# Check if name is being updated and already exists
if category.name and category.name != db_category.name:
existing_category = category_crud.get_category_by_name(db, name=category.name)
if existing_category:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Category with this name already exists",
)
return category_crud.update_category(
db=db, category_id=category_id, category=category
)
@router.delete("/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_category(category_id: int, db: Session = Depends(get_db)):
"""
Delete category.
"""
success = category_crud.delete_category(db=db, category_id=category_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Category not found"
)

125
app/api/v1/projects.py Normal file
View File

@ -0,0 +1,125 @@
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.project import ProjectStatus
from app.schemas.project import (
Project,
ProjectCreate,
ProjectUpdate,
ProjectWithTodos,
ProjectListResponse,
)
from app.schemas.todo import TodoListResponse
from app.crud import project as project_crud
from app.crud import todo as todo_crud
router = APIRouter()
@router.get("/", response_model=ProjectListResponse)
def get_projects(
page: int = Query(1, ge=1, description="Page number"),
per_page: int = Query(10, ge=1, le=100, description="Items per page"),
status: Optional[ProjectStatus] = Query(None, description="Filter by status"),
search: Optional[str] = Query(None, description="Search in name and description"),
db: Session = Depends(get_db),
):
"""Get all projects with pagination and filtering."""
skip = (page - 1) * per_page
projects, total = project_crud.get_projects(
db=db, skip=skip, limit=per_page, status=status, search=search
)
total_pages = (total + per_page - 1) // per_page
has_next = page < total_pages
has_prev = page > 1
return ProjectListResponse(
items=projects,
total=total,
page=page,
per_page=per_page,
has_next=has_next,
has_prev=has_prev,
)
@router.post("/", response_model=Project)
def create_project(project: ProjectCreate, db: Session = Depends(get_db)):
"""Create a new project."""
return project_crud.create_project(db=db, project=project)
@router.get("/{project_id}", response_model=ProjectWithTodos)
def get_project(project_id: int, db: Session = Depends(get_db)):
"""Get a specific project with its todos."""
db_project = project_crud.get_project(db=db, project_id=project_id)
if db_project is None:
raise HTTPException(status_code=404, detail="Project not found")
return db_project
@router.put("/{project_id}", response_model=Project)
def update_project(
project_id: int, project_update: ProjectUpdate, db: Session = Depends(get_db)
):
"""Update a project."""
db_project = project_crud.update_project(
db=db, project_id=project_id, project_update=project_update
)
if db_project is None:
raise HTTPException(status_code=404, detail="Project not found")
return db_project
@router.delete("/{project_id}")
def delete_project(project_id: int, db: Session = Depends(get_db)):
"""Delete a project."""
success = project_crud.delete_project(db=db, project_id=project_id)
if not success:
raise HTTPException(status_code=404, detail="Project not found")
return {"message": "Project deleted successfully"}
@router.get("/{project_id}/todos", response_model=TodoListResponse)
def get_project_todos(
project_id: int,
page: int = Query(1, ge=1, description="Page number"),
per_page: int = Query(10, ge=1, le=100, description="Items per page"),
db: Session = Depends(get_db),
):
"""Get all todos for a specific project."""
# First verify the project exists
db_project = project_crud.get_project(db=db, project_id=project_id)
if db_project is None:
raise HTTPException(status_code=404, detail="Project not found")
# Get todos with pagination
skip = (page - 1) * per_page
todos, total = todo_crud.get_todos(
db=db, skip=skip, limit=per_page, project_id=project_id
)
total_pages = (total + per_page - 1) // per_page
has_next = page < total_pages
has_prev = page > 1
return TodoListResponse(
items=todos,
total=total,
page=page,
per_page=per_page,
has_next=has_next,
has_prev=has_prev,
)
@router.put("/{project_id}/archive", response_model=Project)
def archive_project(project_id: int, db: Session = Depends(get_db)):
"""Archive a project."""
db_project = project_crud.archive_project(db=db, project_id=project_id)
if db_project is None:
raise HTTPException(status_code=404, detail="Project not found")
return db_project

118
app/api/v1/tags.py Normal file
View File

@ -0,0 +1,118 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.crud import tag as tag_crud
from app.schemas.tag import Tag, TagCreate, TagUpdate, TagListResponse
from app.schemas.todo import Todo
router = APIRouter()
@router.get("/", response_model=TagListResponse)
def read_tags(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""
Retrieve tags with pagination.
"""
tags = tag_crud.get_tags(db, skip=skip, limit=limit)
total = tag_crud.get_tags_count(db)
return TagListResponse(items=tags, total=total)
@router.post("/", response_model=Tag, status_code=status.HTTP_201_CREATED)
def create_tag(tag: TagCreate, db: Session = Depends(get_db)):
"""
Create a new tag.
"""
# Check if tag with this name already exists
db_tag = tag_crud.get_tag_by_name(db, name=tag.name)
if db_tag:
raise HTTPException(status_code=400, detail="Tag with this name already exists")
return tag_crud.create_tag(db=db, tag=tag)
@router.get("/{tag_id}", response_model=Tag)
def read_tag(tag_id: int, db: Session = Depends(get_db)):
"""
Get a specific tag by ID.
"""
db_tag = tag_crud.get_tag(db, tag_id=tag_id)
if db_tag is None:
raise HTTPException(status_code=404, detail="Tag not found")
return db_tag
@router.put("/{tag_id}", response_model=Tag)
def update_tag(tag_id: int, tag: TagUpdate, db: Session = Depends(get_db)):
"""
Update a tag.
"""
# Check if tag exists
db_tag = tag_crud.get_tag(db, tag_id=tag_id)
if db_tag is None:
raise HTTPException(status_code=404, detail="Tag not found")
# Check if new name conflicts with existing tag
if tag.name and tag.name != db_tag.name:
existing_tag = tag_crud.get_tag_by_name(db, name=tag.name)
if existing_tag:
raise HTTPException(
status_code=400, detail="Tag with this name already exists"
)
return tag_crud.update_tag(db=db, tag_id=tag_id, tag=tag)
@router.delete("/{tag_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_tag(tag_id: int, db: Session = Depends(get_db)):
"""
Delete a tag.
"""
success = tag_crud.delete_tag(db, tag_id=tag_id)
if not success:
raise HTTPException(status_code=404, detail="Tag not found")
@router.post("/{tag_id}/todos/{todo_id}", status_code=status.HTTP_201_CREATED)
def add_tag_to_todo(tag_id: int, todo_id: int, db: Session = Depends(get_db)):
"""
Add a tag to a todo.
"""
success = tag_crud.add_tag_to_todo(db, todo_id=todo_id, tag_id=tag_id)
if not success:
raise HTTPException(
status_code=400,
detail="Failed to add tag to todo. Tag or todo not found, or tag already assigned.",
)
return {"message": "Tag successfully added to todo"}
@router.delete("/{tag_id}/todos/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_tag_from_todo(tag_id: int, todo_id: int, db: Session = Depends(get_db)):
"""
Remove a tag from a todo.
"""
success = tag_crud.remove_tag_from_todo(db, todo_id=todo_id, tag_id=tag_id)
if not success:
raise HTTPException(
status_code=400,
detail="Failed to remove tag from todo. Tag or todo not found, or tag not assigned.",
)
@router.get("/{tag_id}/todos", response_model=List[Todo])
def get_todos_by_tag(
tag_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)
):
"""
Get all todos that have a specific tag.
"""
# Check if tag exists
db_tag = tag_crud.get_tag(db, tag_id=tag_id)
if db_tag is None:
raise HTTPException(status_code=404, detail="Tag not found")
return tag_crud.get_todos_by_tag(db, tag_id=tag_id, skip=skip, limit=limit)

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
import math
@ -6,7 +6,13 @@ import math
from app.crud import todo as todo_crud
from app.db.session import get_db
from app.models.todo import Priority
from app.schemas.todo import Todo, TodoCreate, TodoUpdate, TodoListResponse
from app.schemas.todo import (
Todo,
TodoCreate,
TodoUpdate,
TodoListResponse,
SubtaskCreate,
)
router = APIRouter(prefix="/todos", tags=["todos"])
@ -18,6 +24,7 @@ def read_todos(
completed: Optional[bool] = Query(None, description="Filter by completion status"),
priority: Optional[Priority] = Query(None, description="Filter by priority"),
search: Optional[str] = Query(None, description="Search in title and description"),
category_id: Optional[int] = Query(None, description="Filter by category ID"),
db: Session = Depends(get_db),
):
skip = (page - 1) * per_page
@ -28,6 +35,7 @@ def read_todos(
completed=completed,
priority=priority,
search=search,
category_id=category_id,
)
total_pages = math.ceil(total / per_page)
@ -69,3 +77,40 @@ def delete_todo(todo_id: int, db: Session = Depends(get_db)):
if not success:
raise HTTPException(status_code=404, detail="Todo not found")
return {"message": "Todo deleted successfully"}
@router.post("/{todo_id}/subtasks", response_model=Todo)
def create_subtask(todo_id: int, subtask: SubtaskCreate, db: Session = Depends(get_db)):
"""Create a subtask for a specific todo."""
db_subtask = todo_crud.create_subtask(db, parent_id=todo_id, subtask=subtask)
if db_subtask is None:
raise HTTPException(status_code=404, detail="Parent todo not found")
return db_subtask
@router.get("/{todo_id}/subtasks", response_model=List[Todo])
def get_subtasks(todo_id: int, db: Session = Depends(get_db)):
"""Get all subtasks for a specific todo."""
# First check if parent todo exists
parent_todo = todo_crud.get_todo(db, todo_id=todo_id)
if parent_todo is None:
raise HTTPException(status_code=404, detail="Todo not found")
subtasks = todo_crud.get_subtasks(db, parent_id=todo_id)
return subtasks
@router.put("/subtasks/{subtask_id}/move", response_model=Todo)
def move_subtask(
subtask_id: int, new_parent_id: Optional[int] = None, db: Session = Depends(get_db)
):
"""Move a subtask to a different parent or make it a main todo."""
moved_subtask = todo_crud.move_subtask(
db, subtask_id=subtask_id, new_parent_id=new_parent_id
)
if moved_subtask is None:
raise HTTPException(
status_code=400,
detail="Cannot move subtask: subtask not found, invalid parent, or would create a cycle",
)
return moved_subtask

View File

@ -1,3 +1,5 @@
from . import todo
from . import category
from . import project
__all__ = ["todo"]
__all__ = ["todo", "category", "project"]

57
app/crud/category.py Normal file
View File

@ -0,0 +1,57 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.category import Category
from app.schemas.category import CategoryCreate, CategoryUpdate
def get_category(db: Session, category_id: int) -> Optional[Category]:
"""Get a single category by ID."""
return db.query(Category).filter(Category.id == category_id).first()
def get_category_by_name(db: Session, name: str) -> Optional[Category]:
"""Get a single category by name."""
return db.query(Category).filter(Category.name == name).first()
def get_categories(db: Session, skip: int = 0, limit: int = 100) -> List[Category]:
"""Get all categories with pagination."""
return db.query(Category).offset(skip).limit(limit).all()
def create_category(db: Session, category: CategoryCreate) -> Category:
"""Create a new category."""
db_category = Category(
name=category.name,
description=category.description,
color=category.color,
)
db.add(db_category)
db.commit()
db.refresh(db_category)
return db_category
def update_category(
db: Session, category_id: int, category: CategoryUpdate
) -> Optional[Category]:
"""Update an existing category."""
db_category = get_category(db, category_id)
if db_category:
update_data = category.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_category, field, value)
db.commit()
db.refresh(db_category)
return db_category
def delete_category(db: Session, category_id: int) -> bool:
"""Delete a category."""
db_category = get_category(db, category_id)
if db_category:
db.delete(db_category)
db.commit()
return True
return False

79
app/crud/project.py Normal file
View File

@ -0,0 +1,79 @@
from typing import List, Optional, Tuple
from sqlalchemy.orm import Session
from app.models.project import Project, ProjectStatus
from app.models.todo import Todo
from app.schemas.project import ProjectCreate, ProjectUpdate
def get_project(db: Session, project_id: int) -> Optional[Project]:
return db.query(Project).filter(Project.id == project_id).first()
def get_projects(
db: Session,
skip: int = 0,
limit: int = 100,
status: Optional[ProjectStatus] = None,
search: Optional[str] = None,
) -> Tuple[List[Project], int]:
query = db.query(Project)
# Apply filters
if status is not None:
query = query.filter(Project.status == status)
if search:
query = query.filter(
Project.name.contains(search) | Project.description.contains(search)
)
# Get total count before pagination
total = query.count()
# Apply pagination and ordering
projects = query.order_by(Project.created_at.desc()).offset(skip).limit(limit).all()
return projects, total
def create_project(db: Session, project: ProjectCreate) -> Project:
db_project = Project(**project.model_dump())
db.add(db_project)
db.commit()
db.refresh(db_project)
return db_project
def update_project(
db: Session, project_id: int, project_update: ProjectUpdate
) -> Optional[Project]:
db_project = db.query(Project).filter(Project.id == project_id).first()
if db_project:
update_data = project_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_project, field, value)
db.commit()
db.refresh(db_project)
return db_project
def delete_project(db: Session, project_id: int) -> bool:
db_project = db.query(Project).filter(Project.id == project_id).first()
if db_project:
db.delete(db_project)
db.commit()
return True
return False
def get_project_todos(db: Session, project_id: int) -> List[Todo]:
return db.query(Todo).filter(Todo.project_id == project_id).all()
def archive_project(db: Session, project_id: int) -> Optional[Project]:
db_project = db.query(Project).filter(Project.id == project_id).first()
if db_project:
db_project.status = ProjectStatus.ARCHIVED
db.commit()
db.refresh(db_project)
return db_project

92
app/crud/tag.py Normal file
View File

@ -0,0 +1,92 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.tag import Tag
from app.models.todo import Todo
from app.schemas.tag import TagCreate, TagUpdate
def get_tag(db: Session, tag_id: int) -> Optional[Tag]:
"""Get a single tag by ID."""
return db.query(Tag).filter(Tag.id == tag_id).first()
def get_tag_by_name(db: Session, name: str) -> Optional[Tag]:
"""Get a tag by name."""
return db.query(Tag).filter(Tag.name == name).first()
def get_tags(db: Session, skip: int = 0, limit: int = 100) -> List[Tag]:
"""Get multiple tags with pagination."""
return db.query(Tag).offset(skip).limit(limit).all()
def get_tags_count(db: Session) -> int:
"""Get total count of tags."""
return db.query(func.count(Tag.id)).scalar()
def create_tag(db: Session, tag: TagCreate) -> Tag:
"""Create a new tag."""
db_tag = Tag(name=tag.name, color=tag.color)
db.add(db_tag)
db.commit()
db.refresh(db_tag)
return db_tag
def update_tag(db: Session, tag_id: int, tag: TagUpdate) -> Optional[Tag]:
"""Update an existing tag."""
db_tag = db.query(Tag).filter(Tag.id == tag_id).first()
if db_tag:
update_data = tag.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_tag, field, value)
db.commit()
db.refresh(db_tag)
return db_tag
def delete_tag(db: Session, tag_id: int) -> bool:
"""Delete a tag."""
db_tag = db.query(Tag).filter(Tag.id == tag_id).first()
if db_tag:
db.delete(db_tag)
db.commit()
return True
return False
def add_tag_to_todo(db: Session, todo_id: int, tag_id: int) -> bool:
"""Add a tag to a todo."""
todo = db.query(Todo).filter(Todo.id == todo_id).first()
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if todo and tag and tag not in todo.tags:
todo.tags.append(tag)
db.commit()
return True
return False
def remove_tag_from_todo(db: Session, todo_id: int, tag_id: int) -> bool:
"""Remove a tag from a todo."""
todo = db.query(Todo).filter(Todo.id == todo_id).first()
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if todo and tag and tag in todo.tags:
todo.tags.remove(tag)
db.commit()
return True
return False
def get_todos_by_tag(
db: Session, tag_id: int, skip: int = 0, limit: int = 100
) -> List[Todo]:
"""Get all todos that have a specific tag."""
tag = db.query(Tag).filter(Tag.id == tag_id).first()
if tag:
return tag.todos[skip : skip + limit]
return []

View File

@ -2,7 +2,8 @@ from typing import List, Optional, Tuple
from sqlalchemy.orm import Session
from app.models.todo import Todo, Priority
from app.schemas.todo import TodoCreate, TodoUpdate
from app.models.tag import Tag
from app.schemas.todo import TodoCreate, TodoUpdate, SubtaskCreate
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
@ -16,14 +17,29 @@ def get_todos(
completed: Optional[bool] = None,
priority: Optional[Priority] = None,
search: Optional[str] = None,
category_id: Optional[int] = None,
project_id: Optional[int] = None,
parent_id: Optional[int] = None,
include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
query = db.query(Todo)
# Filter by parent_id to get only main todos or subtasks
if parent_id is not None:
query = query.filter(Todo.parent_id == parent_id)
elif not include_subtasks:
# Only get main todos (no subtasks) if parent_id is None and include_subtasks is False
query = query.filter(Todo.parent_id.is_(None))
# Apply filters
if completed is not None:
query = query.filter(Todo.completed == completed)
if priority is not None:
query = query.filter(Todo.priority == priority)
if category_id is not None:
query = query.filter(Todo.category_id == category_id)
if project_id is not None:
query = query.filter(Todo.project_id == project_id)
if search:
query = query.filter(
Todo.title.contains(search) | Todo.description.contains(search)
@ -39,7 +55,14 @@ def get_todos(
def create_todo(db: Session, todo: TodoCreate) -> Todo:
db_todo = Todo(**todo.model_dump())
todo_data = todo.model_dump(exclude={"tag_ids"})
db_todo = Todo(**todo_data)
# Handle tags if provided
if todo.tag_ids:
tags = db.query(Tag).filter(Tag.id.in_(todo.tag_ids)).all()
db_todo.tags = tags
db.add(db_todo)
db.commit()
db.refresh(db_todo)
@ -49,9 +72,17 @@ def create_todo(db: Session, todo: TodoCreate) -> Todo:
def update_todo(db: Session, todo_id: int, todo_update: TodoUpdate) -> Optional[Todo]:
db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
if db_todo:
update_data = todo_update.model_dump(exclude_unset=True)
update_data = todo_update.model_dump(exclude_unset=True, exclude={"tag_ids"})
# Handle tags if provided
if todo_update.tag_ids is not None:
tags = db.query(Tag).filter(Tag.id.in_(todo_update.tag_ids)).all()
db_todo.tags = tags
# Update other fields
for field, value in update_data.items():
setattr(db_todo, field, value)
db.commit()
db.refresh(db_todo)
return db_todo
@ -64,3 +95,61 @@ def delete_todo(db: Session, todo_id: int) -> bool:
db.commit()
return True
return False
def get_subtasks(db: Session, parent_id: int) -> List[Todo]:
"""Get all subtasks for a given parent todo."""
return (
db.query(Todo)
.filter(Todo.parent_id == parent_id)
.order_by(Todo.created_at.desc())
.all()
)
def create_subtask(
db: Session, parent_id: int, subtask: SubtaskCreate
) -> Optional[Todo]:
"""Create a subtask for a given parent todo."""
# Check if parent exists
parent_todo = db.query(Todo).filter(Todo.id == parent_id).first()
if not parent_todo:
return None
# Create subtask with parent_id
subtask_data = subtask.model_dump()
subtask_data["parent_id"] = parent_id
db_subtask = Todo(**subtask_data)
db.add(db_subtask)
db.commit()
db.refresh(db_subtask)
return db_subtask
def move_subtask(
db: Session, subtask_id: int, new_parent_id: Optional[int]
) -> Optional[Todo]:
"""Move a subtask to a different parent or make it a main todo."""
subtask = db.query(Todo).filter(Todo.id == subtask_id).first()
if not subtask:
return None
# If new_parent_id is provided, check if it exists and is not the subtask itself
if new_parent_id is not None:
if new_parent_id == subtask_id:
return None # Cannot make a todo a subtask of itself
new_parent = db.query(Todo).filter(Todo.id == new_parent_id).first()
if not new_parent:
return None # New parent doesn't exist
# Prevent creating cycles (subtask cannot become parent of its current parent)
if new_parent.parent_id == subtask_id:
return None
# Update the parent_id
subtask.parent_id = new_parent_id
db.commit()
db.refresh(subtask)
return subtask

View File

@ -1,3 +1,7 @@
from .todo import Todo
from .category import Category
from .project import Project
from .tag import Tag
from .todo_tag import todo_tags
__all__ = ["Todo"]
__all__ = ["Todo", "Category", "Project", "Tag", "todo_tags"]

19
app/models/category.py Normal file
View File

@ -0,0 +1,19 @@
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Category(Base):
__tablename__ = "categories"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False, unique=True)
description = Column(String(500), nullable=True)
color = Column(String(7), nullable=True) # Hex color code like #FF0000
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationship to todos
todos = relationship("Todo", back_populates="category")

25
app/models/project.py Normal file
View File

@ -0,0 +1,25 @@
from sqlalchemy import Column, Integer, String, DateTime, Enum
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import enum
from app.db.base import Base
class ProjectStatus(str, enum.Enum):
ACTIVE = "active"
ARCHIVED = "archived"
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(200), nullable=False)
description = Column(String(500), nullable=True)
status = Column(Enum(ProjectStatus), default=ProjectStatus.ACTIVE)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationship to todos
todos = relationship("Todo", back_populates="project")

17
app/models/tag.py Normal file
View File

@ -0,0 +1,17 @@
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Tag(Base):
__tablename__ = "tags"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False, unique=True)
color = Column(String(7), nullable=False, default="#3B82F6") # Hex color code
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Many-to-many relationship with todos
todos = relationship("Todo", secondary="todo_tags", back_populates="tags")

View File

@ -1,4 +1,5 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Enum
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Enum, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import enum
@ -19,5 +20,28 @@ class Todo(Base):
description = Column(String(500), nullable=True)
completed = Column(Boolean, default=False)
priority = Column(Enum(Priority), default=Priority.MEDIUM)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=True)
category_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
parent_id = Column(Integer, ForeignKey("todos.id"), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationship to project
project = relationship("Project", back_populates="todos")
# Relationship to category
category = relationship("Category", back_populates="todos")
# Many-to-many relationship with tags
tags = relationship("Tag", secondary="todo_tags", back_populates="todos")
# Self-referential relationship for subtasks
parent = relationship("Todo", remote_side=[id], back_populates="children")
children = relationship(
"Todo", back_populates="parent", cascade="all, delete-orphan"
)
@property
def is_subtask(self) -> bool:
"""Check if this todo is a subtask of another todo."""
return self.parent_id is not None

15
app/models/todo_tag.py Normal file
View File

@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, ForeignKey, Table
from app.db.base import Base
# Association table for many-to-many relationship between todos and tags
todo_tags = Table(
"todo_tags",
Base.metadata,
Column(
"todo_id", Integer, ForeignKey("todos.id", ondelete="CASCADE"), primary_key=True
),
Column(
"tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True
),
)

View File

@ -1,3 +1,29 @@
from .todo import Todo, TodoCreate, TodoUpdate, TodoListResponse
from .category import Category, CategoryCreate, CategoryUpdate
from .tag import Tag, TagCreate, TagUpdate, TagListResponse
from .project import (
Project,
ProjectCreate,
ProjectUpdate,
ProjectListResponse,
ProjectWithTodos,
)
__all__ = ["Todo", "TodoCreate", "TodoUpdate", "TodoListResponse"]
__all__ = [
"Todo",
"TodoCreate",
"TodoUpdate",
"TodoListResponse",
"Category",
"CategoryCreate",
"CategoryUpdate",
"Tag",
"TagCreate",
"TagUpdate",
"TagListResponse",
"Project",
"ProjectCreate",
"ProjectUpdate",
"ProjectListResponse",
"ProjectWithTodos",
]

28
app/schemas/category.py Normal file
View File

@ -0,0 +1,28 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
class CategoryBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
color: Optional[str] = Field(None, regex=r"^#[0-9A-Fa-f]{6}$")
class CategoryCreate(CategoryBase):
pass
class CategoryUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
color: Optional[str] = Field(None, regex=r"^#[0-9A-Fa-f]{6}$")
class Category(CategoryBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True

50
app/schemas/project.py Normal file
View File

@ -0,0 +1,50 @@
from __future__ import annotations
from datetime import datetime
from typing import Optional, List, TYPE_CHECKING
from pydantic import BaseModel
from app.models.project import ProjectStatus
if TYPE_CHECKING:
from app.schemas.todo import Todo
class ProjectBase(BaseModel):
name: str
description: Optional[str] = None
status: ProjectStatus = ProjectStatus.ACTIVE
class ProjectCreate(ProjectBase):
pass
class ProjectUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
status: Optional[ProjectStatus] = None
class Project(ProjectBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class ProjectWithTodos(Project):
todos: List[Todo] = []
class Config:
from_attributes = True
class ProjectListResponse(BaseModel):
items: List[Project]
total: int
page: int
per_page: int
has_next: bool
has_prev: bool

36
app/schemas/tag.py Normal file
View File

@ -0,0 +1,36 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
class TagBase(BaseModel):
name: str = Field(..., min_length=1, max_length=50, description="Tag name")
color: str = Field(
default="#3B82F6", pattern=r"^#[0-9A-Fa-f]{6}$", description="Hex color code"
)
class TagCreate(TagBase):
pass
class TagUpdate(BaseModel):
name: Optional[str] = Field(
None, min_length=1, max_length=50, description="Tag name"
)
color: Optional[str] = Field(
None, pattern=r"^#[0-9A-Fa-f]{6}$", description="Hex color code"
)
class Tag(TagBase):
id: int
created_at: datetime
class Config:
from_attributes = True
class TagListResponse(BaseModel):
items: list[Tag]
total: int

View File

@ -1,15 +1,19 @@
from datetime import datetime
from typing import Optional
from typing import Optional, TYPE_CHECKING
from pydantic import BaseModel
from app.models.todo import Priority
if TYPE_CHECKING:
from app.schemas.category import Category
class TodoBase(BaseModel):
title: str
description: Optional[str] = None
completed: bool = False
priority: Priority = Priority.MEDIUM
category_id: Optional[int] = None
class TodoCreate(TodoBase):
@ -21,12 +25,15 @@ class TodoUpdate(BaseModel):
description: Optional[str] = None
completed: Optional[bool] = None
priority: Optional[Priority] = None
category_id: Optional[int] = None
# Forward declaration to handle circular reference
class Todo(TodoBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
category: Optional["Category"] = None
class Config:
from_attributes = True
@ -39,3 +46,7 @@ class TodoListResponse(BaseModel):
per_page: int
has_next: bool
has_prev: bool
# Update forward references
Todo.model_rebuild()