
Features: user authentication with JWT tokens; task and project CRUD operations; task filtering and organization. Generated with BackendIM... (backend.im)
148 lines
4.0 KiB
Python
148 lines
4.0 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
|
|
from app.database import get_db
|
|
from app.models.task import Task
|
|
from app.models.project import Project
|
|
from app.schemas.task import TaskCreate, Task as TaskSchema, TaskUpdate
|
|
from app.auth.dependencies import get_current_active_user
|
|
from app.models.user import User
|
|
|
|
# Router for the task endpoints: every route below is served under the
# "/tasks" prefix and carries the "tasks" tag.
router = APIRouter(prefix="/tasks", tags=["tasks"])
|
|
|
|
|
|
@router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED)
def create_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Create a task owned by the authenticated user.

    Args:
        task: Payload describing the task to create.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User supplied by ``get_current_active_user``; becomes
            the owner of the new task.

    Returns:
        The newly persisted task, refreshed from the database.

    Raises:
        HTTPException: 404 when ``task.project_id`` is given but no project
            with that id is owned by the current user.
    """
    # Compare against None instead of relying on truthiness so that a falsy
    # id such as 0 still goes through the ownership check (update_task
    # performs the same ``is not None`` test).
    if task.project_id is not None:
        project = (
            db.query(Project)
            .filter(
                Project.id == task.project_id,
                Project.owner_id == current_user.id,
            )
            .first()
        )
        if project is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Project not found or you don't have access to it",
            )

    # The owner is always taken from the authenticated user.
    db_task = Task(**task.dict(), owner_id=current_user.id)

    db.add(db_task)
    db.commit()
    # Reload the row so the returned object reflects the stored state.
    db.refresh(db_task)

    return db_task
|
|
|
|
|
|
@router.get("/", response_model=List[TaskSchema])
def read_tasks(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
    priority: Optional[str] = None,
    project_id: Optional[int] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """List the authenticated user's tasks with optional filters and paging.

    Args:
        skip: Number of matching rows to skip.
        limit: Maximum number of rows to return.
        status: When non-empty, keep only tasks whose status equals it.
        priority: When non-empty, keep only tasks whose priority equals it.
        project_id: When given, keep only tasks attached to that project.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User supplied by ``get_current_active_user``; only
            tasks owned by this user are returned.

    Returns:
        The matching tasks, ordered by task id.
    """
    # NOTE: the ``status`` parameter shadows the imported ``fastapi.status``
    # module inside this function; the name is part of the public query
    # interface, so it is kept and the module is simply not used here.
    query = db.query(Task).filter(Task.owner_id == current_user.id)

    if status:
        query = query.filter(Task.status == status)
    if priority:
        query = query.filter(Task.priority == priority)
    # ``is not None`` so that an explicit project_id of 0 is applied as a
    # filter instead of being silently ignored.
    if project_id is not None:
        query = query.filter(Task.project_id == project_id)

    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by id keeps pagination stable.
    return query.order_by(Task.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
@router.get("/{task_id}", response_model=TaskSchema)
def read_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Return a single task by id, limited to tasks owned by the current user.

    Raises:
        HTTPException: 404 when no task with ``task_id`` is owned by the
            current user.
    """
    matches_id = Task.id == task_id
    owned_by_caller = Task.owner_id == current_user.id

    found = db.query(Task).filter(matches_id, owned_by_caller).first()
    if found is not None:
        return found

    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.put("/{task_id}", response_model=TaskSchema)
def update_task(
    task_id: int,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Apply a partial update to one of the current user's tasks.

    Args:
        task_id: Id of the task to modify.
        task_update: Fields to change; only fields present in the request
            are applied.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User supplied by ``get_current_active_user``.

    Returns:
        The updated task, refreshed from the database.

    Raises:
        HTTPException: 404 when the task is not found for this user, or when
            ``task_update.project_id`` is given but no project with that id
            is owned by the current user.
    """
    existing = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )
    if existing is None:
        raise HTTPException(status_code=404, detail="Task not found")

    # A new project id must point at a project owned by the caller.
    target_project_id = task_update.project_id
    if target_project_id is not None:
        owned_project = (
            db.query(Project)
            .filter(
                Project.id == target_project_id,
                Project.owner_id == current_user.id,
            )
            .first()
        )
        if not owned_project:
            raise HTTPException(
                status_code=404,
                detail="Project not found or you don't have access to it",
            )

    # exclude_unset=True keeps only the fields the client actually sent, so
    # omitted fields are left untouched on the stored task.
    changes = task_update.dict(exclude_unset=True)
    for field_name in changes:
        setattr(existing, field_name, changes[field_name])

    db.commit()
    db.refresh(existing)

    return existing
|
|
|
|
|
|
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Delete one of the current user's tasks; responds with 204 and no body.

    Raises:
        HTTPException: 404 when no task with ``task_id`` is owned by the
            current user.
    """
    ownership_filter = (Task.id == task_id, Task.owner_id == current_user.id)
    target = db.query(Task).filter(*ownership_filter).first()

    if target is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(target)
    db.commit()

    return None