Automated Action 29d75cdf08 Implement Task Management API with FastAPI and SQLite
- Set up project structure and dependencies
- Create database models and schemas for tasks
- Implement CRUD operations for tasks
- Add API endpoints for task management
- Create Alembic migrations for the database
- Add health check endpoint
- Implement error handling
- Add documentation in README.md
2025-05-17 11:20:00 +00:00

115 lines
2.8 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.exceptions import InvalidTaskData, TaskNotFound
from app.crud.task import task
from app.models.task import TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate
# Router on which the task endpoints in this module are registered.
router = APIRouter()
@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
):
    """
    Retrieve tasks.
    - **skip**: Number of tasks to skip
    - **limit**: Maximum number of tasks to return
    - **status**: Filter tasks by status
    """
    # NOTE: inside this function the ``status`` parameter shadows the ``status``
    # module imported from fastapi. The name is kept because it is the
    # query-parameter name clients already use.
    #
    # Compare against None explicitly: "no filter supplied" must not be confused
    # with an enum member whose underlying value happens to be falsy.
    if status is not None:
        return task.get_multi_by_status(db, status=status, skip=skip, limit=limit)

    # No status filter requested: return the plain paginated listing.
    return task.get_multi(db, skip=skip, limit=limit)
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
):
    """
    Create new task.
    - **title**: Title of the task (required)
    - **description**: Description of the task (optional)
    - **status**: Status of the task (default: todo)
    - **priority**: Priority of the task (default: medium)
    - **due_date**: Due date of the task (optional)
    """
    # Any failure raised by the CRUD layer is re-raised as InvalidTaskData,
    # with the original exception chained as the cause.
    try:
        created = task.create(db=db, obj_in=task_in)
    except Exception as e:
        raise InvalidTaskData(detail=f"Could not create task: {str(e)}") from e
    else:
        return created
@router.get("/{id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    id: int,
):
    """
    Get task by ID.
    - **id**: ID of the task
    """
    found = task.get(db=db, id=id)
    if found:
        return found

    # Lookup produced nothing usable for this ID: raise the not-found error.
    raise TaskNotFound(task_id=id)
@router.put("/{id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    task_in: TaskUpdate,
):
    """
    Update a task.
    - **id**: ID of the task
    - **title**: New title (optional)
    - **description**: New description (optional)
    - **status**: New status (optional)
    - **priority**: New priority (optional)
    - **due_date**: New due date (optional)
    """
    # The existence check stays outside the try block so TaskNotFound is never
    # re-wrapped as InvalidTaskData.
    current = task.get(db=db, id=id)
    if not current:
        raise TaskNotFound(task_id=id)

    # Any failure raised while applying the update is re-raised as
    # InvalidTaskData, with the original exception chained as the cause.
    try:
        updated = task.update(db=db, db_obj=current, obj_in=task_in)
    except Exception as e:
        raise InvalidTaskData(detail=f"Could not update task: {str(e)}") from e
    else:
        return updated
@router.delete("/{id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    id: int,
):
    """
    Delete a task.
    - **id**: ID of the task
    """
    # Look the task up first so a missing ID raises TaskNotFound instead of
    # reaching the removal call.
    existing = task.get(db=db, id=id)
    if existing:
        # The value returned by the removal call is the response body.
        return task.remove(db=db, id=id)

    raise TaskNotFound(task_id=id)