from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.core.database import get_db from app.models.task import Task, TaskStatus from app.schemas.task import Task as TaskSchema from app.schemas.task import TaskCreate, TaskUpdate router = APIRouter() @router.get("/", response_model=List[TaskSchema]) def read_tasks( skip: int = 0, limit: int = 100, status: Optional[TaskStatus] = None, db: Session = Depends(get_db) ): """ Get a list of tasks with optional filtering by status. """ query = db.query(Task) # Filter by status if provided if status: query = query.filter(Task.status == status) # Apply pagination tasks = query.order_by(Task.created_at.desc()).offset(skip).limit(limit).all() return tasks @router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED) def create_task( task_in: TaskCreate, db: Session = Depends(get_db) ): """ Create a new task. """ db_task = Task( title=task_in.title, description=task_in.description, due_date=task_in.due_date, status=task_in.status, priority=task_in.priority ) db.add(db_task) db.commit() db.refresh(db_task) return db_task @router.get("/{task_id}", response_model=TaskSchema) def read_task( task_id: str, db: Session = Depends(get_db) ): """ Get a task by ID. """ db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: raise HTTPException(status_code=404, detail="Task not found") return db_task @router.put("/{task_id}", response_model=TaskSchema) def update_task( task_id: str, task_in: TaskUpdate, db: Session = Depends(get_db) ): """ Update a task. """ db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: raise HTTPException(status_code=404, detail="Task not found") update_data = task_in.dict(exclude_unset=True) # If status is being set to completed and there's no completed_at date yet if (update_data.get("status") == TaskStatus.COMPLETED and not db_task.completed_at): db_task.completed_at = datetime.utcnow() # If status is being changed from completed to something else if (db_task.status == TaskStatus.COMPLETED and update_data.get("status") and update_data.get("status") != TaskStatus.COMPLETED): db_task.completed_at = None for field, value in update_data.items(): setattr(db_task, field, value) db.add(db_task) db.commit() db.refresh(db_task) return db_task @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None) def delete_task( task_id: str, db: Session = Depends(get_db) ): """ Delete a task. """ db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: raise HTTPException(status_code=404, detail="Task not found") db.delete(db_task) db.commit() return None @router.post("/{task_id}/complete", response_model=TaskSchema) def complete_task( task_id: str, db: Session = Depends(get_db) ): """ Mark a task as completed. """ db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: raise HTTPException(status_code=404, detail="Task not found") if db_task.status == TaskStatus.COMPLETED: raise HTTPException(status_code=400, detail="Task is already completed") db_task.mark_as_completed() db.add(db_task) db.commit() db.refresh(db_task) return db_task