Automated Action 660c28b96c Add subtasks and reminders functionality
- Added subtask model with CRUD operations
- Added reminder functionality to Todo model
- Added endpoints for managing subtasks and reminders
- Created new migration for subtasks and reminders
- Updated documentation

generated with BackendIM... (backend.im)
2025-05-12 23:37:35 +00:00

308 lines
11 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func
from datetime import datetime, timedelta
from app.database import get_db, engine
from app.models import Todo, Tag, Subtask, Base
from app.schemas import (
TodoCreate, TodoCreateWithTags, TodoUpdate, TodoResponse,
TagCreate, TagResponse, HealthResponse,
SubtaskCreate, SubtaskUpdate, SubtaskResponse
)
# Issue CREATE TABLE for every model registered on Base (bound to `engine`)
# at import time, before the application object is built.
Base.metadata.create_all(bind=engine)

app = FastAPI(
    version="0.2.0",
    title="Enhanced Todo API",
    description="An improved API for managing todos with tags, priority levels, due dates, and search functionality",
)

# NOTE(review): wildcard origins/methods/headers together with
# allow_credentials=True is very permissive — confirm this is intended
# outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
@app.get("/health", response_model=HealthResponse)
def health():
    """Return a static payload signalling that the service is up."""
    payload = {"status": "healthy"}
    return payload
# Tag endpoints
@app.post("/tags/", response_model=TagResponse)
def create_tag(tag: TagCreate, db: Session = Depends(get_db)):
    """Return the tag named ``tag.name``, creating it when it is missing.

    An existing tag with the same name is returned unchanged instead of
    raising a conflict, so the endpoint behaves as get-or-create.
    """
    found = db.query(Tag).filter(Tag.name == tag.name).first()
    if not found:
        # No tag with this name yet: persist a new row built from the payload.
        created = Tag(**tag.model_dump())
        db.add(created)
        db.commit()
        db.refresh(created)
        return created
    return found
@app.get("/tags/", response_model=List[TagResponse])
def get_tags(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List tags, paginated with ``skip`` (offset) and ``limit`` (page size).

    Rows are ordered by ``Tag.id`` so that consecutive pages are stable:
    OFFSET/LIMIT without an ORDER BY lets the database return rows in any
    order, which can repeat or drop tags between pages.
    """
    return db.query(Tag).order_by(Tag.id).offset(skip).limit(limit).all()
@app.get("/tags/{tag_id}", response_model=TagResponse)
def get_tag(tag_id: int, db: Session = Depends(get_db)):
    """Fetch a single tag by primary key.

    Raises:
        HTTPException: 404 when no tag has the given id.
    """
    match = db.query(Tag).filter(Tag.id == tag_id).first()
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="Tag not found")
# Todo endpoints
@app.post("/todos/", response_model=TodoResponse)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
    """Create a todo and attach the existing tags listed in ``todo.tag_ids``.

    Every field of the payload except ``tag_ids`` is passed to the ``Todo``
    constructor. Each tag id must refer to an existing tag.

    Raises:
        HTTPException: 404 naming the first tag id that does not exist;
            nothing is committed in that case.
    """
    db_todo = Todo(**todo.model_dump(exclude={"tag_ids"}))

    # dict.fromkeys drops repeated ids while keeping first-seen order.
    # Without it the same Tag object would be appended to the relationship
    # twice (presumably yielding duplicate association rows on flush —
    # verify against the Todo/Tag mapping).
    for tag_id in dict.fromkeys(todo.tag_ids or ()):
        tag = db.query(Tag).filter(Tag.id == tag_id).first()
        if tag is None:
            raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found")
        db_todo.tags.append(tag)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.post("/todos/with-tags/", response_model=TodoResponse)
def create_todo_with_tags(todo: TodoCreateWithTags, db: Session = Depends(get_db)):
    """Create a todo and attach tags given by name, creating missing tags.

    Every field of the payload except ``tags`` is passed to the ``Todo``
    constructor. Each name in ``todo.tags`` is looked up by exact match; a
    tag that does not exist yet is created in the same transaction.
    """
    db_todo = Todo(**todo.model_dump(exclude={"tags"}))

    # De-duplicate names (order preserved) so a payload such as
    # ["home", "home"] does not attach the same Tag to the todo twice.
    for tag_name in dict.fromkeys(todo.tags or ()):
        tag = db.query(Tag).filter(Tag.name == tag_name).first()
        if not tag:
            tag = Tag(name=tag_name)
            db.add(tag)
            # Flush so the new tag is written (and gets its key) before it
            # is linked to the todo.
            db.flush()
        db_todo.tags.append(tag)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.get("/todos/", response_model=List[TodoResponse])
def get_todos(
    skip: int = 0,
    limit: int = 100,
    title: Optional[str] = None,
    description: Optional[str] = None,
    completed: Optional[bool] = None,
    priority: Optional[str] = None,
    tag: Optional[str] = None,
    due_before: Optional[datetime] = None,
    due_after: Optional[datetime] = None,
    overdue: Optional[bool] = None,
    db: Session = Depends(get_db)
):
    """List todos, optionally filtered, paginated with ``skip`` and ``limit``.

    Filters (all optional, combined with AND):
        title / description: case-insensitive substring match. ``%`` and
            ``_`` in the supplied text are matched literally.
        completed: exact match on the completed flag.
        priority: exact match on the priority value.
        tag: todos carrying a tag with exactly this name.
        due_before / due_after: inclusive bounds on ``due_date``.
        overdue: when true, only incomplete todos whose ``due_date`` is
            already past; a false value applies no filter.

    Results are ordered by ``Todo.id`` so that pagination is stable.
    """

    def _contains(column, text):
        # Escape LIKE metacharacters so user text such as "100%" is a literal
        # substring rather than a wildcard pattern.
        escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return column.ilike(f"%{escaped}%", escape="\\")

    query = db.query(Todo)

    if title:
        query = query.filter(_contains(Todo.title, title))
    if description:
        query = query.filter(_contains(Todo.description, description))
    if completed is not None:
        query = query.filter(Todo.completed == completed)
    if priority:
        query = query.filter(Todo.priority == priority)
    if tag:
        query = query.join(Todo.tags).filter(Tag.name == tag)
    if due_before:
        query = query.filter(Todo.due_date <= due_before)
    if due_after:
        query = query.filter(Todo.due_date >= due_after)
    if overdue:
        query = query.filter(and_(
            Todo.due_date < datetime.now(),
            Todo.completed == False  # noqa: E712 - SQL expression, not a Python comparison
        ))

    # OFFSET/LIMIT without ORDER BY may return rows in any order, so pages
    # could overlap or skip rows; order by primary key first.
    return query.order_by(Todo.id).offset(skip).limit(limit).all()
@app.get("/todos/search/", response_model=List[TodoResponse])
def search_todos(
    q: str = Query(..., min_length=1, description="Search query"),
    db: Session = Depends(get_db)
):
    """Search todos whose title, description or any tag name contains ``q``.

    Matching is a case-insensitive substring match. ``%``, ``_`` and ``\\``
    in ``q`` are escaped, so they are matched literally instead of acting as
    LIKE wildcards (previously ``q="%"`` matched every todo).
    """
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f"%{escaped}%"
    return db.query(Todo).filter(
        or_(
            Todo.title.ilike(pattern, escape="\\"),
            Todo.description.ilike(pattern, escape="\\"),
            Todo.tags.any(Tag.name.ilike(pattern, escape="\\")),
        )
    ).all()
@app.get("/todos/upcoming/", response_model=List[TodoResponse])
def get_upcoming_todos(days: int = 7, db: Session = Depends(get_db)):
    """Get incomplete todos with due dates in the next ``days`` days.

    The window is ``[now, now + days]`` (both ends inclusive) and results are
    ordered by due date, soonest first.
    """
    # Capture the clock once so both ends of the window are derived from the
    # same instant, as get_upcoming_reminders does.
    now = datetime.now()
    window_end = now + timedelta(days=days)
    return db.query(Todo).filter(
        and_(
            Todo.due_date <= window_end,
            Todo.due_date >= now,
            Todo.completed == False  # noqa: E712 - SQL expression, not a Python comparison
        )
    ).order_by(Todo.due_date).all()
@app.get("/todos/reminders/", response_model=List[TodoResponse])
def get_upcoming_reminders(hours: int = 24, db: Session = Depends(get_db)):
    """List incomplete todos whose reminder fires within the next ``hours`` hours.

    The window runs from the current time to ``hours`` later, inclusive at
    both ends; results are sorted by reminder time, earliest first.
    """
    window_start = datetime.now()
    window_end = window_start + timedelta(hours=hours)

    in_window = and_(
        Todo.remind_at <= window_end,
        Todo.remind_at >= window_start,
        Todo.completed == False  # noqa: E712 - SQL expression, not a Python comparison
    )
    reminders = db.query(Todo).filter(in_window).order_by(Todo.remind_at)
    return reminders.all()
@app.put("/todos/{todo_id}/set-reminder", response_model=TodoResponse)
def set_reminder(todo_id: int, remind_at: datetime, db: Session = Depends(get_db)):
    """Store ``remind_at`` as the reminder time of the given todo.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    target = db.query(Todo).filter(Todo.id == todo_id).first()
    if target is not None:
        target.remind_at = remind_at
        db.commit()
        # Reload the row so the response reflects the committed state.
        db.refresh(target)
        return target
    raise HTTPException(status_code=404, detail="Todo not found")
@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
    """Fetch a single todo by primary key.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    match = db.query(Todo).filter(Todo.id == todo_id).first()
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="Todo not found")
@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(todo_id: int, todo: TodoUpdate, db: Session = Depends(get_db)):
    """Partially update a todo and, if ``tag_ids`` is given, replace its tags.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``). ``tag_ids`` semantics:
        * ``None`` / omitted: tags are left untouched.
        * a list (possibly empty): the todo's tags are replaced by exactly
          those tags; repeated ids are collapsed.

    Raises:
        HTTPException: 404 when the todo, or any referenced tag, is missing.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # Resolve and validate the replacement tags BEFORE touching the todo, so
    # an unknown tag id is rejected without first mutating attributes or
    # clearing the existing tag collection.
    replacement_tags = None
    if todo.tag_ids is not None:
        replacement_tags = []
        # dict.fromkeys removes repeated ids while preserving order, so the
        # same Tag is never attached twice.
        for tag_id in dict.fromkeys(todo.tag_ids):
            tag = db.query(Tag).filter(Tag.id == tag_id).first()
            if tag is None:
                raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found")
            replacement_tags.append(tag)

    # Apply scalar field changes that the client actually sent.
    todo_data = todo.model_dump(exclude={"tag_ids"}, exclude_unset=True)
    for key, value in todo_data.items():
        setattr(db_todo, key, value)

    if replacement_tags is not None:
        db_todo.tags = replacement_tags

    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.delete("/todos/{todo_id}", response_model=TodoResponse)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
    """Delete a todo and return the object that was removed.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    doomed = db.query(Todo).filter(Todo.id == todo_id).first()
    if doomed is not None:
        db.delete(doomed)
        db.commit()
        # The deleted instance itself is handed back as the response body.
        return doomed
    raise HTTPException(status_code=404, detail="Todo not found")
# Subtasks endpoints
@app.post("/todos/{todo_id}/subtasks/", response_model=SubtaskResponse)
def create_subtask(todo_id: int, subtask: SubtaskCreate, db: Session = Depends(get_db)):
    """Create a subtask that belongs to the todo identified by ``todo_id``.

    Raises:
        HTTPException: 404 when the parent todo does not exist.
    """
    parent = db.query(Todo).filter(Todo.id == todo_id).first()
    if parent is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # Payload fields plus the parent key taken from the URL path.
    fields = subtask.model_dump()
    new_subtask = Subtask(**fields, todo_id=todo_id)
    db.add(new_subtask)
    db.commit()
    db.refresh(new_subtask)
    return new_subtask
@app.get("/todos/{todo_id}/subtasks/", response_model=List[SubtaskResponse])
def get_subtasks(todo_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List the subtasks of a todo, paginated with ``skip`` and ``limit``.

    Rows are ordered by ``Subtask.id`` so that consecutive pages are stable;
    OFFSET/LIMIT without an ORDER BY lets the database return rows in any
    order.

    Raises:
        HTTPException: 404 when the parent todo does not exist.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")
    return (
        db.query(Subtask)
        .filter(Subtask.todo_id == todo_id)
        .order_by(Subtask.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
@app.get("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def get_subtask(subtask_id: int, db: Session = Depends(get_db)):
    """Fetch a single subtask by primary key.

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    match = db.query(Subtask).filter(Subtask.id == subtask_id).first()
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="Subtask not found")
@app.put("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def update_subtask(subtask_id: int, subtask: SubtaskUpdate, db: Session = Depends(get_db)):
    """Partially update a subtask with the fields present in the request.

    Fields the client did not send are left untouched
    (``exclude_unset=True``).

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    target = db.query(Subtask).filter(Subtask.id == subtask_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Subtask not found")

    changes = subtask.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def delete_subtask(subtask_id: int, db: Session = Depends(get_db)):
    """Delete a subtask and return the object that was removed.

    Raises:
        HTTPException: 404 when no subtask has the given id.
    """
    doomed = db.query(Subtask).filter(Subtask.id == subtask_id).first()
    if doomed is not None:
        db.delete(doomed)
        db.commit()
        # The deleted instance itself is handed back as the response body.
        return doomed
    raise HTTPException(status_code=404, detail="Subtask not found")
@app.put("/todos/{todo_id}/complete-all-subtasks", response_model=TodoResponse)
def complete_all_subtasks(todo_id: int, db: Session = Depends(get_db)):
    """Set ``completed`` to true on every subtask of the given todo.

    The change is issued as a single bulk UPDATE, after which the parent todo
    is refreshed and returned.

    Raises:
        HTTPException: 404 when no todo has the given id.
    """
    parent = db.query(Todo).filter(Todo.id == todo_id).first()
    if parent is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    children = db.query(Subtask).filter(Subtask.todo_id == todo_id)
    children.update({"completed": True})
    db.commit()

    # Reload the parent so the response reflects the committed state.
    db.refresh(parent)
    return parent
if __name__ == "__main__":
    # Development entry point: serve the `app` object of module `main` on all
    # interfaces, port 8000, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )