Automated Action 691becf69b Add Kanban-style board functionality
- Added TodoBoard model and BoardStatus enum
- Created migration for todo boards
- Added TodoBoard API endpoints
- Added board-related features to the README

🤖 Generated with BackendIM... (backend.im)
2025-05-12 23:58:50 +00:00

427 lines
15 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func
from datetime import datetime, timedelta
from app.database import get_db, engine
from app.models import Todo, Tag, Subtask, TodoBoard, Base, BoardStatus
from app.schemas import (
TodoCreate, TodoCreateWithTags, TodoUpdate, TodoResponse,
TagCreate, TagResponse, HealthResponse,
SubtaskCreate, SubtaskUpdate, SubtaskResponse,
TodoBoardCreate, TodoBoardUpdate, TodoBoardResponse, TodoBoardDetailResponse
)
# Create tables if they don't exist
# (runs at import time against the engine imported from app.database).
Base.metadata.create_all(bind=engine)
# Application object that every route decorator below registers on.
app = FastAPI(
title="Enhanced Todo API",
description="An improved API for managing todos with tags, priority levels, due dates, and search functionality",
version="0.2.0",
)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health", response_model=HealthResponse)
def health():
    """Return a static payload reporting the service as healthy."""
    payload = {"status": "healthy"}
    return payload
# Tag endpoints
@app.post("/tags/", response_model=TagResponse)
def create_tag(tag: TagCreate, db: Session = Depends(get_db)):
    """Create a tag, or return the existing tag that has the same name.

    Args:
        tag: Payload describing the tag to create.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The already-stored tag with a matching name, otherwise the newly
        persisted one.
    """
    match = db.query(Tag).filter(Tag.name == tag.name).first()
    if not match:
        # No tag with this name yet: persist a fresh one and reload it.
        match = Tag(**tag.model_dump())
        db.add(match)
        db.commit()
        db.refresh(match)
    return match
@app.get("/tags/", response_model=List[TagResponse])
def get_tags(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of tags.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of tags ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return db.query(Tag).order_by(Tag.id).offset(skip).limit(limit).all()
@app.get("/tags/{tag_id}", response_model=TagResponse)
def get_tag(tag_id: int, db: Session = Depends(get_db)):
    """Fetch a single tag by id.

    Raises:
        HTTPException: 404 when no tag has the given id.
    """
    lookup = db.query(Tag).filter(Tag.id == tag_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Tag not found")
# Todo endpoints
@app.post("/todos/", response_model=TodoResponse)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
    """Create a todo and attach the tags referenced by ``tag_ids``.

    Args:
        todo: Payload for the new todo; ``tag_ids`` lists existing tag ids.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The persisted todo.

    Raises:
        HTTPException: 404 when any referenced tag id does not exist.
    """
    todo_data = todo.model_dump(exclude={"tag_ids"})
    db_todo = Todo(**todo_data)

    # De-duplicate while preserving order: attaching the same Tag twice would
    # add it to the relationship collection twice.
    tag_ids = list(dict.fromkeys(todo.tag_ids or []))
    if tag_ids:
        # One IN query instead of one round trip per tag id.
        tags_by_id = {
            tag.id: tag
            for tag in db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
        }
        for tag_id in tag_ids:
            if tag_id not in tags_by_id:
                raise HTTPException(
                    status_code=404,
                    detail=f"Tag with id {tag_id} not found",
                )
        db_todo.tags.extend(tags_by_id[tag_id] for tag_id in tag_ids)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.post("/todos/with-tags/", response_model=TodoResponse)
def create_todo_with_tags(todo: TodoCreateWithTags, db: Session = Depends(get_db)):
    """Create a todo, creating any tags named in ``tags`` that do not exist yet.

    Args:
        todo: Payload for the new todo; ``tags`` is a list of tag names.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The persisted todo.
    """
    todo_data = todo.model_dump(exclude={"tags"})
    db_todo = Todo(**todo_data)

    # A repeated name would otherwise resolve to the same Tag on the second
    # pass and be appended to the todo twice; keep first occurrences only.
    tag_names = list(dict.fromkeys(todo.tags or []))
    for tag_name in tag_names:
        # Find existing tag or create new one
        tag = db.query(Tag).filter(Tag.name == tag_name).first()
        if not tag:
            tag = Tag(name=tag_name)
            db.add(tag)
            # Flush so the new tag is written before it is linked to the todo.
            db.flush()
        db_todo.tags.append(tag)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.get("/todos/", response_model=List[TodoResponse])
def get_todos(
    skip: int = 0,
    limit: int = 100,
    title: Optional[str] = None,
    description: Optional[str] = None,
    completed: Optional[bool] = None,
    priority: Optional[str] = None,
    tag: Optional[str] = None,
    due_before: Optional[datetime] = None,
    due_after: Optional[datetime] = None,
    overdue: Optional[bool] = None,
    db: Session = Depends(get_db)
):
    """Return one page of todos, narrowed by any filters that are supplied.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        title: Case-insensitive substring to match against the title.
        description: Case-insensitive substring to match against the description.
        completed: When given, keep only todos with this completion flag.
        priority: When given, keep only todos with exactly this priority.
        tag: When given, keep only todos carrying a tag with exactly this name.
        due_before: Keep todos due at or before this moment.
        due_after: Keep todos due at or after this moment.
        overdue: When true, keep only incomplete todos whose due date has passed.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The matching todos ordered by id.
    """
    query = db.query(Todo)

    # Empty strings are treated as "no filter", hence the truthiness checks.
    if title:
        query = query.filter(Todo.title.ilike(f"%{title}%"))
    if description:
        query = query.filter(Todo.description.ilike(f"%{description}%"))
    if completed is not None:
        query = query.filter(Todo.completed == completed)
    if priority:
        query = query.filter(Todo.priority == priority)
    if tag:
        query = query.join(Todo.tags).filter(Tag.name == tag)
    if due_before is not None:
        query = query.filter(Todo.due_date <= due_before)
    if due_after is not None:
        query = query.filter(Todo.due_date >= due_after)
    if overdue:
        query = query.filter(
            Todo.due_date < datetime.now(),
            Todo.completed == False,  # noqa: E712 - SQL expression, not a Python bool test
        )

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return query.order_by(Todo.id).offset(skip).limit(limit).all()
@app.get("/todos/search/", response_model=List[TodoResponse])
def search_todos(
    q: str = Query(..., min_length=1, description="Search query"),
    db: Session = Depends(get_db)
):
    """Search todos by title, description or tag name"""
    # The same case-insensitive "contains" pattern is applied to all three fields.
    pattern = f"%{q}%"
    title_hit = Todo.title.ilike(pattern)
    description_hit = Todo.description.ilike(pattern)
    tag_hit = Todo.tags.any(Tag.name.ilike(pattern))
    matches = db.query(Todo).filter(or_(title_hit, description_hit, tag_hit))
    return matches.all()
@app.get("/todos/upcoming/", response_model=List[TodoResponse])
def get_upcoming_todos(days: int = 7, db: Session = Depends(get_db)):
    """Get incomplete todos with due dates in the next N days.

    Args:
        days: Size of the look-ahead window, in days.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Incomplete todos due between now and ``now + days``, soonest first.
    """
    # Take a single clock reading so both window bounds share the same
    # instant, matching how the reminders endpoint builds its window.
    now = datetime.now()
    future_date = now + timedelta(days=days)
    return db.query(Todo).filter(
        and_(
            Todo.due_date <= future_date,
            Todo.due_date >= now,
            Todo.completed == False,  # noqa: E712 - SQL expression, not a Python bool test
        )
    ).order_by(Todo.due_date).all()
@app.get("/todos/reminders/", response_model=List[TodoResponse])
def get_upcoming_reminders(hours: int = 24, db: Session = Depends(get_db)):
    """Get todos with reminders in the next N hours"""
    window_start = datetime.now()
    window_end = window_start + timedelta(hours=hours)
    # Incomplete todos whose reminder falls inside [window_start, window_end].
    in_window = and_(
        Todo.remind_at <= window_end,
        Todo.remind_at >= window_start,
        Todo.completed == False,  # noqa: E712 - SQL expression, not a Python bool test
    )
    reminders = db.query(Todo).filter(in_window).order_by(Todo.remind_at)
    return reminders.all()
@app.put("/todos/{todo_id}/set-reminder", response_model=TodoResponse)
def set_reminder(todo_id: int, remind_at: datetime, db: Session = Depends(get_db)):
    """Set or update a reminder for a specific todo.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    target = db.query(Todo).filter(Todo.id == todo_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    target.remind_at = remind_at
    db.commit()
    db.refresh(target)
    return target
@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
    """Fetch a single todo by id.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    lookup = db.query(Todo).filter(Todo.id == todo_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Todo not found")
@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(todo_id: int, todo: TodoUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a todo and optionally replace its tags.

    Only fields explicitly present in the payload are written. When
    ``tag_ids`` is provided (an empty list included) the todo's tags are
    replaced by exactly that set; when it is ``None`` they are left untouched.

    Args:
        todo_id: Id of the todo to update.
        todo: Payload carrying the fields to change.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The updated todo.

    Raises:
        HTTPException: 404 when the todo or any referenced tag does not exist.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # Resolve and validate the replacement tags before touching the entity,
    # so a bad tag id never leaves a half-modified object in the session.
    new_tags = None
    if todo.tag_ids is not None:
        # De-duplicate while preserving order: the same Tag must not be
        # attached twice.
        tag_ids = list(dict.fromkeys(todo.tag_ids))
        tags_by_id = {}
        if tag_ids:
            # One IN query instead of one round trip per tag id.
            tags_by_id = {
                tag.id: tag
                for tag in db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
            }
        for tag_id in tag_ids:
            if tag_id not in tags_by_id:
                raise HTTPException(
                    status_code=404,
                    detail=f"Tag with id {tag_id} not found",
                )
        new_tags = [tags_by_id[tag_id] for tag_id in tag_ids]

    # Update todo attributes
    todo_data = todo.model_dump(exclude={"tag_ids"}, exclude_unset=True)
    for key, value in todo_data.items():
        setattr(db_todo, key, value)

    if new_tags is not None:
        db_todo.tags = new_tags

    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.delete("/todos/{todo_id}", response_model=TodoResponse)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
    """Delete a todo and return the deleted record.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    doomed = db.query(Todo).filter(Todo.id == todo_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    db.delete(doomed)
    db.commit()
    return doomed
# Subtasks endpoints
@app.post("/todos/{todo_id}/subtasks/", response_model=SubtaskResponse)
def create_subtask(todo_id: int, subtask: SubtaskCreate, db: Session = Depends(get_db)):
    """Create a new subtask for a specific todo.

    Raises:
        HTTPException: 404 when the parent todo does not exist.
    """
    parent = db.query(Todo).filter(Todo.id == todo_id).first()
    if parent is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    fields = subtask.model_dump()
    new_subtask = Subtask(**fields, todo_id=todo_id)
    db.add(new_subtask)
    db.commit()
    db.refresh(new_subtask)
    return new_subtask
@app.get("/todos/{todo_id}/subtasks/", response_model=List[SubtaskResponse])
def get_subtasks(todo_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of the subtasks that belong to a todo.

    Args:
        todo_id: Id of the parent todo.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The todo's subtasks ordered by id.

    Raises:
        HTTPException: 404 when the parent todo does not exist.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return (
        db.query(Subtask)
        .filter(Subtask.todo_id == todo_id)
        .order_by(Subtask.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
@app.get("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def get_subtask(subtask_id: int, db: Session = Depends(get_db)):
    """Get a specific subtask by ID.

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    lookup = db.query(Subtask).filter(Subtask.id == subtask_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Subtask not found")
@app.put("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def update_subtask(subtask_id: int, subtask: SubtaskUpdate, db: Session = Depends(get_db)):
    """Update a specific subtask with the fields present in the payload.

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    target = db.query(Subtask).filter(Subtask.id == subtask_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Subtask not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = subtask.model_dump(exclude_unset=True)
    for field in changes:
        setattr(target, field, changes[field])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def delete_subtask(subtask_id: int, db: Session = Depends(get_db)):
    """Delete a specific subtask and return the deleted record.

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    doomed = db.query(Subtask).filter(Subtask.id == subtask_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Subtask not found")

    db.delete(doomed)
    db.commit()
    return doomed
@app.put("/todos/{todo_id}/complete-all-subtasks", response_model=TodoResponse)
def complete_all_subtasks(todo_id: int, db: Session = Depends(get_db)):
    """Mark all subtasks of a todo as completed and return the todo.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    parent = db.query(Todo).filter(Todo.id == todo_id).first()
    if parent is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # Single bulk UPDATE over every subtask that points at this todo.
    children = db.query(Subtask).filter(Subtask.todo_id == todo_id)
    children.update({"completed": True})
    db.commit()
    db.refresh(parent)
    return parent
# TodoBoard endpoints
@app.post("/boards/", response_model=TodoBoardResponse)
def create_board(board: TodoBoardCreate, db: Session = Depends(get_db)):
    """Create a new todo board from the request payload and return it."""
    fields = board.model_dump()
    new_board = TodoBoard(**fields)
    db.add(new_board)
    db.commit()
    db.refresh(new_board)
    return new_board
@app.get("/boards/", response_model=List[TodoBoardResponse])
def get_boards(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of todo boards.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of boards ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return db.query(TodoBoard).order_by(TodoBoard.id).offset(skip).limit(limit).all()
@app.get("/boards/{board_id}", response_model=TodoBoardDetailResponse)
def get_board(board_id: int, db: Session = Depends(get_db)):
    """Get a specific todo board, serialized with the detail response model.

    Raises:
        HTTPException: 404 when no board has the given id.
    """
    lookup = db.query(TodoBoard).filter(TodoBoard.id == board_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Board not found")
@app.put("/boards/{board_id}", response_model=TodoBoardResponse)
def update_board(board_id: int, board: TodoBoardUpdate, db: Session = Depends(get_db)):
    """Update a specific todo board with the fields present in the payload.

    Raises:
        HTTPException: 404 when no board has the given id.
    """
    target = db.query(TodoBoard).filter(TodoBoard.id == board_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Board not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = board.model_dump(exclude_unset=True)
    for field in changes:
        setattr(target, field, changes[field])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/boards/{board_id}", response_model=TodoBoardResponse)
def delete_board(board_id: int, db: Session = Depends(get_db)):
    """Delete a specific todo board and return the deleted record.

    Raises:
        HTTPException: 404 when no board has the given id.
    """
    doomed = db.query(TodoBoard).filter(TodoBoard.id == board_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Board not found")

    db.delete(doomed)
    db.commit()
    return doomed
@app.post("/boards/{board_id}/todos/", response_model=TodoResponse)
def create_todo_in_board(board_id: int, todo: TodoCreate, db: Session = Depends(get_db)):
    """Create a new todo inside a specific board.

    The board id from the URL always wins: any ``board_id`` carried by the
    payload is dropped before the todo is built.

    Args:
        board_id: Id of the board that will own the todo.
        todo: Payload for the new todo; ``tag_ids`` lists existing tag ids.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The persisted todo.

    Raises:
        HTTPException: 404 when the board or any referenced tag does not exist.
    """
    db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first()
    if db_board is None:
        raise HTTPException(status_code=404, detail="Board not found")

    todo_data = todo.model_dump(exclude={"tag_ids", "board_id"})
    db_todo = Todo(**todo_data, board_id=board_id)

    # De-duplicate while preserving order: attaching the same Tag twice would
    # add it to the relationship collection twice.
    tag_ids = list(dict.fromkeys(todo.tag_ids or []))
    if tag_ids:
        # One IN query instead of one round trip per tag id.
        tags_by_id = {
            tag.id: tag
            for tag in db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
        }
        for tag_id in tag_ids:
            if tag_id not in tags_by_id:
                raise HTTPException(
                    status_code=404,
                    detail=f"Tag with id {tag_id} not found",
                )
        db_todo.tags.extend(tags_by_id[tag_id] for tag_id in tag_ids)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.get("/boards/{board_id}/todos/", response_model=List[TodoResponse])
def get_todos_by_board(
    board_id: int,
    status: Optional[BoardStatus] = None,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """Get one page of the todos in a board, optionally filtered by status.

    Args:
        board_id: Id of the board whose todos are listed.
        status: When given, keep only todos with this status.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The board's todos ordered by id.

    Raises:
        HTTPException: 404 when no board has the given id.
    """
    db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first()
    if db_board is None:
        raise HTTPException(status_code=404, detail="Board not found")

    query = db.query(Todo).filter(Todo.board_id == board_id)
    # Explicit None check (as in the move endpoint) so the filter does not
    # depend on the truthiness of the enum member.
    if status is not None:
        query = query.filter(Todo.status == status)

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return query.order_by(Todo.id).offset(skip).limit(limit).all()
@app.put("/todos/{todo_id}/move", response_model=TodoResponse)
def move_todo_to_board(
    todo_id: int,
    board_id: Optional[int] = None,
    status: Optional[BoardStatus] = None,
    db: Session = Depends(get_db)
):
    """Move a todo to a different board and/or change its status.

    Arguments left as ``None`` leave the corresponding field untouched.

    Raises:
        HTTPException: 404 when the todo, or the requested board, does not exist.
    """
    target = db.query(Todo).filter(Todo.id == todo_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    if board_id is not None:
        # The destination board must exist before the todo is re-parented.
        destination = db.query(TodoBoard).filter(TodoBoard.id == board_id).first()
        if destination is None:
            raise HTTPException(status_code=404, detail="Board not found")
        target.board_id = board_id

    if status is not None:
        target.status = status

    db.commit()
    db.refresh(target)
    return target
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )