Automated Action c1765ee96b Implement Task Manager API
- Set up project structure with FastAPI
- Configure SQLite database with SQLAlchemy
- Create Task model with Alembic migrations
- Implement CRUD API endpoints for tasks
- Add health check and CORS configuration
- Update documentation
2025-06-04 08:04:34 +00:00

105 lines
3.0 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.task import Task as TaskModel
from app.schemas.task import Task, TaskCreate, TaskUpdate
router = APIRouter()
@router.get("/", response_model=List[Task])
def read_tasks(
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
priority: Optional[int] = None,
db: Session = Depends(get_db),
):
"""
Retrieve tasks with optional filtering.
"""
query = db.query(TaskModel)
# Apply filters if provided
if completed is not None:
query = query.filter(TaskModel.is_completed == completed)
if priority is not None:
query = query.filter(TaskModel.priority == priority)
# Apply pagination
tasks = query.order_by(TaskModel.created_at.desc()).offset(skip).limit(limit).all()
return tasks
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
"""
Create a new task.
"""
db_task = TaskModel(**task.model_dump())
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
@router.get("/{task_id}", response_model=Task)
def read_task(task_id: int, db: Session = Depends(get_db)):
"""
Get a specific task by ID.
"""
db_task = db.query(TaskModel).filter(TaskModel.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
return db_task
@router.put("/{task_id}", response_model=Task)
def update_task(task_id: int, task: TaskUpdate, db: Session = Depends(get_db)):
"""
Update a task.
"""
db_task = db.query(TaskModel).filter(TaskModel.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
# Update only the fields that are set
update_data = task.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_task, key, value)
db.commit()
db.refresh(db_task)
return db_task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(task_id: int, db: Session = Depends(get_db)):
"""
Delete a task.
"""
db_task = db.query(TaskModel).filter(TaskModel.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(db_task)
db.commit()
return None
@router.patch("/{task_id}/complete", response_model=Task)
def complete_task(task_id: int, db: Session = Depends(get_db)):
"""
Mark a task as completed.
"""
db_task = db.query(TaskModel).filter(TaskModel.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
db_task.is_completed = True
db.commit()
db.refresh(db_task)
return db_task