
- Set up project structure and dependencies
- Create database models and schemas for tasks
- Implement CRUD operations for tasks
- Add API endpoints for task management
- Create Alembic migrations for the database
- Add health check endpoint
- Implement error handling
- Add documentation in README.md
115 lines
2.8 KiB
Python
115 lines
2.8 KiB
Python
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import get_db
|
|
from app.core.exceptions import InvalidTaskData, TaskNotFound
|
|
from app.crud.task import task
|
|
from app.models.task import TaskStatus
|
|
from app.schemas.task import Task, TaskCreate, TaskUpdate
|
|
|
|
# Router on which the task endpoints in this module are registered
# (via the ``@router.get`` / ``post`` / ``put`` / ``delete`` decorators).
router = APIRouter()
|
|
|
|
|
|
@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
):
    """
    Retrieve tasks.

    - **skip**: Number of tasks to skip
    - **limit**: Maximum number of tasks to return
    - **status**: Filter tasks by status
    """
    # NOTE: inside this function the ``status`` parameter shadows the
    # module-level ``status`` import. The parameter name is part of the
    # endpoint's public interface, so it is kept unchanged.
    #
    # Compare against None explicitly rather than relying on truthiness: an
    # enum member whose mixed-in value is falsy (e.g. "" or 0) must still be
    # treated as "a filter was supplied".
    if status is not None:
        return task.get_multi_by_status(db, status=status, skip=skip, limit=limit)

    # No filter requested: return the plain paginated listing.
    return task.get_multi(db, skip=skip, limit=limit)
|
|
|
|
|
|
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
):
    """
    Create new task.

    - **title**: Title of the task (required)
    - **description**: Description of the task (optional)
    - **status**: Status of the task (default: todo)
    - **priority**: Priority of the task (default: medium)
    - **due_date**: Due date of the task (optional)
    """
    # Any failure raised while creating the task is re-raised as
    # InvalidTaskData, chained to the original exception so the root cause
    # remains visible in the traceback.
    try:
        created = task.create(db=db, obj_in=task_in)
    except Exception as exc:
        reason = str(exc)
        raise InvalidTaskData(detail=f"Could not create task: {reason}") from exc
    return created
|
|
|
|
|
|
@router.get("/{id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    id: int,
):
    """
    Get task by ID.

    - **id**: ID of the task
    """
    # Look the task up once; a falsy lookup result means there is nothing to
    # return for this ID, which is reported via TaskNotFound.
    found = task.get(db=db, id=id)
    if found:
        return found
    raise TaskNotFound(task_id=id)
|
|
|
|
|
|
@router.put("/{id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    task_in: TaskUpdate,
):
    """
    Update a task.

    - **id**: ID of the task
    - **title**: New title (optional)
    - **description**: New description (optional)
    - **status**: New status (optional)
    - **priority**: New priority (optional)
    - **due_date**: New due date (optional)
    """
    # The task must exist before an update is attempted; the lookup happens
    # outside the try block so TaskNotFound is never rewrapped below.
    existing = task.get(db=db, id=id)
    if not existing:
        raise TaskNotFound(task_id=id)

    # Any failure raised by the update itself is re-raised as InvalidTaskData,
    # chained to the original exception.
    try:
        updated = task.update(db=db, db_obj=existing, obj_in=task_in)
    except Exception as exc:
        reason = str(exc)
        raise InvalidTaskData(detail=f"Could not update task: {reason}") from exc
    else:
        return updated
|
|
|
|
|
|
@router.delete("/{id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    id: int,
):
    """
    Delete a task.

    - **id**: ID of the task
    """
    # The lookup result is only used as an existence check; removal itself is
    # performed by ID.
    if not task.get(db=db, id=id):
        raise TaskNotFound(task_id=id)

    # Whatever the CRUD layer returns from the removal is sent back as the
    # response body.
    removed = task.remove(db=db, id=id)
    return removed
|