Automated Action 8557c1426e Enhance Todo application with advanced features
- Add due dates and priority levels to todos
- Add tags/categories for better organization
- Implement advanced search and filtering
- Create database migrations for model changes
- Add endpoints for managing tags
- Update documentation

generated with BackendIM... (backend.im)
2025-05-12 23:29:41 +00:00

212 lines
6.9 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func
from datetime import datetime, timedelta
from app.database import get_db, engine
from app.models import Todo, Tag, Base
from app.schemas import (
TodoCreate, TodoCreateWithTags, TodoUpdate, TodoResponse,
TagCreate, TagResponse, HealthResponse
)
# Create tables if they don't exist
# NOTE(review): this runs at import time, every time the module is loaded.
# The change description for this file also mentions database migrations;
# confirm which of the two mechanisms is meant to own the schema.
Base.metadata.create_all(bind=engine)
# Application object that every route below registers itself on.
app = FastAPI(
    title="Enhanced Todo API",
    description="An improved API for managing todos with tags, priority levels, due dates, and search functionality",
    version="0.2.0",
)
# CORS: any origin, method and header is accepted, and credentials are enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination; confirm it is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/health", response_model=HealthResponse)
def health():
    # Liveness probe: always answers with the same fixed payload.
    # A `#` comment is used rather than a docstring because FastAPI would
    # publish a docstring as this endpoint's description.
    return dict(status="healthy")
# Tag endpoints
@app.post("/tags/", response_model=TagResponse)
def create_tag(tag: TagCreate, db: Session = Depends(get_db)):
    # Get-or-create: when a tag with the requested name is already stored it
    # is returned unchanged; otherwise a new row is inserted and returned.
    result = db.query(Tag).filter(Tag.name == tag.name).first()
    if not result:
        result = Tag(**tag.model_dump())
        db.add(result)
        db.commit()
        # Reload the instance after the commit before handing it back.
        db.refresh(result)
    return result
@app.get("/tags/", response_model=List[TagResponse])
def get_tags(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List tags one page at a time, ordered by id."""
    # skip/limit are applied as OFFSET/LIMIT. Without an ORDER BY the database
    # may return rows in any order, so consecutive pages could repeat or miss
    # tags; ordering by id keeps the pages stable.
    return db.query(Tag).order_by(Tag.id).offset(skip).limit(limit).all()
@app.get("/tags/{tag_id}", response_model=TagResponse)
def get_tag(tag_id: int, db: Session = Depends(get_db)):
    # Look up a single tag by id; an unknown id is reported as HTTP 404.
    found = db.query(Tag).filter(Tag.id == tag_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Tag not found")
# Todo endpoints
@app.post("/todos/", response_model=TodoResponse)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
    """Create a todo, optionally linking it to existing tags by id."""
    # Resolve every requested tag before building the todo, so an unknown id
    # is rejected with a 404 before any new object exists.
    # dict.fromkeys() drops repeated ids while keeping the caller's order;
    # without it the same Tag would be appended to the relationship twice.
    linked_tags = []
    for tag_id in dict.fromkeys(todo.tag_ids or []):
        tag = db.query(Tag).filter(Tag.id == tag_id).first()
        if tag is None:
            raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found")
        linked_tags.append(tag)

    # tag_ids is not passed to the Todo constructor; it only drives the
    # relationship filled in below.
    db_todo = Todo(**todo.model_dump(exclude={"tag_ids"}))
    db_todo.tags.extend(linked_tags)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.post("/todos/with-tags/", response_model=TodoResponse)
def create_todo_with_tags(todo: TodoCreateWithTags, db: Session = Depends(get_db)):
    """Create a todo and attach tags by name, creating any tag that is missing."""
    # The tag names are not passed to the Todo constructor; they are turned
    # into Tag rows and attached through the relationship below.
    db_todo = Todo(**todo.model_dump(exclude={"tags"}))

    # dict.fromkeys() drops repeated names while keeping the caller's order;
    # without it a name listed twice would attach the same Tag twice.
    for tag_name in dict.fromkeys(todo.tags or []):
        # Find existing tag or create new one
        tag = db.query(Tag).filter(Tag.name == tag_name).first()
        if tag is None:
            tag = Tag(name=tag_name)
            db.add(tag)
            # Send the pending insert now, inside the current transaction,
            # before the tag is linked to the todo.
            db.flush()
        db_todo.tags.append(tag)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.get("/todos/", response_model=List[TodoResponse])
def get_todos(
    skip: int = 0,
    limit: int = 100,
    title: Optional[str] = None,
    description: Optional[str] = None,
    completed: Optional[bool] = None,
    priority: Optional[str] = None,
    tag: Optional[str] = None,
    due_before: Optional[datetime] = None,
    due_after: Optional[datetime] = None,
    overdue: Optional[bool] = None,
    db: Session = Depends(get_db)
):
    """List todos one page at a time, narrowed by any combination of filters.

    Text filters match case-insensitively anywhere in the field, and every
    supplied filter must hold. Results are ordered by id.
    """

    def contains(column, needle):
        # Build a case-insensitive "contains" test that treats the needle as
        # literal text: "%" and "_" are LIKE wildcards, so they are escaped
        # (using "/" as the escape character) instead of being interpreted.
        escaped = needle.replace("/", "//").replace("%", "/%").replace("_", "/_")
        return column.ilike(f"%{escaped}%", escape="/")

    query = db.query(Todo)

    # Empty strings are treated the same as an omitted filter.
    if title:
        query = query.filter(contains(Todo.title, title))
    if description:
        query = query.filter(contains(Todo.description, description))
    # completed=False is a meaningful filter, hence the explicit None check.
    if completed is not None:
        query = query.filter(Todo.completed == completed)
    if priority:
        query = query.filter(Todo.priority == priority)
    if tag:
        # Exact match on the tag name through the todo/tag relationship.
        query = query.join(Todo.tags).filter(Tag.name == tag)
    if due_before is not None:
        query = query.filter(Todo.due_date <= due_before)
    if due_after is not None:
        query = query.filter(Todo.due_date >= due_after)
    # Only overdue=True adds a filter; overdue=False behaves like omitting it.
    if overdue:
        query = query.filter(
            Todo.due_date < datetime.now(),
            Todo.completed == False,  # noqa: E712 - builds a SQL comparison
        )

    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by id makes paging stable.
    return query.order_by(Todo.id).offset(skip).limit(limit).all()
@app.get("/todos/search/", response_model=List[TodoResponse])
def search_todos(
    q: str = Query(..., min_length=1, description="Search query"),
    db: Session = Depends(get_db)
):
    """Search todos by title, description or tag name"""
    # Treat the query as literal text. "%" and "_" are LIKE wildcards, so an
    # unescaped q such as "%" would match every todo; they are escaped with
    # "/" as the escape character (which is itself doubled).
    escaped = q.replace("/", "//").replace("%", "/%").replace("_", "/_")
    pattern = f"%{escaped}%"

    # A todo matches when the text appears (case-insensitively) in its title,
    # in its description, or in the name of at least one of its tags.
    return db.query(Todo).filter(
        or_(
            Todo.title.ilike(pattern, escape="/"),
            Todo.description.ilike(pattern, escape="/"),
            Todo.tags.any(Tag.name.ilike(pattern, escape="/")),
        )
    ).all()
@app.get("/todos/upcoming/", response_model=List[TodoResponse])
def get_upcoming_todos(days: int = 7, db: Session = Depends(get_db)):
    """Get todos with due dates in the next N days"""
    # The end of the window is computed first and the start second, each from
    # its own clock read.
    window_end = datetime.now() + timedelta(days=days)
    window_start = datetime.now()
    still_open = Todo.completed == False  # noqa: E712 - builds a SQL comparison

    # Criteria passed together to filter() are combined with AND.
    upcoming = db.query(Todo).filter(
        Todo.due_date <= window_end,
        Todo.due_date >= window_start,
        still_open,
    )
    # Soonest due date first.
    return upcoming.order_by(Todo.due_date).all()
@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
    # Look up a single todo by id; an unknown id is reported as HTTP 404.
    found = db.query(Todo).filter(Todo.id == todo_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Todo not found")
@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(todo_id: int, todo: TodoUpdate, db: Session = Depends(get_db)):
    """Update the fields that were sent; a supplied tag_ids list replaces the todo's tags."""
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    # Resolve the replacement tags before touching the todo, so an unknown id
    # is rejected with a 404 while the loaded object is still unmodified.
    # tag_ids=None means "leave tags alone"; an empty list clears them.
    replacement_tags = None
    if todo.tag_ids is not None:
        replacement_tags = []
        # dict.fromkeys() drops repeated ids while keeping the caller's order,
        # so the same Tag is never placed in the relationship twice.
        for tag_id in dict.fromkeys(todo.tag_ids):
            tag = db.query(Tag).filter(Tag.id == tag_id).first()
            if tag is None:
                raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found")
            replacement_tags.append(tag)

    # exclude_unset=True keeps only the fields the client actually provided,
    # so omitted fields retain their stored values.
    changes = todo.model_dump(exclude={"tag_ids"}, exclude_unset=True)
    for key, value in changes.items():
        setattr(db_todo, key, value)

    # Swap the whole tag collection in one assignment.
    if replacement_tags is not None:
        db_todo.tags = replacement_tags

    db.commit()
    db.refresh(db_todo)
    return db_todo
@app.delete("/todos/{todo_id}", response_model=TodoResponse)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
    # Delete one todo by id and hand the removed object back as the response
    # body; an unknown id is reported as HTTP 404.
    target = db.query(Todo).filter(Todo.id == todo_id).first()
    if target is not None:
        db.delete(target)
        db.commit()
        return target
    raise HTTPException(status_code=404, detail="Todo not found")
if __name__ == "__main__":
    # Script entry point: start uvicorn against the "main:app" import string,
    # bound to 0.0.0.0:8000 with the reload flag switched on.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )