Automated Action aeab17a8ea Implement Task Manager API with FastAPI and SQLite
- Set up project structure
- Created Task model with SQLAlchemy
- Implemented SQLite database connection
- Created Alembic migrations
- Added Task CRUD endpoints
- Added health endpoint
- Updated README with project details

generated with BackendIM... (backend.im)
2025-05-14 07:18:59 +00:00

116 lines
2.6 KiB
Python

from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app import crud
from app.api.deps import get_db
from app.models.task import TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate
# Router on which the task endpoints in this module are registered.
router = APIRouter()
@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
) -> Any:
    """
    Retrieve tasks, optionally restricted to a single status.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        skip: Offset forwarded to ``crud.task.get_multi`` for the
            unfiltered listing.
        limit: Maximum count forwarded to ``crud.task.get_multi`` for the
            unfiltered listing.
        status: When provided, only tasks with this status are requested
            from ``crud.task.get_by_status``.

    Returns:
        The value produced by the CRUD layer, serialized as a list of
        ``Task`` objects.
    """
    # NOTE: inside this function ``status`` is the query parameter, not the
    # ``fastapi.status`` module imported at file level. The name is kept
    # because it is the public query-parameter name.
    #
    # Test against None explicitly: "no filter supplied" must not be confused
    # with a status member that happens to be falsy (possible if TaskStatus
    # mixes in str/int — TODO confirm against app.models.task).
    if status is not None:
        # NOTE(review): skip/limit are not forwarded on this branch, so a
        # status-filtered listing is not paginated here. Confirm whether
        # ``get_by_status`` is expected to accept them.
        return crud.task.get_by_status(db, status=status)
    return crud.task.get_multi(db, skip=skip, limit=limit)
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
) -> Any:
    """
    Create a task from the request payload.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        task_in: Validated creation payload.

    Returns:
        The object returned by ``crud.task.create``, serialized as ``Task``
        with a 201 status code.
    """
    return crud.task.create(db, obj_in=task_in)
@router.get("/{task_id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Fetch a single task by its ID.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        task_id: Identifier taken from the URL path.

    Returns:
        The task returned by ``crud.task.get``.

    Raises:
        HTTPException: 404 when the lookup yields a falsy result.
    """
    found = crud.task.get(db, id=task_id)
    if found:
        return found
    # Nothing came back for this ID: report it as missing.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )
@router.put("/{task_id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate,
) -> Any:
    """
    Apply an update payload to an existing task.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        task_id: Identifier taken from the URL path.
        task_in: Validated update payload.

    Returns:
        The object returned by ``crud.task.update``.

    Raises:
        HTTPException: 404 when no task is found for ``task_id``.
    """
    existing = crud.task.get(db, id=task_id)
    if not existing:
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
        raise not_found
    # The stored object is handed to the CRUD layer together with the payload.
    return crud.task.update(db, db_obj=existing, obj_in=task_in)
@router.delete("/{task_id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Remove a task and return what the CRUD layer reports for it.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        task_id: Identifier taken from the URL path.

    Returns:
        The value returned by ``crud.task.remove``.

    Raises:
        HTTPException: 404 when no task is found for ``task_id``.
    """
    # Existence is checked first so a missing ID yields a 404 rather than
    # reaching the removal call.
    if not crud.task.get(db, id=task_id):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
    return crud.task.remove(db, id=task_id)
@router.post("/{task_id}/complete", response_model=Task)
def complete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Mark the given task as completed.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        task_id: Identifier taken from the URL path.

    Returns:
        The value returned by ``crud.task.mark_completed``.

    Raises:
        HTTPException: 404 when ``crud.task.mark_completed`` returns a
            falsy result.
    """
    completed = crud.task.mark_completed(db, task_id=task_id)
    if completed:
        return completed
    # A falsy result from the CRUD call is reported as a missing task.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )