from typing import Any, List, Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from app import crud, models, schemas from app.api.api_v1 import deps from app.models.task import TaskStatus router = APIRouter() @router.get("/", response_model=List[schemas.Task]) async def read_tasks( db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, status: Optional[TaskStatus] = None, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Retrieve tasks. """ if status: tasks = crud.task.get_by_status( db, owner_id=current_user.id, status=status, skip=skip, limit=limit ) else: tasks = crud.task.get_multi_by_owner( db, owner_id=current_user.id, skip=skip, limit=limit ) return tasks @router.post("/", response_model=schemas.Task) async def create_task( *, db: Session = Depends(deps.get_db), task_in: schemas.TaskCreate, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Create new task. """ task = crud.task.create_with_owner(db=db, obj_in=task_in, owner_id=current_user.id) return task @router.get("/{task_id}", response_model=schemas.Task) async def read_task( *, db: Session = Depends(deps.get_db), task_id: str, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Get task by ID. """ task = crud.task.get(db=db, id=task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") if task.owner_id != current_user.id: raise HTTPException(status_code=403, detail="Not enough permissions") return task @router.patch("/{task_id}", response_model=schemas.Task) async def update_task( *, db: Session = Depends(deps.get_db), task_id: str, task_in: schemas.TaskUpdate, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Update a task. """ task = crud.task.get(db=db, id=task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") if task.owner_id != current_user.id: raise HTTPException(status_code=403, detail="Not enough permissions") # Auto-update completed_at when status is changed to DONE if task_in.status == TaskStatus.DONE and task.status != TaskStatus.DONE: from datetime import datetime task_in.completed_at = datetime.utcnow() elif task_in.status is not None and task_in.status != TaskStatus.DONE and task.status == TaskStatus.DONE: task_in.completed_at = None task = crud.task.update(db=db, db_obj=task, obj_in=task_in) return task @router.delete("/{task_id}", status_code=204, response_model=None) async def delete_task( *, db: Session = Depends(deps.get_db), task_id: str, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Delete a task. """ task = crud.task.get(db=db, id=task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") if task.owner_id != current_user.id: raise HTTPException(status_code=403, detail="Not enough permissions") crud.task.remove(db=db, id=task_id) return None