Automated Action 3be833dc43 Implement Task Manager API with FastAPI and SQLite
- Set up project structure
- Configure SQLAlchemy models and database connection
- Set up Alembic for database migrations
- Create Pydantic schemas for API data validation
- Implement task CRUD operations
- Add task filtering and pagination
- Include health check endpoint
- Update README with setup and usage instructions
2025-06-04 20:37:13 +00:00

132 lines
3.2 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.database.deps import get_db
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskResponse, TaskUpdate
# Router on which the task endpoints defined in this module are registered.
router = APIRouter()
@router.post(
    "/",
    response_model=TaskResponse,
    status_code=status.HTTP_201_CREATED,
)
def create_task(task_in: TaskCreate, db: Session = Depends(get_db)):
    """Store a new task built from the request payload and return it.

    Args:
        task_in: Validated payload carrying the task's attributes.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The newly persisted ``Task`` row.
    """
    copied_fields = (
        "title",
        "description",
        "status",
        "priority",
        "due_date",
        "completed",
    )
    new_task = Task(**{name: getattr(task_in, name) for name in copied_fields})

    db.add(new_task)
    db.commit()
    # Reload the row's state from the database after the commit so the
    # returned object reflects what was actually stored.
    db.refresh(new_task)
    return new_task
@router.get(
    "/",
    response_model=List[TaskResponse],
)
def list_tasks(
    skip: int = Query(0, ge=0, description="Number of tasks to skip"),
    limit: int = Query(100, ge=0, description="Maximum number of tasks to return"),
    status: Optional[str] = Query(None, description="Filter tasks by status"),
    db: Session = Depends(get_db),
):
    """List tasks, optionally filtered by status, one page at a time.

    Args:
        skip: Number of matching tasks to skip; must not be negative.
        limit: Maximum number of tasks to return; must not be negative.
        status: When given and non-empty, only tasks whose status equals
            this value are returned.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The requested page of ``Task`` rows, ordered by task ID.
    """
    # NOTE: the ``status`` parameter shadows the imported ``fastapi.status``
    # module inside this function. It is kept because the parameter name is
    # the public query-string key; do not use ``status.HTTP_*`` in this body.
    query = db.query(Task)

    if status:
        query = query.filter(Task.status == status)

    # OFFSET/LIMIT without ORDER BY gives no guarantee about row order, so
    # consecutive pages could overlap or skip rows. Order by the primary key
    # to make pagination deterministic.
    return query.order_by(Task.id).offset(skip).limit(limit).all()
@router.get(
    "/{task_id}",
    response_model=TaskResponse,
)
def get_task(task_id: int, db: Session = Depends(get_db)):
    """Return the task whose ID matches ``task_id``.

    Args:
        task_id: ID of the task to look up.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The matching ``Task`` row.

    Raises:
        HTTPException: With status 404 when no task has this ID.
    """
    by_id = db.query(Task).filter(Task.id == task_id)
    found = by_id.first()
    if found is not None:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Task with ID {task_id} not found",
    )
@router.put(
    "/{task_id}",
    response_model=TaskResponse,
)
def update_task(
    task_id: int,
    task_in: TaskUpdate,
    db: Session = Depends(get_db),
):
    """Apply the non-null fields of the payload to an existing task.

    Args:
        task_id: ID of the task to modify.
        task_in: Payload with the new values; attributes that are ``None``
            leave the stored value untouched.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The updated ``Task`` row.

    Raises:
        HTTPException: With status 404 when no task has this ID.
    """
    existing = db.query(Task).filter(Task.id == task_id).first()
    if existing is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task with ID {task_id} not found",
        )

    # Only overwrite attributes for which the request carries a value; a
    # ``None`` in the payload is treated as "not provided".
    updatable = (
        "title",
        "description",
        "status",
        "priority",
        "due_date",
        "completed",
    )
    for name in updatable:
        new_value = getattr(task_in, name)
        if new_value is not None:
            setattr(existing, name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
@router.delete(
    "/{task_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_model=None,
)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Remove the task whose ID matches ``task_id``.

    Args:
        task_id: ID of the task to delete.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``None``; the endpoint is declared with a 204 status code.

    Raises:
        HTTPException: With status 404 when no task has this ID.
    """
    doomed = db.query(Task).filter(Task.id == task_id).first()

    if doomed is not None:
        db.delete(doomed)
        db.commit()
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Task with ID {task_id} not found",
    )