Automated Action e7225e6443 Add Task Management System with FastAPI and SQLite
Features:
- User authentication with JWT tokens
- Task and project CRUD operations
- Task filtering and organization

generated with BackendIM... (backend.im)
2025-05-12 10:14:26 +00:00

148 lines
4.0 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional
from app.database import get_db
from app.models.task import Task
from app.models.project import Project
from app.schemas.task import TaskCreate, Task as TaskSchema, TaskUpdate
from app.auth.dependencies import get_current_active_user
from app.models.user import User
# Router that collects the task endpoints defined in this module; it is
# configured with the "/tasks" prefix and the "tasks" tag.
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED)
def create_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Create a task owned by the authenticated user.

    Args:
        task: Payload describing the task to create.
        db: Database session supplied by ``get_db``.
        current_user: User supplied by ``get_current_active_user``; recorded
            as the owner of the new task.

    Returns:
        The newly persisted ``Task``, refreshed from the database.

    Raises:
        HTTPException: 404 when ``task.project_id`` is provided but no
            project with that id is owned by the current user.
    """
    # Compare against None instead of relying on truthiness so that a falsy
    # id such as 0 is still validated rather than stored unchecked; this
    # matches the check performed in update_task.
    if task.project_id is not None:
        project = (
            db.query(Project)
            .filter(
                Project.id == task.project_id,
                Project.owner_id == current_user.id,
            )
            .first()
        )
        if not project:
            raise HTTPException(
                status_code=404,
                detail="Project not found or you don't have access to it",
            )

    # The owner always comes from the authenticated user, never the payload.
    db_task = Task(**task.dict(), owner_id=current_user.id)
    db.add(db_task)
    db.commit()
    # Reload the row so values assigned by the database are present on the
    # object that gets returned.
    db.refresh(db_task)
    return db_task
@router.get("/", response_model=List[TaskSchema])
def read_tasks(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
    priority: Optional[str] = None,
    project_id: Optional[int] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """List the authenticated user's tasks, optionally filtered and paginated.

    Args:
        skip: Number of matching tasks to skip before the first returned one.
        limit: Maximum number of tasks to return.
        status: When non-empty, only tasks whose status equals this value.
        priority: When non-empty, only tasks whose priority equals this value.
        project_id: When provided, only tasks attached to this project.
        db: Database session supplied by ``get_db``.
        current_user: User supplied by ``get_current_active_user``; only
            tasks owned by this user are returned.

    Returns:
        A list of ``Task`` rows ordered by id.
    """
    # NOTE: inside this function ``status`` is the query-parameter string and
    # shadows the imported ``fastapi.status`` module. The name is kept because
    # it is the query parameter clients already use.
    query = db.query(Task).filter(Task.owner_id == current_user.id)

    if status:
        query = query.filter(Task.status == status)
    if priority:
        query = query.filter(Task.priority == priority)
    # Compare against None so an explicit falsy id such as 0 is applied as a
    # filter instead of being silently ignored.
    if project_id is not None:
        query = query.filter(Task.project_id == project_id)

    # Without an explicit ordering the database may return rows in any order,
    # which makes offset/limit pages unstable between requests.
    return query.order_by(Task.id).offset(skip).limit(limit).all()
@router.get("/{task_id}", response_model=TaskSchema)
def read_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Return a single task of the authenticated user.

    Args:
        task_id: Identifier of the task to fetch.
        db: Database session supplied by ``get_db``.
        current_user: User supplied by ``get_current_active_user``.

    Returns:
        The matching ``Task``.

    Raises:
        HTTPException: 404 when no task with ``task_id`` is owned by the
            current user.
    """
    has_requested_id = Task.id == task_id
    owned_by_caller = Task.owner_id == current_user.id

    found = db.query(Task).filter(has_requested_id, owned_by_caller).first()
    if found is not None:
        return found
    # A task owned by someone else is reported the same way as a missing one.
    raise HTTPException(status_code=404, detail="Task not found")
@router.put("/{task_id}", response_model=TaskSchema)
def update_task(
    task_id: int,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Apply the supplied fields to one of the authenticated user's tasks.

    Args:
        task_id: Identifier of the task to modify.
        task_update: Payload holding the fields to change.
        db: Database session supplied by ``get_db``.
        current_user: User supplied by ``get_current_active_user``.

    Returns:
        The updated ``Task``, refreshed from the database.

    Raises:
        HTTPException: 404 when the task is not owned by the current user,
            or when ``task_update.project_id`` is not None and no project
            with that id is owned by the current user.
    """
    existing = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )
    if existing is None:
        raise HTTPException(status_code=404, detail="Task not found")

    # A task may only be moved into a project the caller owns.
    target_project_id = task_update.project_id
    if target_project_id is not None:
        owned_project = (
            db.query(Project)
            .filter(
                Project.id == target_project_id,
                Project.owner_id == current_user.id,
            )
            .first()
        )
        if not owned_project:
            raise HTTPException(
                status_code=404,
                detail="Project not found or you don't have access to it",
            )

    # exclude_unset=True keeps only the fields the client actually sent, so
    # omitted fields stay untouched while an explicitly sent None is applied.
    changes = task_update.dict(exclude_unset=True)
    for field_name in changes:
        setattr(existing, field_name, changes[field_name])

    db.commit()
    db.refresh(existing)
    return existing
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Delete one of the authenticated user's tasks.

    Args:
        task_id: Identifier of the task to remove.
        db: Database session supplied by ``get_db``.
        current_user: User supplied by ``get_current_active_user``.

    Returns:
        None; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 when no task with ``task_id`` is owned by the
            current user.
    """
    owned_task_query = db.query(Task).filter(
        Task.id == task_id,
        Task.owner_id == current_user.id,
    )
    doomed = owned_task_query.first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()
    return None