
- Implemented CRUD operations for task management
- Added task status tracking (pending, in_progress, completed)
- Included priority levels (low, medium, high)
- Set up SQLite database with Alembic migrations
- Added filtering and pagination support
- Configured CORS for all origins
- Included health check endpoint
- Added comprehensive API documentation
- Formatted code with Ruff linting
73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from app.db.session import get_db
|
|
from app.models.task import Task, TaskStatus, TaskPriority
|
|
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse
|
|
|
|
# Router for the task endpoints: routes registered on it are declared with the
# "/tasks" path prefix and tagged "tasks".
router = APIRouter(prefix="/tasks", tags=["tasks"])
|
|
|
|
|
|
@router.post("/", response_model=TaskResponse, status_code=201)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    # Create a Task row from the request payload, persist it, and return the
    # stored instance (serialized through TaskResponse, status 201).
    payload = task.dict()
    new_row = Task(**payload)

    db.add(new_row)
    db.commit()

    # Reload the instance from the session after the commit before returning it.
    db.refresh(new_row)
    return new_row
|
|
|
|
|
|
@router.get("/", response_model=List[TaskResponse])
def get_tasks(
    skip: int = Query(0, ge=0, description="Number of tasks to skip"),
    limit: int = Query(100, ge=1, le=1000, description="Number of tasks to return"),
    status: Optional[TaskStatus] = Query(None, description="Filter by task status"),
    priority: Optional[TaskPriority] = Query(
        None, description="Filter by task priority"
    ),
    db: Session = Depends(get_db),
):
    """List tasks, optionally filtered by status and priority, one page at a time."""
    # Parameters:
    #   skip     - number of matching rows to skip (>= 0).
    #   limit    - maximum number of rows to return (1..1000).
    #   status   - when given, only tasks with this status are returned.
    #   priority - when given, only tasks with this priority are returned.
    #   db       - session supplied by the get_db dependency.
    # Returns the list of matching Task rows for the requested page.
    query = db.query(Task)

    # Compare against None explicitly so a filter is applied whenever the
    # caller supplied a value, regardless of how the enum member evaluates
    # in a boolean context.
    if status is not None:
        query = query.filter(Task.status == status)
    if priority is not None:
        query = query.filter(Task.priority == priority)

    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows. Ordering by the primary
    # key keeps pagination stable.
    tasks = query.order_by(Task.id).offset(skip).limit(limit).all()
    return tasks
|
|
|
|
|
|
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db)):
    # Fetch a single task by id; respond with 404 when no row matches.
    by_id = db.query(Task).filter(Task.id == task_id)
    found = by_id.first()

    if found:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(task_id: int, task_update: TaskUpdate, db: Session = Depends(get_db)):
    # Apply the fields present in the request body to an existing task and
    # return the updated row; respond with 404 when the id matches nothing.
    existing = db.query(Task).filter(Task.id == task_id).first()
    if not existing:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset=True limits the dict to fields the client actually sent,
    # so omitted fields keep their stored values.
    changes = task_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(existing, attr_name, changes[attr_name])

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
|
|
@router.delete("/{task_id}", status_code=204)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    # Delete the task with the given id; respond with 404 when it does not
    # exist, otherwise commit the deletion and return nothing (status 204).
    lookup = db.query(Task).filter(Task.id == task_id)
    target = lookup.first()
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(target)
    db.commit()
    return None
|