
Added complete task management functionality, including:
- CRUD operations for tasks (create, read, update, delete)
- Task model with status (pending/in_progress/completed) and priority (low/medium/high)
- SQLite database with SQLAlchemy ORM
- Alembic migrations for the database schema
- Pydantic schemas for request/response validation
- FastAPI routers with proper error handling
- Filtering and pagination support
- Health check endpoint
- CORS configuration
- Comprehensive API documentation
- Proper project structure following FastAPI best practices
69 lines
2.5 KiB
Python
69 lines
2.5 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from app.db.session import get_db
|
|
from app.crud import task as crud_task
|
|
from app.schemas.task import Task, TaskCreate, TaskUpdate
|
|
from app.models.task import TaskStatus, TaskPriority
|
|
|
|
# Router that groups every endpoint in this module under the "/tasks" path
# prefix and the "tasks" tag.
router = APIRouter(
    prefix="/tasks",
    tags=["tasks"],
)
|
|
|
|
|
|
@router.post("/", response_model=Task, status_code=201)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    """Create a new task from the request payload.

    Delegates to ``crud_task.create_task`` and returns whatever it produces;
    the route is declared with a 201 status code.
    """
    created = crud_task.create_task(db=db, task=task)
    return created
|
|
|
|
|
|
@router.get("/", response_model=List[Task])
def get_tasks(
    skip: int = Query(0, ge=0, description="Number of tasks to skip"),
    limit: int = Query(
        100, ge=1, le=1000, description="Maximum number of tasks to return"
    ),
    status: str = Query(None, description="Filter by task status"),
    priority: str = Query(None, description="Filter by task priority"),
    db: Session = Depends(get_db),
):
    """List tasks with pagination and an optional status or priority filter.

    Args:
        skip: Number of tasks to skip (0 or greater, default 0).
        limit: Maximum number of tasks to return (1 to 1000, default 100).
        status: Optional status filter; must be a valid ``TaskStatus`` value.
        priority: Optional priority filter; must be a valid ``TaskPriority``
            value.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The result of the matching ``crud_task`` query.

    Raises:
        HTTPException: 400 when the filter being applied is not a valid
            enum value.

    Note:
        When both ``status`` and ``priority`` are supplied, only ``status``
        is applied; ``priority`` is neither used nor validated.
    """
    if status:
        # Only the enum conversion is guarded, so a ValueError raised by the
        # data layer is not misreported as a client error.
        try:
            status_enum = TaskStatus(status)
        except ValueError:
            raise HTTPException(
                status_code=400, detail="Invalid status value"
            ) from None
        return crud_task.get_tasks_by_status(
            db=db, status=status_enum, skip=skip, limit=limit
        )

    if priority:
        try:
            priority_enum = TaskPriority(priority)
        except ValueError:
            raise HTTPException(
                status_code=400, detail="Invalid priority value"
            ) from None
        return crud_task.get_tasks_by_priority(
            db=db, priority=priority_enum, skip=skip, limit=limit
        )

    return crud_task.get_tasks(db=db, skip=skip, limit=limit)
|
|
|
|
|
|
@router.get("/{task_id}", response_model=Task)
def get_task(task_id: int, db: Session = Depends(get_db)):
    """Return the task identified by ``task_id``.

    Raises:
        HTTPException: 404 with detail "Task not found" when
            ``crud_task.get_task`` returns ``None``.
    """
    found = crud_task.get_task(db=db, task_id=task_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.put("/{task_id}", response_model=Task)
def update_task(task_id: int, task_update: TaskUpdate, db: Session = Depends(get_db)):
    """Apply ``task_update`` to the task identified by ``task_id``.

    Returns the value produced by ``crud_task.update_task``.

    Raises:
        HTTPException: 404 with detail "Task not found" when
            ``crud_task.update_task`` returns ``None``.
    """
    updated = crud_task.update_task(
        db=db,
        task_id=task_id,
        task_update=task_update,
    )
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.delete("/{task_id}", status_code=204)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Delete the task identified by ``task_id``.

    Returns nothing on success; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 with detail "Task not found" when
            ``crud_task.delete_task`` returns a falsy value.
    """
    removed = crud_task.delete_task(db=db, task_id=task_id)
    if removed:
        return None
    raise HTTPException(status_code=404, detail="Task not found")
|