Create Fast Task Manager API with FastAPI and SQLite\n\n- Initialize project structure and setup dependencies\n- Create SQLAlchemy models for tasks\n- Implement CRUD operations\n- Create RESTful API endpoints\n- Add health check endpoint\n- Setup Alembic migrations\n- Update documentation\n- Add test script\n\ngenerated with BackendIM... (backend.im)

This commit is contained in:
Automated Action 2025-05-13 23:23:08 +00:00
parent f0e1af325a
commit 8ec96ac4ac
21 changed files with 791 additions and 2 deletions

View File

@ -1,3 +1,65 @@
# FastAPI Application
# Fast Task Manager API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A FastAPI-based RESTful API for managing tasks. This API provides endpoints for creating, reading, updating, and deleting tasks, as well as filtering tasks by status, priority, and completion status.
## Features
- RESTful API with CRUD operations for tasks
- Task filtering by status, priority, and completion status
- Pagination support
- Health check endpoint
- SQLite database with SQLAlchemy ORM
- Database migrations with Alembic
## Project Structure
```
├── alembic.ini # Alembic configuration
├── app # Application package
│ ├── crud # CRUD operations
│ ├── database # Database configurations
│ ├── models # SQLAlchemy models
│ ├── routers # API routes
│ └── schemas # Pydantic schemas for validation
├── main.py # FastAPI application entry point
├── migrations # Alembic migrations
│ └── versions # Migration versions
└── requirements.txt # Project dependencies
```
## API Endpoints
### Tasks
- `POST /api/v1/tasks` - Create a new task
- `GET /api/v1/tasks` - Get list of tasks with filtering and pagination
- `GET /api/v1/tasks/{task_id}` - Get a specific task
- `PUT /api/v1/tasks/{task_id}` - Update a task
- `DELETE /api/v1/tasks/{task_id}` - Delete a task
### Health Check
- `GET /health` - Check API and database health
## Running the Application
1. Clone the repository
2. Install the dependencies:
```bash
pip install -r requirements.txt
```
3. Apply database migrations:
```bash
alembic upgrade head
```
4. Run the application:
```bash
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
```
## API Documentation
Once the application is running, you can access the API documentation at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

15
app/crud/__init__.py Normal file
View File

@ -0,0 +1,15 @@
from .task import (
    create_task,
    get_task,
    get_tasks,
    get_tasks_count,
    update_task,
    delete_task,
)

# Public CRUD API of this package.
# BUG FIX: get_tasks_count was not imported/exported here even though
# app/routers/tasks.py does `from app.crud import ... get_tasks_count`,
# which made the application fail with ImportError at startup.
__all__ = [
    "create_task",
    "get_task",
    "get_tasks",
    "get_tasks_count",
    "update_task",
    "delete_task",
]

78
app/crud/task.py Normal file
View File

@ -0,0 +1,78 @@
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate
def get_task(db: Session, task_id: int) -> Optional[Task]:
    """Fetch a single task by primary key; None when no row matches."""
    matching = db.query(Task).filter(Task.id == task_id)
    return matching.first()
def get_tasks(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None
) -> List[Task]:
    """Return a page of tasks, newest first, with optional filtering.

    Args:
        db: Active database session.
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        filters: Optional mapping with any of the keys "status",
            "priority" and "completed".

    Returns:
        Matching Task rows ordered by created_at descending.
    """
    query = db.query(Task)
    if filters:
        if status := filters.get("status"):
            query = query.filter(Task.status == status)
        if priority := filters.get("priority"):
            query = query.filter(Task.priority == priority)
        # BUG FIX: the previous walrus/truthiness check silently dropped
        # the filter when completed was False, so ?completed=false
        # returned ALL tasks. Compare against None so False is honored.
        completed = filters.get("completed")
        if completed is not None:
            query = query.filter(Task.completed == completed)
    return query.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()
def get_tasks_count(
    db: Session,
    filters: Optional[Dict[str, Any]] = None
) -> int:
    """Count tasks matching the same optional filters as get_tasks.

    Args:
        db: Active database session.
        filters: Optional mapping with any of the keys "status",
            "priority" and "completed".

    Returns:
        Number of matching rows (pagination is intentionally ignored).
    """
    query = db.query(Task)
    if filters:
        if status := filters.get("status"):
            query = query.filter(Task.status == status)
        if priority := filters.get("priority"):
            query = query.filter(Task.priority == priority)
        # BUG FIX: mirror of get_tasks — a completed=False filter was
        # previously discarded by the truthiness check, making the
        # reported total disagree with the filtered list.
        completed = filters.get("completed")
        if completed is not None:
            query = query.filter(Task.completed == completed)
    return query.count()
def create_task(db: Session, task: TaskCreate) -> Task:
    """Persist a new task from the validated payload and return the saved row."""
    # TaskCreate's fields map one-to-one onto the Task model columns.
    db_task = Task(**task.dict())
    db.add(db_task)
    db.commit()
    # Pick up server-assigned values (id, created_at, updated_at).
    db.refresh(db_task)
    return db_task
def update_task(db: Session, task_id: int, task: TaskUpdate) -> Optional[Task]:
    """Apply the client-supplied fields to an existing task.

    Returns the refreshed task, or None when task_id does not exist.
    """
    existing = get_task(db, task_id)
    if existing is None:
        return None
    # Partial update: only fields the client actually sent are applied.
    for field, value in task.dict(exclude_unset=True).items():
        setattr(existing, field, value)
    db.commit()
    db.refresh(existing)
    return existing
def delete_task(db: Session, task_id: int) -> bool:
    """Delete the task with the given id.

    Returns True when a row was deleted, False when no such task exists.
    """
    target = get_task(db, task_id)
    if target is None:
        return False
    db.delete(target)
    db.commit()
    return True

3
app/database/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .session import engine, SessionLocal, get_db, create_tables
__all__ = ["engine", "SessionLocal", "get_db", "create_tables"]

28
app/database/session.py Normal file
View File

@ -0,0 +1,28 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# On-disk location for the SQLite file; created at import time so the
# engine can open the database without a manual setup step.
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
# check_same_thread=False allows sessions to be used from FastAPI's
# threadpool workers; SQLite otherwise pins a connection to one thread.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class every ORM model inherits from.
Base = declarative_base()
def get_db():
    """FastAPI dependency yielding a request-scoped database session.

    The session is always closed once the request handler finishes,
    even when the handler raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def create_tables():
    # Create all tables known to Base.metadata (no-op for existing tables).
    # NOTE(review): this bypasses Alembic; presumably kept for dev/test
    # bootstrap convenience — confirm it doesn't conflict with migrations.
    Base.metadata.create_all(bind=engine)

3
app/models/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .task import Task
__all__ = ["Task"]

18
app/models/task.py Normal file
View File

@ -0,0 +1,18 @@
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text
from sqlalchemy.sql import func
from app.database.session import Base
class Task(Base):
    """ORM model for a single task row in the `tasks` table."""
    __tablename__ = "tasks"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    # Free-form strings; the API does not restrict the accepted values.
    status = Column(String(50), default="pending")
    priority = Column(String(50), default="medium")
    due_date = Column(DateTime, nullable=True)
    completed = Column(Boolean, default=False)
    # Client-side defaults (not server_default): func.now() is rendered
    # into ORM INSERT/UPDATE statements, so raw-SQL writes bypass them.
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

0
app/routers/__init__.py Normal file
View File

31
app/routers/health.py Normal file
View File

@ -0,0 +1,31 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from datetime import datetime
from pydantic import BaseModel
from app.database import get_db
router = APIRouter(tags=["health"])
class HealthResponse(BaseModel):
    """Response schema for GET /health."""
    status: str  # overall API status; "ok" whenever the endpoint answers
    timestamp: datetime  # server time the check ran
    db_status: str  # "ok" or "error" depending on a probe query
@router.get("/health", response_model=HealthResponse)
async def health_check(db: Session = Depends(get_db)):
    """
    Health check endpoint to verify the API is running and can connect
    to the database.

    Returns "ok" overall plus a db_status of "ok"/"error" depending on
    whether a trivial probe query succeeds.
    """
    # Local import so this file's top-level imports stay unchanged.
    from sqlalchemy import text

    db_status = "ok"
    try:
        # BUG FIX: SQLAlchemy 2.x (requirements pin sqlalchemy==2.0.15)
        # rejects plain strings in Session.execute(); statements must be
        # wrapped in text(), otherwise this always reported "error".
        db.execute(text("SELECT 1"))
    except Exception:
        db_status = "error"
    return {
        "status": "ok",
        "timestamp": datetime.utcnow(),
        "db_status": db_status
    }

72
app/routers/tasks.py Normal file
View File

@ -0,0 +1,72 @@
from typing import Optional, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.models import Task
from app.schemas import TaskCreate, TaskUpdate, TaskResponse, TaskList
from app.crud import create_task, get_task, get_tasks, get_tasks_count, update_task, delete_task
router = APIRouter(tags=["tasks"])
@router.post("/tasks", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task_endpoint(task: TaskCreate, db: Session = Depends(get_db)):
    """
    Create a new task from the request payload and return the stored row.
    """
    created = create_task(db=db, task=task)
    return created
@router.get("/tasks", response_model=TaskList)
def get_tasks_endpoint(
    status: Optional[str] = Query(None, description="Filter by status"),
    priority: Optional[str] = Query(None, description="Filter by priority"),
    completed: Optional[bool] = Query(None, description="Filter by completion status"),
    skip: int = Query(0, ge=0, description="Number of tasks to skip"),
    limit: int = Query(100, ge=1, le=100, description="Max number of tasks to return"),
    db: Session = Depends(get_db)
):
    """
    List tasks with optional filtering and pagination.

    Builds a filter mapping from whichever query parameters were supplied,
    then returns the matching page plus the total match count.
    """
    # NOTE: the `status` query parameter shadows the imported fastapi
    # `status` module inside this function; harmless, but worth knowing.
    filters: Dict[str, Any] = {}
    for key, value in (("status", status), ("priority", priority)):
        if value:
            filters[key] = value
    if completed is not None:
        filters["completed"] = completed
    page = get_tasks(db=db, skip=skip, limit=limit, filters=filters)
    return {"tasks": page, "total": get_tasks_count(db=db, filters=filters)}
@router.get("/tasks/{task_id}", response_model=TaskResponse)
def get_task_endpoint(task_id: int, db: Session = Depends(get_db)):
    """
    Fetch a specific task by ID; responds 404 when it does not exist.
    """
    found = get_task(db=db, task_id=task_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
@router.put("/tasks/{task_id}", response_model=TaskResponse)
def update_task_endpoint(task_id: int, task: TaskUpdate, db: Session = Depends(get_db)):
    """
    Partially update a task; responds 404 when it does not exist.
    """
    updated = update_task(db=db, task_id=task_id, task=task)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="Task not found")
@router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task_endpoint(task_id: int, db: Session = Depends(get_db)):
    """
    Delete a task; responds 204 on success and 404 when it does not exist.
    """
    if not delete_task(db=db, task_id=task_id):
        raise HTTPException(status_code=404, detail="Task not found")
    return None

3
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .task import TaskCreate, TaskUpdate, TaskResponse, TaskList
__all__ = ["TaskCreate", "TaskUpdate", "TaskResponse", "TaskList"]

34
app/schemas/task.py Normal file
View File

@ -0,0 +1,34 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
class TaskBase(BaseModel):
    """Fields shared by the create and response schemas."""
    title: str
    description: Optional[str] = None
    status: Optional[str] = "pending"  # free-form; values are not validated
    priority: Optional[str] = "medium"
    due_date: Optional[datetime] = None
    completed: Optional[bool] = False
class TaskCreate(TaskBase):
    """Payload for POST /api/v1/tasks — identical to TaskBase."""
    pass
class TaskUpdate(BaseModel):
    """Partial-update payload: every field optional; unset fields are left untouched."""
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    priority: Optional[str] = None
    due_date: Optional[datetime] = None
    completed: Optional[bool] = None
class TaskResponse(TaskBase):
    """Task as returned by the API, including database-generated fields."""
    id: int
    created_at: datetime
    updated_at: datetime
    class Config:
        # Pydantic v1 flag: allow building this schema from ORM objects.
        orm_mode = True
class TaskList(BaseModel):
    """Paginated list envelope returned by GET /api/v1/tasks."""
    tasks: List[TaskResponse]
    total: int  # total rows matching the filters, ignoring pagination

30
main.py Normal file
View File

@ -0,0 +1,30 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import tasks, health
from app.database import create_tables
# Application instance; title/description/version surface in /docs.
app = FastAPI(
    title="Fast Task Manager API",
    description="A simple API for managing tasks",
    version="0.1.0",
)
# Wide-open CORS: any origin/method/header may call the API.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True
# is rejected by browsers for credentialed requests — confirm intent.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Task CRUD lives under /api/v1; the health check stays at the root.
app.include_router(tasks.router, prefix="/api/v1")
app.include_router(health.router)
@app.on_event("startup")
async def startup_event():
    # Ensure ORM tables exist on boot; complements the Alembic migrations
    # for fresh environments where no migration has been applied yet.
    create_tables()
if __name__ == "__main__":
    import uvicorn
    # Dev entry point with auto-reload; production should invoke uvicorn
    # directly (as documented in the README).
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration.

79
migrations/env.py Normal file
View File

@ -0,0 +1,79 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from app.models import Task
from app.database.session import Base
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Configures the context with just a URL rather than an Engine, so no
    DBAPI is required; context.execute() emits the generated SQL to the
    script output instead of a live connection.
    """
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    Builds an Engine from the [alembic] ini settings and runs the
    migrations over a live connection.
    """
    db_engine = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with db_engine.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()
# Entry point: Alembic imports this module and the CLI flags decide
# whether SQL is emitted offline or applied over a live connection.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,38 @@
"""create tasks table
Revision ID: 20250513_000001
Revises:
Create Date: 2025-05-13 00:00:01
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '20250513_000001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
    """Create the tasks table and its id index."""
    op.create_table(
        'tasks',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=255), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        # BUG FIX: `default=` is a client-side (ORM) setting and has no
        # effect inside a migration — no ORM layer runs here. Use
        # server_default so the fallback is baked into the table DDL and
        # applies to non-ORM inserts as well.
        sa.Column('status', sa.String(length=50), nullable=True, server_default='pending'),
        sa.Column('priority', sa.String(length=50), nullable=True, server_default='medium'),
        sa.Column('due_date', sa.DateTime(), nullable=True),
        sa.Column('completed', sa.Boolean(), nullable=True, server_default=sa.text('0')),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_tasks_id'), 'tasks', ['id'], unique=False)
def downgrade():
    # Reverse of upgrade(): drop the index first, then the table.
    op.drop_index(op.f('ix_tasks_id'), table_name='tasks')
    op.drop_table('tasks')

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
fastapi==0.95.2
uvicorn==0.22.0
sqlalchemy==2.0.15
pydantic==1.10.8
alembic==1.11.1
python-dotenv==1.0.0
ruff==0.0.270
# NOTE: removed the obsolete 'pathlib==1.0.1' backport — pathlib has been
# part of the standard library since Python 3.4 and the PyPI package can
# shadow it. test_api.py additionally needs pytest and requests (for
# fastapi.testclient); install those as dev dependencies.

177
test_api.py Normal file
View File

@ -0,0 +1,177 @@
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import pytest
from datetime import datetime, timedelta
from app.database.session import Base
from app.models.task import Task
from main import app
from app.database import get_db
# Setup in-memory SQLite database for testing.
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
# StaticPool keeps one shared connection alive so every session sees the
# same in-memory database (a SQLite :memory: DB is per-connection).
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
    """Test double for the app's get_db dependency, bound to the in-memory engine."""
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
# Route every request's DB dependency to the in-memory test database.
app.dependency_overrides[get_db] = override_get_db
# Initialize the test client shared by all tests in this module.
client = TestClient(app)
@pytest.fixture(scope="function")
def test_db():
    # Fresh schema per test: create all tables before the test and drop
    # them afterwards so tests cannot leak rows into each other.
    Base.metadata.create_all(bind=engine)
    yield
    # Drop the tables after the test.
    Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def sample_tasks(test_db):
    """Seed three tasks with distinct status/priority/completion values.

    Depends on test_db so the schema exists; ids are 1..3 in insert order
    against the freshly created table.
    """
    db = TestingSessionLocal()
    tasks = [
        Task(
            title="Task 1",
            description="Description 1",
            status="pending",
            priority="high",
            due_date=datetime.utcnow() + timedelta(days=1),
            completed=False
        ),
        Task(
            title="Task 2",
            description="Description 2",
            status="in_progress",
            priority="medium",
            due_date=datetime.utcnow() + timedelta(days=2),
            completed=False
        ),
        Task(
            title="Task 3",
            description="Description 3",
            status="completed",
            priority="low",
            due_date=datetime.utcnow() - timedelta(days=1),
            completed=True
        )
    ]
    db.add_all(tasks)
    db.commit()
    yield tasks
    db.close()
def test_health_endpoint(test_db):
    """GET /health reports overall and database status as "ok"."""
    response = client.get("/health")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "ok"
    assert "timestamp" in data
    assert data["db_status"] == "ok"
def test_create_task(test_db):
    """POST /api/v1/tasks returns 201 and echoes the stored fields plus an id."""
    task_data = {
        "title": "New Task",
        "description": "New Description",
        "status": "pending",
        "priority": "high",
        "due_date": (datetime.utcnow() + timedelta(days=3)).isoformat(),
        "completed": False
    }
    response = client.post("/api/v1/tasks", json=task_data)
    assert response.status_code == 201
    data = response.json()
    assert data["title"] == task_data["title"]
    assert data["description"] == task_data["description"]
    assert data["status"] == task_data["status"]
    assert data["priority"] == task_data["priority"]
    assert data["completed"] == task_data["completed"]
    assert "id" in data
def test_get_all_tasks(sample_tasks):
    """GET /api/v1/tasks with no filters returns all three seeded tasks."""
    response = client.get("/api/v1/tasks")
    assert response.status_code == 200
    data = response.json()
    assert "tasks" in data
    assert "total" in data
    assert data["total"] == 3
    assert len(data["tasks"]) == 3
def test_get_task_by_id(sample_tasks):
    """GET /api/v1/tasks/{id} returns the matching seeded task."""
    # Get the first seeded task (ids start at 1 in the fresh schema).
    # Dropped the needless f-prefix: the literal had no placeholders.
    response = client.get("/api/v1/tasks/1")
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == 1
    assert data["title"] == "Task 1"
def test_get_non_existent_task(test_db):
    """An unknown task id yields 404.

    BUG FIX: this test previously took no fixture, so the tasks table
    might not exist when it ran (every other test drops the schema on
    teardown), turning the expected 404 into a 500 from the missing
    table. Depending on test_db guarantees the schema exists and the
    test is order-independent.
    """
    response = client.get("/api/v1/tasks/999")
    assert response.status_code == 404
def test_update_task(sample_tasks):
    """PUT /api/v1/tasks/1 applies the sent fields and echoes the result."""
    update_data = {
        "title": "Updated Task",
        "description": "Updated Description",
        "status": "completed",
        "completed": True
    }
    response = client.put("/api/v1/tasks/1", json=update_data)
    assert response.status_code == 200
    data = response.json()
    assert data["title"] == update_data["title"]
    assert data["description"] == update_data["description"]
    assert data["status"] == update_data["status"]
    assert data["completed"] == update_data["completed"]
def test_delete_task(sample_tasks):
    """DELETE /api/v1/tasks/1 returns 204 and the task is gone afterwards."""
    # First verify the task exists
    response = client.get("/api/v1/tasks/1")
    assert response.status_code == 200
    # Delete the task
    response = client.delete("/api/v1/tasks/1")
    assert response.status_code == 204
    # Verify it's gone
    response = client.get("/api/v1/tasks/1")
    assert response.status_code == 404
def test_filter_tasks(sample_tasks):
    """Each filter (status, priority, completed) narrows to one seeded task.

    NOTE(review): only completed=true is exercised; a completed=false case
    would also be worth covering since False is easy to drop in filters.
    """
    # Filter by status
    response = client.get("/api/v1/tasks?status=pending")
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert len(data["tasks"]) == 1
    assert data["tasks"][0]["status"] == "pending"
    # Filter by priority
    response = client.get("/api/v1/tasks?priority=medium")
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert len(data["tasks"]) == 1
    assert data["tasks"][0]["priority"] == "medium"
    # Filter by completed
    response = client.get("/api/v1/tasks?completed=true")
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1
    assert len(data["tasks"]) == 1
    assert data["tasks"][0]["completed"] is True