Add Task Manager API functionality with CRUD operations

This commit is contained in:
Automated Action 2025-05-19 22:32:24 +00:00
parent a813887960
commit b08b228767
6 changed files with 291 additions and 9 deletions

View File

@ -1,13 +1,14 @@
# Generic REST API Service
# Task Manager API
A modern REST API service built with FastAPI and SQLite for handling item resources.
A modern REST API service built with FastAPI and SQLite for task management and item resources.
## Features
- FastAPI-based REST API with automatic OpenAPI documentation
- SQLAlchemy ORM with SQLite database
- Alembic database migrations
- CRUD operations for item resources
- CRUD operations for task and item resources
- Task management with status, priority, and due dates
- Health check endpoint
- Async-ready with uvicorn
@ -20,22 +21,26 @@ A modern REST API service built with FastAPI and SQLite for handling item resour
│ ├── api/
│ │ ├── api.py
│ │ └── endpoints/
│ │ └── items.py
│ │ ├── items.py
│ │ └── tasks.py
│ ├── core/
│ │ ├── config.py
│ │ └── database.py
│ ├── models/
│ │ ├── base.py
│ │ └── item.py
│ │ ├── item.py
│ │ └── task.py
│ └── schemas/
│ └── item.py
│ ├── item.py
│ └── task.py
├── main.py
├── migrations/
│ ├── env.py
│ ├── README
│ ├── script.py.mako
│ └── versions/
│ └── 001_initial_migration.py
│ ├── 001_initial_migration.py
│ └── 002_add_task_table.py
└── requirements.txt
```
@ -87,13 +92,24 @@ FastAPI automatically generates interactive API documentation:
## API Endpoints
### Health
- **GET /health** - Health check endpoint
### Items
- **GET /api/v1/items/** - List all items
- **POST /api/v1/items/** - Create a new item
- **GET /api/v1/items/{item_id}** - Get a specific item
- **PUT /api/v1/items/{item_id}** - Update a specific item
- **DELETE /api/v1/items/{item_id}** - Delete a specific item
### Tasks
- **GET /api/v1/tasks/** - List all tasks (with optional status filtering)
- **POST /api/v1/tasks/** - Create a new task
- **GET /api/v1/tasks/{task_id}** - Get a specific task
- **PUT /api/v1/tasks/{task_id}** - Update a specific task
- **DELETE /api/v1/tasks/{task_id}** - Delete a specific task
- **POST /api/v1/tasks/{task_id}/complete** - Mark a task as completed
## Development
### Database Migrations

View File

@ -1,6 +1,7 @@
from fastapi import APIRouter
from app.api.endpoints import items
from app.api.endpoints import items, tasks
api_router = APIRouter()
api_router.include_router(items.router, prefix="/items", tags=["items"])
api_router.include_router(items.router, prefix="/items", tags=["items"])
api_router.include_router(tasks.router, prefix="/tasks", tags=["tasks"])

135
app/api/endpoints/tasks.py Normal file
View File

@ -0,0 +1,135 @@
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.task import Task, TaskStatus
from app.schemas.task import Task as TaskSchema
from app.schemas.task import TaskCreate, TaskUpdate
router = APIRouter()
@router.get("/", response_model=List[TaskSchema])
def read_tasks(
skip: int = 0,
limit: int = 100,
status: Optional[TaskStatus] = None,
db: Session = Depends(get_db)
):
"""
Get a list of tasks with optional filtering by status.
"""
query = db.query(Task)
# Filter by status if provided
if status:
query = query.filter(Task.status == status)
# Apply pagination
tasks = query.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()
return tasks
@router.post("/", response_model=TaskSchema, status_code=status.HTTP_201_CREATED)
def create_task(
task_in: TaskCreate,
db: Session = Depends(get_db)
):
"""
Create a new task.
"""
db_task = Task(
title=task_in.title,
description=task_in.description,
due_date=task_in.due_date,
status=task_in.status,
priority=task_in.priority
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
@router.get("/{task_id}", response_model=TaskSchema)
def read_task(
task_id: str,
db: Session = Depends(get_db)
):
"""
Get a task by ID.
"""
db_task = db.query(Task).filter(Task.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
return db_task
@router.put("/{task_id}", response_model=TaskSchema)
def update_task(
task_id: str,
task_in: TaskUpdate,
db: Session = Depends(get_db)
):
"""
Update a task.
"""
db_task = db.query(Task).filter(Task.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
update_data = task_in.dict(exclude_unset=True)
# If status is being set to completed and there's no completed_at date yet
if (update_data.get("status") == TaskStatus.COMPLETED and
not db_task.completed_at):
db_task.completed_at = datetime.utcnow()
# If status is being changed from completed to something else
if (db_task.status == TaskStatus.COMPLETED and
update_data.get("status") and
update_data.get("status") != TaskStatus.COMPLETED):
db_task.completed_at = None
for field, value in update_data.items():
setattr(db_task, field, value)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_task(
task_id: str,
db: Session = Depends(get_db)
):
"""
Delete a task.
"""
db_task = db.query(Task).filter(Task.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(db_task)
db.commit()
return None
@router.post("/{task_id}/complete", response_model=TaskSchema)
def complete_task(
task_id: str,
db: Session = Depends(get_db)
):
"""
Mark a task as completed.
"""
db_task = db.query(Task).filter(Task.id == task_id).first()
if db_task is None:
raise HTTPException(status_code=404, detail="Task not found")
if db_task.status == TaskStatus.COMPLETED:
raise HTTPException(status_code=400, detail="Task is already completed")
db_task.mark_as_completed()
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task

42
app/models/task.py Normal file
View File

@ -0,0 +1,42 @@
from sqlalchemy import Column, String, Text, DateTime, Enum
from datetime import datetime
import enum
from app.models.base import BaseModel
class TaskStatus(str, enum.Enum):
"""Task status enum."""
TODO = "todo"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CANCELLED = "cancelled"
class TaskPriority(str, enum.Enum):
"""Task priority enum."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Task(BaseModel):
"""Task model for task management."""
title = Column(String(200), index=True, nullable=False)
description = Column(Text, nullable=True)
due_date = Column(DateTime, nullable=True)
status = Column(
Enum(TaskStatus),
default=TaskStatus.TODO,
nullable=False,
index=True
)
priority = Column(
Enum(TaskPriority),
default=TaskPriority.MEDIUM,
nullable=False,
index=True
)
completed_at = Column(DateTime, nullable=True)
def mark_as_completed(self):
"""Mark the task as completed."""
self.status = TaskStatus.COMPLETED
self.completed_at = datetime.utcnow()

40
app/schemas/task.py Normal file
View File

@ -0,0 +1,40 @@
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, Field
from app.models.task import TaskStatus, TaskPriority
class TaskBase(BaseModel):
"""Base schema for Task."""
title: str
description: Optional[str] = None
due_date: Optional[datetime] = None
status: TaskStatus = TaskStatus.TODO
priority: TaskPriority = TaskPriority.MEDIUM
class TaskCreate(TaskBase):
"""Schema for creating a Task."""
title: str = Field(..., min_length=1, max_length=200)
class TaskUpdate(BaseModel):
"""Schema for updating a Task."""
title: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
due_date: Optional[datetime] = None
status: Optional[TaskStatus] = None
priority: Optional[TaskPriority] = None
class TaskInDBBase(TaskBase):
"""Base schema for Task with DB fields."""
id: str
created_at: datetime
updated_at: datetime
completed_at: Optional[datetime] = None
class Config:
orm_mode = True
from_attributes = True
class Task(TaskInDBBase):
"""Schema for returning a Task."""
pass

View File

@ -0,0 +1,48 @@
"""Add task table
Revision ID: 002
Revises: 001
Create Date: 2023-10-23
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '002'
down_revision = '001'
branch_labels = None
depends_on = None
def upgrade():
# Create task table
op.create_table(
'task',
sa.Column('id', sa.String(36), primary_key=True, index=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('title', sa.String(200), index=True, nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('due_date', sa.DateTime(), nullable=True),
sa.Column('status', sa.Enum('todo', 'in_progress', 'completed', 'cancelled', name='taskstatus'), nullable=False, default='todo'),
sa.Column('priority', sa.Enum('low', 'medium', 'high', name='taskpriority'), nullable=False, default='medium'),
sa.Column('completed_at', sa.DateTime(), nullable=True),
)
op.create_index(op.f('ix_task_id'), 'task', ['id'], unique=False)
op.create_index(op.f('ix_task_title'), 'task', ['title'], unique=False)
op.create_index(op.f('ix_task_status'), 'task', ['status'], unique=False)
op.create_index(op.f('ix_task_priority'), 'task', ['priority'], unique=False)
def downgrade():
op.drop_index(op.f('ix_task_priority'), table_name='task')
op.drop_index(op.f('ix_task_status'), table_name='task')
op.drop_index(op.f('ix_task_title'), table_name='task')
op.drop_index(op.f('ix_task_id'), table_name='task')
op.drop_table('task')
# Drop the enum types
op.execute('DROP TYPE IF EXISTS taskstatus;')
op.execute('DROP TYPE IF EXISTS taskpriority;')