"""Task CRUD endpoints.

Every query in this module is filtered by ``Task.owner_id`` (or
``Project.owner_id``) against the user returned by the
``get_current_active_user`` dependency, so a caller can only see or modify
rows they own.
"""

from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app.auth.dependencies import get_current_active_user
from app.database import get_db
from app.models.project import Project
from app.models.task import Task
from app.models.user import User
from app.schemas.task import Task as TaskSchema, TaskCreate, TaskUpdate

router = APIRouter(
    prefix="/tasks",
    tags=["tasks"],
)


def _ensure_project_access(db: Session, project_id: int, user: User) -> None:
    """Raise a 404 unless a project with *project_id* owned by *user* exists."""
    project = (
        db.query(Project)
        .filter(Project.id == project_id, Project.owner_id == user.id)
        .first()
    )
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found or you don't have access to it",
        )


def _get_owned_task_or_404(db: Session, task_id: int, user: User) -> Task:
    """Return the task with *task_id* owned by *user*, or raise a 404."""
    db_task = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == user.id)
        .first()
    )
    if db_task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
    return db_task


@router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED)
def create_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Create a task owned by the current user.

    Raises:
        HTTPException: 404 if ``task.project_id`` is given but no project
            with that id is owned by the current user.
    """
    # Compare against None rather than truthiness so that a falsy id (e.g. 0)
    # is still validated; this matches the check used in update_task.
    if task.project_id is not None:
        _ensure_project_access(db, task.project_id, current_user)

    db_task = Task(**task.dict(), owner_id=current_user.id)
    db.add(db_task)
    db.commit()
    # Reload so database-populated columns are present on the returned object.
    db.refresh(db_task)
    return db_task


@router.get("/", response_model=List[TaskSchema])
def read_tasks(
    skip: int = 0,
    limit: int = 100,
    # NOTE: this parameter shadows the imported ``fastapi.status`` module
    # inside this function. The name is the public query-parameter name, so it
    # is kept; do not reference ``status.HTTP_*`` constants in this body.
    status: Optional[str] = None,
    priority: Optional[str] = None,
    project_id: Optional[int] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """List the current user's tasks, optionally filtered and paginated.

    ``status`` and ``priority`` filters are applied only when non-empty;
    ``project_id`` is applied whenever it is supplied. Results are ordered by
    task id, then ``skip`` rows are skipped and at most ``limit`` returned.
    """
    query = db.query(Task).filter(Task.owner_id == current_user.id)

    if status:
        query = query.filter(Task.status == status)
    if priority:
        query = query.filter(Task.priority == priority)
    # ``is not None`` so that an explicit ``project_id=0`` filters (and matches
    # nothing) instead of silently returning every task.
    if project_id is not None:
        query = query.filter(Task.project_id == project_id)

    # OFFSET/LIMIT without an ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to make paging stable.
    return query.order_by(Task.id).offset(skip).limit(limit).all()


@router.get("/{task_id}", response_model=TaskSchema)
def read_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Return a single task owned by the current user.

    Raises:
        HTTPException: 404 if no such task is owned by the current user.
    """
    return _get_owned_task_or_404(db, task_id, current_user)


@router.put("/{task_id}", response_model=TaskSchema)
def update_task(
    task_id: int,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Apply a partial update to a task owned by the current user.

    Raises:
        HTTPException: 404 if the task is not found for this user, or if a
            non-null ``project_id`` is supplied that does not refer to a
            project owned by this user.
    """
    db_task = _get_owned_task_or_404(db, task_id, current_user)

    if task_update.project_id is not None:
        _ensure_project_access(db, task_update.project_id, current_user)

    # ``exclude_unset=True`` keeps only the fields the client actually sent.
    # Fields sent explicitly as null ARE included and will be written as None.
    update_data = task_update.dict(exclude_unset=True)
    for field_name, value in update_data.items():
        setattr(db_task, field_name, value)

    db.commit()
    db.refresh(db_task)
    return db_task


@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Delete a task owned by the current user; responds with 204 and no body.

    Raises:
        HTTPException: 404 if no such task is owned by the current user.
    """
    db_task = _get_owned_task_or_404(db, task_id, current_user)
    db.delete(db_task)
    db.commit()