from fastapi import FastAPI, Depends, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware import uvicorn from typing import List, Optional from sqlalchemy.orm import Session from sqlalchemy import or_, and_, func from datetime import datetime, timedelta from app.database import get_db, engine from app.models import Todo, Tag, Subtask, TodoBoard, Base, BoardStatus from app.schemas import ( TodoCreate, TodoCreateWithTags, TodoUpdate, TodoResponse, TagCreate, TagResponse, HealthResponse, SubtaskCreate, SubtaskUpdate, SubtaskResponse, TodoBoardCreate, TodoBoardUpdate, TodoBoardResponse, TodoBoardDetailResponse ) # Create tables if they don't exist Base.metadata.create_all(bind=engine) app = FastAPI( title="Enhanced Todo API", description="An improved API for managing todos with tags, priority levels, due dates, and search functionality", version="0.2.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/health", response_model=HealthResponse) def health(): return {"status": "healthy"} # Tag endpoints @app.post("/tags/", response_model=TagResponse) def create_tag(tag: TagCreate, db: Session = Depends(get_db)): # Check if tag already exists existing_tag = db.query(Tag).filter(Tag.name == tag.name).first() if existing_tag: return existing_tag db_tag = Tag(**tag.model_dump()) db.add(db_tag) db.commit() db.refresh(db_tag) return db_tag @app.get("/tags/", response_model=List[TagResponse]) def get_tags(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): return db.query(Tag).offset(skip).limit(limit).all() @app.get("/tags/{tag_id}", response_model=TagResponse) def get_tag(tag_id: int, db: Session = Depends(get_db)): db_tag = db.query(Tag).filter(Tag.id == tag_id).first() if db_tag is None: raise HTTPException(status_code=404, detail="Tag not found") return db_tag # Todo endpoints @app.post("/todos/", response_model=TodoResponse) def create_todo(todo: TodoCreate, db: Session = Depends(get_db)): todo_data = todo.model_dump(exclude={"tag_ids"}) db_todo = Todo(**todo_data) # Add tags if provided if todo.tag_ids: for tag_id in todo.tag_ids: tag = db.query(Tag).filter(Tag.id == tag_id).first() if tag is None: raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found") db_todo.tags.append(tag) db.add(db_todo) db.commit() db.refresh(db_todo) return db_todo @app.post("/todos/with-tags/", response_model=TodoResponse) def create_todo_with_tags(todo: TodoCreateWithTags, db: Session = Depends(get_db)): # Extract and create todo todo_data = todo.model_dump(exclude={"tags"}) db_todo = Todo(**todo_data) # Process tags if todo.tags: for tag_name in todo.tags: # Find existing tag or create new one tag = db.query(Tag).filter(Tag.name == tag_name).first() if not tag: tag = Tag(name=tag_name) db.add(tag) db.flush() db_todo.tags.append(tag) db.add(db_todo) db.commit() db.refresh(db_todo) return db_todo @app.get("/todos/", response_model=List[TodoResponse]) def get_todos( skip: int = 0, limit: int = 100, title: Optional[str] = None, description: Optional[str] = None, completed: Optional[bool] = None, priority: Optional[str] = None, tag: Optional[str] = None, due_before: Optional[datetime] = None, due_after: Optional[datetime] = None, overdue: Optional[bool] = None, db: Session = Depends(get_db) ): query = db.query(Todo) # Apply filters if provided if title: query = query.filter(Todo.title.ilike(f"%{title}%")) if description: query = query.filter(Todo.description.ilike(f"%{description}%")) if completed is not None: query = query.filter(Todo.completed == completed) if priority: query = query.filter(Todo.priority == priority) if tag: query = query.join(Todo.tags).filter(Tag.name == tag) if due_before: query = query.filter(Todo.due_date <= due_before) if due_after: query = query.filter(Todo.due_date >= due_after) if overdue is not None and overdue: query = query.filter(and_( Todo.due_date < datetime.now(), Todo.completed == False )) return query.offset(skip).limit(limit).all() @app.get("/todos/search/", response_model=List[TodoResponse]) def search_todos( q: str = Query(..., min_length=1, description="Search query"), db: Session = Depends(get_db) ): """Search todos by title, description or tag name""" return db.query(Todo).filter( or_( Todo.title.ilike(f"%{q}%"), Todo.description.ilike(f"%{q}%"), Todo.tags.any(Tag.name.ilike(f"%{q}%")) ) ).all() @app.get("/todos/upcoming/", response_model=List[TodoResponse]) def get_upcoming_todos(days: int = 7, db: Session = Depends(get_db)): """Get todos with due dates in the next N days""" future_date = datetime.now() + timedelta(days=days) return db.query(Todo).filter( and_( Todo.due_date <= future_date, Todo.due_date >= datetime.now(), Todo.completed == False ) ).order_by(Todo.due_date).all() @app.get("/todos/reminders/", response_model=List[TodoResponse]) def get_upcoming_reminders(hours: int = 24, db: Session = Depends(get_db)): """Get todos with reminders in the next N hours""" now = datetime.now() future_time = now + timedelta(hours=hours) return db.query(Todo).filter( and_( Todo.remind_at <= future_time, Todo.remind_at >= now, Todo.completed == False ) ).order_by(Todo.remind_at).all() @app.put("/todos/{todo_id}/set-reminder", response_model=TodoResponse) def set_reminder(todo_id: int, remind_at: datetime, db: Session = Depends(get_db)): """Set or update a reminder for a specific todo""" db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") db_todo.remind_at = remind_at db.commit() db.refresh(db_todo) return db_todo @app.get("/todos/{todo_id}", response_model=TodoResponse) def get_todo(todo_id: int, db: Session = Depends(get_db)): db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") return db_todo @app.put("/todos/{todo_id}", response_model=TodoResponse) def update_todo(todo_id: int, todo: TodoUpdate, db: Session = Depends(get_db)): db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") # Update todo attributes todo_data = todo.model_dump(exclude={"tag_ids"}, exclude_unset=True) for key, value in todo_data.items(): setattr(db_todo, key, value) # Update tags if provided if todo.tag_ids is not None: # Clear existing tags db_todo.tags = [] # Add new tags for tag_id in todo.tag_ids: tag = db.query(Tag).filter(Tag.id == tag_id).first() if tag is None: raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found") db_todo.tags.append(tag) db.commit() db.refresh(db_todo) return db_todo @app.delete("/todos/{todo_id}", response_model=TodoResponse) def delete_todo(todo_id: int, db: Session = Depends(get_db)): db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") db.delete(db_todo) db.commit() return db_todo # Subtasks endpoints @app.post("/todos/{todo_id}/subtasks/", response_model=SubtaskResponse) def create_subtask(todo_id: int, subtask: SubtaskCreate, db: Session = Depends(get_db)): """Create a new subtask for a specific todo""" db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") db_subtask = Subtask(**subtask.model_dump(), todo_id=todo_id) db.add(db_subtask) db.commit() db.refresh(db_subtask) return db_subtask @app.get("/todos/{todo_id}/subtasks/", response_model=List[SubtaskResponse]) def get_subtasks(todo_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): """Get all subtasks for a todo""" db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") return db.query(Subtask).filter(Subtask.todo_id == todo_id).offset(skip).limit(limit).all() @app.get("/subtasks/{subtask_id}", response_model=SubtaskResponse) def get_subtask(subtask_id: int, db: Session = Depends(get_db)): """Get a specific subtask by ID""" db_subtask = db.query(Subtask).filter(Subtask.id == subtask_id).first() if db_subtask is None: raise HTTPException(status_code=404, detail="Subtask not found") return db_subtask @app.put("/subtasks/{subtask_id}", response_model=SubtaskResponse) def update_subtask(subtask_id: int, subtask: SubtaskUpdate, db: Session = Depends(get_db)): """Update a specific subtask""" db_subtask = db.query(Subtask).filter(Subtask.id == subtask_id).first() if db_subtask is None: raise HTTPException(status_code=404, detail="Subtask not found") subtask_data = subtask.model_dump(exclude_unset=True) for key, value in subtask_data.items(): setattr(db_subtask, key, value) db.commit() db.refresh(db_subtask) return db_subtask @app.delete("/subtasks/{subtask_id}", response_model=SubtaskResponse) def delete_subtask(subtask_id: int, db: Session = Depends(get_db)): """Delete a specific subtask""" db_subtask = db.query(Subtask).filter(Subtask.id == subtask_id).first() if db_subtask is None: raise HTTPException(status_code=404, detail="Subtask not found") db.delete(db_subtask) db.commit() return db_subtask @app.put("/todos/{todo_id}/complete-all-subtasks", response_model=TodoResponse) def complete_all_subtasks(todo_id: int, db: Session = Depends(get_db)): """Mark all subtasks of a todo as completed""" db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") db.query(Subtask).filter(Subtask.todo_id == todo_id).update({"completed": True}) db.commit() db.refresh(db_todo) return db_todo # TodoBoard endpoints @app.post("/boards/", response_model=TodoBoardResponse) def create_board(board: TodoBoardCreate, db: Session = Depends(get_db)): """Create a new todo board""" db_board = TodoBoard(**board.model_dump()) db.add(db_board) db.commit() db.refresh(db_board) return db_board @app.get("/boards/", response_model=List[TodoBoardResponse]) def get_boards(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): """Get all todo boards""" return db.query(TodoBoard).offset(skip).limit(limit).all() @app.get("/boards/{board_id}", response_model=TodoBoardDetailResponse) def get_board(board_id: int, db: Session = Depends(get_db)): """Get a specific todo board with all its todos""" db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") return db_board @app.put("/boards/{board_id}", response_model=TodoBoardResponse) def update_board(board_id: int, board: TodoBoardUpdate, db: Session = Depends(get_db)): """Update a specific todo board""" db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") board_data = board.model_dump(exclude_unset=True) for key, value in board_data.items(): setattr(db_board, key, value) db.commit() db.refresh(db_board) return db_board @app.delete("/boards/{board_id}", response_model=TodoBoardResponse) def delete_board(board_id: int, db: Session = Depends(get_db)): """Delete a specific todo board""" db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") db.delete(db_board) db.commit() return db_board @app.post("/boards/{board_id}/todos/", response_model=TodoResponse) def create_todo_in_board(board_id: int, todo: TodoCreate, db: Session = Depends(get_db)): """Create a new todo in a specific board""" db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") todo_data = todo.model_dump(exclude={"tag_ids", "board_id"}) db_todo = Todo(**todo_data, board_id=board_id) # Add tags if provided if todo.tag_ids: for tag_id in todo.tag_ids: tag = db.query(Tag).filter(Tag.id == tag_id).first() if tag is None: raise HTTPException(status_code=404, detail=f"Tag with id {tag_id} not found") db_todo.tags.append(tag) db.add(db_todo) db.commit() db.refresh(db_todo) return db_todo @app.get("/boards/{board_id}/todos/", response_model=List[TodoResponse]) def get_todos_by_board( board_id: int, status: Optional[BoardStatus] = None, skip: int = 0, limit: int = 100, db: Session = Depends(get_db) ): """Get all todos in a specific board with optional status filter""" db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") query = db.query(Todo).filter(Todo.board_id == board_id) if status: query = query.filter(Todo.status == status) return query.offset(skip).limit(limit).all() @app.put("/todos/{todo_id}/move", response_model=TodoResponse) def move_todo_to_board( todo_id: int, board_id: Optional[int] = None, status: Optional[BoardStatus] = None, db: Session = Depends(get_db) ): """Move a todo to a different board and/or change its status""" db_todo = db.query(Todo).filter(Todo.id == todo_id).first() if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") if board_id is not None: # Verify board exists db_board = db.query(TodoBoard).filter(TodoBoard.id == board_id).first() if db_board is None: raise HTTPException(status_code=404, detail="Board not found") db_todo.board_id = board_id if status is not None: db_todo.status = status db.commit() db.refresh(db_todo) return db_todo if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)