2025-05-18 19:36:06 +00:00

136 lines
3.9 KiB
Python

from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse, PriorityEnum
# Router instance on which the task endpoints in this module are registered.
router = APIRouter()
@router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(*, db: Session = Depends(get_db), task_in: TaskCreate):
    """Persist a new task built from the request payload.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        task_in: Payload carrying ``title``, ``description``, ``priority``
            and ``due_date`` for the new task.

    Returns:
        The newly stored ``Task`` instance, refreshed from the database.
    """
    # Copy exactly the four payload fields the Task constructor receives.
    copied_fields = ("title", "description", "priority", "due_date")
    task_kwargs = {name: getattr(task_in, name) for name in copied_fields}

    new_task = Task(**task_kwargs)
    db.add(new_task)
    db.commit()
    # Re-read the row so values assigned by the database show up on the instance.
    db.refresh(new_task)
    return new_task
@router.get("", response_model=List[TaskResponse])
def read_tasks(
    *,
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    is_completed: Optional[bool] = None,
    priority: Optional[PriorityEnum] = None
):
    """
    Retrieve tasks with optional filtering, newest first.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        skip: Number of matching rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        is_completed: When given, keep only tasks whose ``is_completed``
            equals this value.
        priority: When given, keep only tasks with this priority.

    Returns:
        A list of ``Task`` rows ordered by ``created_at`` descending, with
        the ``skip``/``limit`` window applied to that ordering.
    """
    query = db.query(Task)

    # Apply filters if provided
    if is_completed is not None:
        query = query.filter(Task.is_completed == is_completed)
    if priority is not None:
        query = query.filter(Task.priority == priority)

    # Order by creation date (newest first) BEFORE paginating: SQLAlchemy's
    # Query raises InvalidRequestError when order_by() is called after
    # limit()/offset(), and a page is only well-defined over an ordered result.
    query = query.order_by(Task.created_at.desc())

    # Apply pagination to the ordered result
    query = query.offset(skip).limit(limit)

    return query.all()
@router.get("/{task_id}", response_model=TaskResponse)
def read_task(*, db: Session = Depends(get_db), task_id: int):
    """Fetch a single task by its ID.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        task_id: Primary key of the task taken from the URL path.

    Returns:
        The matching ``Task`` instance.

    Raises:
        HTTPException: 404 with detail "Task not found" when no row matches.
    """
    by_id = db.query(Task).filter(Task.id == task_id)
    found = by_id.first()
    if found:
        return found

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate
):
    """Apply a partial update to an existing task.

    Only the fields explicitly set on ``task_in`` are written. When the
    update flips ``is_completed``, ``completed_at`` is kept in step: stamped
    with the current UTC time on completion, cleared when un-completing.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        task_id: Primary key of the task taken from the URL path.
        task_in: Payload holding the fields to change.

    Returns:
        The updated ``Task`` instance, refreshed from the database.

    Raises:
        HTTPException: 404 with detail "Task not found" when no row matches.
    """
    existing = db.query(Task).filter(Task.id == task_id).first()
    if not existing:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")

    # exclude_unset=True drops fields that were not set on the payload.
    changes = task_in.model_dump(exclude_unset=True)

    if "is_completed" in changes:
        if changes["is_completed"]:
            # Transition to completed: record when it happened.
            # NOTE(review): utcnow() yields a naive datetime — confirm the
            # completed_at column expects naive UTC.
            if not existing.is_completed:
                changes["completed_at"] = datetime.utcnow()
        elif existing.is_completed:
            # Transition back to not completed: drop the stale timestamp.
            changes["completed_at"] = None

    for attr_name, new_value in changes.items():
        setattr(existing, attr_name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(*, db: Session = Depends(get_db), task_id: int):
    """Remove a task by its ID.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        task_id: Primary key of the task taken from the URL path.

    Returns:
        None; the route is declared with a 204 No Content status.

    Raises:
        HTTPException: 404 with detail "Task not found" when no row matches.
    """
    by_id = db.query(Task).filter(Task.id == task_id)
    doomed = by_id.first()
    if not doomed:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")

    db.delete(doomed)
    db.commit()
@router.post("/{task_id}/complete", response_model=TaskResponse)
def mark_task_as_completed(*, db: Session = Depends(get_db), task_id: int):
    """Flag a task as completed, leaving already-completed tasks untouched.

    The state change itself is delegated to ``Task.mark_as_completed()``,
    which is defined on the model and not visible in this module.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        task_id: Primary key of the task taken from the URL path.

    Returns:
        The ``Task`` instance; refreshed from the database when it was
        modified by this call.

    Raises:
        HTTPException: 404 with detail "Task not found" when no row matches.
    """
    target = db.query(Task).filter(Task.id == task_id).first()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")

    if target.is_completed:
        # Already completed: nothing to persist, return the row as it is.
        return target

    target.mark_as_completed()
    db.add(target)
    db.commit()
    db.refresh(target)
    return target