Automated Action 53bc4e0199 Implement Task Manager API
- Set up FastAPI application structure
- Create Task model with SQLAlchemy
- Implement CRUD operations for tasks
- Add API endpoints for tasks with filtering options
- Configure Alembic for database migrations
- Add health check endpoint
- Configure Ruff for linting
- Add comprehensive documentation in README.md
2025-06-10 14:48:01 +00:00

103 lines
2.9 KiB
Python

"""Task endpoints."""
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.db.session import get_db
router = APIRouter()
@router.get("/", response_model=List[schemas.TaskResponse])
def read_tasks(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
title: Optional[str] = None,
priority: Optional[int] = None,
completed: Optional[bool] = None,
) -> Any:
"""
Retrieve tasks.
- **skip**: Number of tasks to skip (pagination)
- **limit**: Maximum number of tasks to return (pagination)
- **title**: Filter by title (partial match)
- **priority**: Filter by priority (1=Low, 2=Medium, 3=High)
- **completed**: Filter by completion status
"""
if title is not None:
return crud.task.get_by_title(db, title=title)
if priority is not None:
return crud.task.get_by_priority(db, priority=priority, skip=skip, limit=limit)
if completed is not None:
if completed:
return crud.task.get_completed(db, skip=skip, limit=limit)
else:
# Get incomplete tasks
return db.query(crud.task.model).filter(
crud.task.model.completed == False # noqa: E712
).offset(skip).limit(limit).all()
return crud.task.get_multi(db, skip=skip, limit=limit)
@router.post("/", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
*,
db: Session = Depends(get_db),
task_in: schemas.TaskCreate,
) -> Any:
"""Create new task."""
return crud.task.create(db=db, obj_in=task_in)
@router.get("/{task_id}", response_model=schemas.TaskResponse)
def read_task(
*,
db: Session = Depends(get_db),
task_id: int,
) -> Any:
"""Get task by ID."""
task = crud.task.get(db=db, id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
return task
@router.put("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
*,
db: Session = Depends(get_db),
task_id: int,
task_in: schemas.TaskUpdate,
) -> Any:
"""Update a task."""
task = crud.task.get(db=db, id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
return crud.task.update(db=db, db_obj=task, obj_in=task_in)
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(
*,
db: Session = Depends(get_db),
task_id: int,
) -> None:
"""Delete a task."""
task = crud.task.get(db=db, id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
crud.task.remove(db=db, id=task_id)
return None