
- Created comprehensive task management system with CRUD operations
- Implemented SQLAlchemy models and database configuration
- Added Alembic migrations for database schema management
- Created FastAPI endpoints for task management with proper validation
- Added health check endpoint and base URL information
- Configured CORS middleware and OpenAPI documentation
- Updated README with comprehensive API documentation
- Code formatted and linted with Ruff
52 lines
1.9 KiB
Python
52 lines
1.9 KiB
Python
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from app.db.session import get_db
|
|
from app.crud import task_crud
|
|
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse
|
|
|
|
# Router instance on which the task endpoints in this module are registered.
router = APIRouter()
|
|
|
|
|
|
@router.post("/tasks", response_model=TaskResponse, status_code=201)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    """Create a task from the request payload.

    Args:
        task: Validated creation payload.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``task_crud.create_task`` returns; it is serialized through
        ``TaskResponse`` and sent with status code 201.
    """
    created = task_crud.create_task(db=db, task=task)
    return created
|
|
|
|
|
|
@router.get("/tasks", response_model=List[TaskResponse])
def get_tasks(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    completed: Optional[bool] = Query(None),
    db: Session = Depends(get_db),
):
    """List tasks, forwarding the query parameters to the CRUD layer.

    Args:
        skip: Query parameter, must be >= 0; defaults to 0.
        limit: Query parameter, must be between 1 and 1000; defaults to 100.
        completed: Optional boolean query parameter; ``None`` when omitted.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The result of ``task_crud.get_tasks``, serialized as a list of
        ``TaskResponse``.
    """
    # All three query parameters are passed through unchanged.
    filters = {"skip": skip, "limit": limit, "completed": completed}
    return task_crud.get_tasks(db=db, **filters)
|
|
|
|
|
|
@router.get("/tasks/{task_id}", response_model=TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db)):
    """Fetch a single task by its identifier.

    Args:
        task_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The object returned by ``task_crud.get_task``, serialized through
        ``TaskResponse``.

    Raises:
        HTTPException: 404 with detail "Task not found" when the CRUD lookup
            yields a falsy result.
    """
    found = task_crud.get_task(db=db, task_id=task_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.put("/tasks/{task_id}", response_model=TaskResponse)
def update_task(task_id: int, task_update: TaskUpdate, db: Session = Depends(get_db)):
    """Apply an update payload to an existing task.

    Args:
        task_id: Identifier taken from the URL path.
        task_update: Validated update payload.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The object returned by ``task_crud.update_task``, serialized through
        ``TaskResponse``.

    Raises:
        HTTPException: 404 with detail "Task not found" when the CRUD call
            yields a falsy result.
    """
    updated = task_crud.update_task(
        db=db,
        task_id=task_id,
        task_update=task_update,
    )
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.delete("/tasks/{task_id}", status_code=204)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Delete a task by its identifier.

    Args:
        task_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        None; the route is declared with status code 204.

    Raises:
        HTTPException: 404 with detail "Task not found" when
            ``task_crud.delete_task`` returns a falsy value.
    """
    deleted = task_crud.delete_task(db=db, task_id=task_id)
    if deleted:
        return None
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.get("/tasks/priority/{priority}", response_model=List[TaskResponse])
def get_tasks_by_priority(priority: str, db: Session = Depends(get_db)):
    """List the tasks matching a priority given in the URL path.

    Args:
        priority: Priority string taken from the URL path; passed to the CRUD
            layer as-is (no validation is performed here).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The result of ``task_crud.get_tasks_by_priority``, serialized as a
        list of ``TaskResponse``.
    """
    matching = task_crud.get_tasks_by_priority(db=db, priority=priority)
    return matching
|