2025-06-06 13:28:18 +00:00

90 lines
2.1 KiB
Python

from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.db.session import get_db
router = APIRouter()
@router.get("/", response_model=schemas.TaskList)
def read_tasks(
skip: int = 0, limit: int = 100, db: Session = Depends(get_db)
) -> Any:
"""
Retrieve tasks.
"""
tasks = crud.get_tasks(db=db, skip=skip, limit=limit)
return {"tasks": tasks, "count": len(tasks)}
@router.post("/", response_model=schemas.Task, status_code=status.HTTP_201_CREATED)
def create_task(
*,
db: Session = Depends(get_db),
task_in: schemas.TaskCreate,
) -> Any:
"""
Create new task.
"""
task = crud.create_task(db=db, task=task_in)
return task
@router.get("/{task_id}", response_model=schemas.Task)
def read_task(
*,
task_id: int,
db: Session = Depends(get_db),
) -> Any:
"""
Get task by ID.
"""
task = crud.get_task(db=db, task_id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
return task
@router.put("/{task_id}", response_model=schemas.Task)
def update_task(
*,
task_id: int,
task_in: schemas.TaskUpdate,
db: Session = Depends(get_db),
) -> Any:
"""
Update a task.
"""
task = crud.get_task(db=db, task_id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
task = crud.update_task(db=db, task_id=task_id, task=task_in)
return task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(
*,
task_id: int,
db: Session = Depends(get_db),
) -> None:
"""
Delete a task.
"""
task = crud.get_task(db=db, task_id=task_id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found",
)
crud.delete_task(db=db, task_id=task_id)
return None