2025-05-19 22:32:24 +00:00

135 lines
3.7 KiB
Python

from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.task import Task, TaskStatus
from app.schemas.task import Task as TaskSchema
from app.schemas.task import TaskCreate, TaskUpdate
# Router on which the task endpoints in this file are registered.
router = APIRouter()
@router.get("/", response_model=List[TaskSchema])
def read_tasks(
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
    db: Session = Depends(get_db),
):
    """
    Get a list of tasks with optional filtering by status.

    Args:
        skip: Number of rows to skip before the first returned task.
        limit: Maximum number of tasks to return.
        status: When given, only tasks whose status equals this value are
            returned; ``None`` disables the filter.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The matching tasks ordered by ``created_at`` descending.
    """
    # NOTE: inside this function ``status`` is the query parameter, not the
    # ``fastapi.status`` module imported at the top of the file. The name is
    # kept because it is the public query-parameter name.
    query = db.query(Task)

    # Compare against None instead of relying on truthiness, so an enum
    # member that happens to be falsy (e.g. backed by 0 or "") still filters.
    if status is not None:
        query = query.filter(Task.status == status)

    # Sort newest-first, then apply pagination.
    return query.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()
@router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED)
def create_task(task_in: TaskCreate, db: Session = Depends(get_db)):
    """
    Create a new task.

    Copies title, description, due_date, status and priority from the
    payload onto a new ``Task``, commits it, and returns the refreshed row.
    """
    field_values = {
        "title": task_in.title,
        "description": task_in.description,
        "due_date": task_in.due_date,
        "status": task_in.status,
        "priority": task_in.priority,
    }
    new_task = Task(**field_values)

    db.add(new_task)
    db.commit()
    # Re-read the row from the database before handing it back.
    db.refresh(new_task)
    return new_task
@router.get("/{task_id}", response_model=TaskSchema)
def read_task(task_id: str, db: Session = Depends(get_db)):
    """
    Get a task by ID.

    Raises:
        HTTPException: 404 with detail "Task not found" when no task has
            the given ID.
    """
    lookup = db.query(Task).filter(Task.id == task_id)
    found = lookup.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
@router.put("/{task_id}", response_model=TaskSchema)
def update_task(task_id: str, task_in: TaskUpdate, db: Session = Depends(get_db)):
    """
    Update a task.

    Only fields explicitly set in the payload are applied. ``completed_at``
    is kept in step with status changes: it is stamped when the status
    becomes COMPLETED and no timestamp exists yet, and cleared when a
    completed task is moved to a different status.

    Raises:
        HTTPException: 404 with detail "Task not found" when no task has
            the given ID.
    """
    target = db.query(Task).filter(Task.id == task_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Task not found")

    changes = task_in.dict(exclude_unset=True)
    new_status = changes.get("status")

    if new_status == TaskStatus.COMPLETED:
        # Becoming (or staying) completed: stamp only if not stamped yet.
        if not target.completed_at:
            target.completed_at = datetime.utcnow()
    elif target.status == TaskStatus.COMPLETED and new_status:
        # Leaving the completed state: the old timestamp no longer applies.
        target.completed_at = None

    # Payload values are applied last, so any field present in the payload
    # takes precedence over the adjustments made above.
    for attr_name, attr_value in changes.items():
        setattr(target, attr_name, attr_value)

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(task_id: str, db: Session = Depends(get_db)):
    """
    Delete a task.

    Raises:
        HTTPException: 404 with detail "Task not found" when no task has
            the given ID.
    """
    lookup = db.query(Task).filter(Task.id == task_id)
    doomed = lookup.first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()
    # Nothing is returned; the route is declared with a 204 status code.
@router.post("/{task_id}/complete", response_model=TaskSchema)
def complete_task(task_id: str, db: Session = Depends(get_db)):
    """
    Mark a task as completed.

    Raises:
        HTTPException: 404 with detail "Task not found" when no task has
            the given ID; 400 with detail "Task is already completed" when
            the task's status is already COMPLETED.
    """
    task = db.query(Task).filter(Task.id == task_id).first()
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    already_done = task.status == TaskStatus.COMPLETED
    if already_done:
        raise HTTPException(status_code=400, detail="Task is already completed")

    # The state change itself is delegated to the model's own method.
    task.mark_as_completed()

    db.add(task)
    db.commit()
    db.refresh(task)
    return task