Create simple inventory management app with FastAPI and SQLite
- Set up project structure with FastAPI - Implement SQLAlchemy models for inventory items and categories - Create database connection with SQLite - Add CRUD operations for inventory management - Implement API endpoints for categories, items, and inventory transactions - Add health check endpoint - Set up Alembic for database migrations - Update README with documentation
This commit is contained in:
parent
7e7faa579b
commit
4e60587fda
138
README.md
138
README.md
@ -1,3 +1,137 @@
|
||||
# FastAPI Application
|
||||
# Simple Inventory Management App
|
||||
|
||||
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
|
||||
A REST API for managing inventory items built with FastAPI and SQLite.
|
||||
|
||||
## Features
|
||||
|
||||
- Category management (CRUD operations)
|
||||
- Item management with inventory tracking
|
||||
- Inventory transactions (stock in/out)
|
||||
- Search and filter functionality
|
||||
- Health check endpoint
|
||||
- API documentation via Swagger UI and ReDoc
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
simple-inventory-management-app/
|
||||
├── alembic/ # Database migration files
|
||||
├── app/ # Application code
|
||||
│ ├── api/ # API endpoints
|
||||
│ │ └── v1/ # API version 1
|
||||
│ │ ├── endpoints/ # API route handlers
|
||||
│ │ └── api.py # API router
|
||||
│ ├── core/ # Core functionality
|
||||
│ │ └── config.py # Application configuration
|
||||
│ ├── crud/ # CRUD operations
|
||||
│ ├── db/ # Database setup and session
|
||||
│ ├── models/ # SQLAlchemy models
|
||||
│ └── schemas/ # Pydantic schemas
|
||||
└── main.py # Application entry point
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.8+
|
||||
- Dependencies listed in `requirements.txt`
|
||||
|
||||
## Installation
|
||||
|
||||
1. Clone the repository:
|
||||
|
||||
```bash
|
||||
git clone <repository-url>
|
||||
cd simple-inventory-management-app
|
||||
```
|
||||
|
||||
2. Create a virtual environment and activate it:
|
||||
|
||||
```bash
|
||||
python -m venv venv
|
||||
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||
```
|
||||
|
||||
3. Install dependencies:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Database Setup
|
||||
|
||||
The application uses SQLite as the database. The database file will be created automatically in the `/app/storage/db/` directory.
|
||||
|
||||
To initialize the database with the latest migrations:
|
||||
|
||||
```bash
|
||||
alembic upgrade head
|
||||
```
|
||||
|
||||
## Running the Application
|
||||
|
||||
Start the application with:
|
||||
|
||||
```bash
|
||||
uvicorn main:app --reload
|
||||
```
|
||||
|
||||
The API will be available at http://localhost:8000
|
||||
|
||||
## API Documentation
|
||||
|
||||
- Swagger UI: http://localhost:8000/docs
|
||||
- ReDoc: http://localhost:8000/redoc
|
||||
|
||||
## API Endpoints
|
||||
|
||||
### Health Check
|
||||
|
||||
- `GET /api/v1/health` - Check application health
|
||||
|
||||
### Categories
|
||||
|
||||
- `GET /api/v1/categories` - List all categories
|
||||
- `POST /api/v1/categories` - Create a new category
|
||||
- `GET /api/v1/categories/{category_id}` - Get a specific category
|
||||
- `PUT /api/v1/categories/{category_id}` - Update a category
|
||||
- `DELETE /api/v1/categories/{category_id}` - Delete a category
|
||||
|
||||
### Items
|
||||
|
||||
- `GET /api/v1/items` - List all items (with filtering options)
|
||||
- `POST /api/v1/items` - Create a new item
|
||||
- `GET /api/v1/items/{item_id}` - Get a specific item
|
||||
- `PUT /api/v1/items/{item_id}` - Update an item
|
||||
- `DELETE /api/v1/items/{item_id}` - Delete an item
|
||||
- `POST /api/v1/items/{item_id}/transactions` - Add inventory transaction
|
||||
- `GET /api/v1/items/{item_id}/transactions` - Get item transaction history
|
||||
|
||||
## Environment Variables
|
||||
|
||||
The application uses the following environment variables:
|
||||
|
||||
None required for basic operation. The application uses default values that work out of the box.
|
||||
|
||||
## Development
|
||||
|
||||
### Running Tests
|
||||
|
||||
```bash
|
||||
# To be implemented
|
||||
```
|
||||
|
||||
### Running Linting
|
||||
|
||||
```bash
|
||||
ruff check .
|
||||
```
|
||||
|
||||
### Running Formatting
|
||||
|
||||
```bash
|
||||
ruff format .
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
[MIT](LICENSE)
|
85
alembic.ini
Normal file
85
alembic.ini
Normal file
@ -0,0 +1,85 @@
|
||||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = alembic
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# timezone to use when rendering the date
|
||||
# within the migration file as well as the filename.
|
||||
# string value is passed to dateutil.tz.gettz()
|
||||
# leave blank for localtime
|
||||
# timezone =
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
# truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
# set to 'true' to allow .pyc and .pyo files without
|
||||
# a source .py file to be detected as revisions in the
|
||||
# versions/ directory
|
||||
# sourceless = false
|
||||
|
||||
# version location specification; this defaults
|
||||
# to alembic/versions. When using multiple version
|
||||
# directories, initial revisions must be specified with --version-path
|
||||
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
|
||||
|
||||
# the output encoding used when revision files
|
||||
# are written from script.py.mako
|
||||
# output_encoding = utf-8
|
||||
|
||||
# SQLite URL
|
||||
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
|
||||
|
||||
[post_write_hooks]
|
||||
# post_write_hooks defines scripts or Python functions that are run
|
||||
# on newly generated revision scripts. See the documentation for further
|
||||
# detail and examples
|
||||
|
||||
# format using "black" - use the console_scripts runner, against the "black" entrypoint
|
||||
# hooks=black
|
||||
# black.type=console_scripts
|
||||
# black.entrypoint=black
|
||||
# black.options=-l 79
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
1
alembic/README
Normal file
1
alembic/README
Normal file
@ -0,0 +1 @@
|
||||
Generic single-database configuration with SQLite.
|
84
alembic/env.py
Normal file
84
alembic/env.py
Normal file
@ -0,0 +1,84 @@
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy import pool
|
||||
|
||||
from alembic import context
|
||||
|
||||
# Import Base before defining target_metadata
|
||||
from app.db.base_model import Base
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
config = context.config
|
||||
|
||||
# Interpret the config file for Python logging.
|
||||
# This line sets up loggers basically.
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
# add your model's MetaData object here
|
||||
# for 'autogenerate' support
|
||||
# from myapp import mymodel
|
||||
# target_metadata = mymodel.Base.metadata
|
||||
target_metadata = Base.metadata
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
# can be acquired:
|
||||
# my_important_option = config.get_main_option("my_important_option")
|
||||
# ... etc.
|
||||
|
||||
|
||||
def run_migrations_offline():
|
||||
"""Run migrations in 'offline' mode.
|
||||
|
||||
This configures the context with just a URL
|
||||
and not an Engine, though an Engine is acceptable
|
||||
here as well. By skipping the Engine creation
|
||||
we don't even need a DBAPI to be available.
|
||||
|
||||
Calls to context.execute() here emit the given string to the
|
||||
script output.
|
||||
|
||||
"""
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(
|
||||
url=url,
|
||||
target_metadata=target_metadata,
|
||||
literal_binds=True,
|
||||
dialect_opts={"paramstyle": "named"},
|
||||
render_as_batch=True, # Required for SQLite
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online():
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section),
|
||||
prefix="sqlalchemy.",
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
is_sqlite = connection.dialect.name == 'sqlite'
|
||||
context.configure(
|
||||
connection=connection,
|
||||
target_metadata=target_metadata,
|
||||
render_as_batch=is_sqlite, # Required for SQLite
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
24
alembic/script.py.mako
Normal file
24
alembic/script.py.mako
Normal file
@ -0,0 +1,24 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
77
alembic/versions/a1b2c3d4e5f6_initial_migration.py
Normal file
77
alembic/versions/a1b2c3d4e5f6_initial_migration.py
Normal file
@ -0,0 +1,77 @@
|
||||
"""Initial migration
|
||||
|
||||
Revision ID: a1b2c3d4e5f6
|
||||
Revises:
|
||||
Create Date: 2023-11-10 12:00:00.000000
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'a1b2c3d4e5f6'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# Create categories table
|
||||
op.create_table(
|
||||
'categories',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('name', sa.String(), nullable=False),
|
||||
sa.Column('description', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_categories_id'), 'categories', ['id'], unique=False)
|
||||
op.create_index(op.f('ix_categories_name'), 'categories', ['name'], unique=True)
|
||||
|
||||
# Create items table
|
||||
op.create_table(
|
||||
'items',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('name', sa.String(), nullable=False),
|
||||
sa.Column('description', sa.Text(), nullable=True),
|
||||
sa.Column('quantity', sa.Integer(), nullable=False, server_default='0'),
|
||||
sa.Column('price', sa.Float(), nullable=True),
|
||||
sa.Column('sku', sa.String(), nullable=True),
|
||||
sa.Column('category_id', sa.Integer(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
|
||||
sa.ForeignKeyConstraint(['category_id'], ['categories.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_items_id'), 'items', ['id'], unique=False)
|
||||
op.create_index(op.f('ix_items_name'), 'items', ['name'], unique=False)
|
||||
op.create_index(op.f('ix_items_sku'), 'items', ['sku'], unique=True)
|
||||
|
||||
# Create inventory_transactions table
|
||||
op.create_table(
|
||||
'inventory_transactions',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('item_id', sa.Integer(), nullable=False),
|
||||
sa.Column('quantity_change', sa.Integer(), nullable=False),
|
||||
sa.Column('transaction_type', sa.String(), nullable=False),
|
||||
sa.Column('notes', sa.Text(), nullable=True),
|
||||
sa.Column('timestamp', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
|
||||
sa.ForeignKeyConstraint(['item_id'], ['items.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_inventory_transactions_id'), 'inventory_transactions', ['id'], unique=False)
|
||||
|
||||
|
||||
def downgrade():
|
||||
# Drop tables in reverse order to respect foreign key constraints
|
||||
op.drop_index(op.f('ix_inventory_transactions_id'), table_name='inventory_transactions')
|
||||
op.drop_table('inventory_transactions')
|
||||
|
||||
op.drop_index(op.f('ix_items_sku'), table_name='items')
|
||||
op.drop_index(op.f('ix_items_name'), table_name='items')
|
||||
op.drop_index(op.f('ix_items_id'), table_name='items')
|
||||
op.drop_table('items')
|
||||
|
||||
op.drop_index(op.f('ix_categories_name'), table_name='categories')
|
||||
op.drop_index(op.f('ix_categories_id'), table_name='categories')
|
||||
op.drop_table('categories')
|
0
app/__init__.py
Normal file
0
app/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/api/v1/__init__.py
Normal file
0
app/api/v1/__init__.py
Normal file
8
app/api/v1/api.py
Normal file
8
app/api/v1/api.py
Normal file
@ -0,0 +1,8 @@
|
||||
from fastapi import APIRouter
|
||||
|
||||
from app.api.v1.endpoints import categories, items, health
|
||||
|
||||
api_router = APIRouter()
|
||||
api_router.include_router(categories.router, prefix="/categories", tags=["categories"])
|
||||
api_router.include_router(items.router, prefix="/items", tags=["items"])
|
||||
api_router.include_router(health.router, prefix="/health", tags=["health"])
|
1
app/api/v1/endpoints/__init__.py
Normal file
1
app/api/v1/endpoints/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# Import endpoints modules here for easier importing
|
118
app/api/v1/endpoints/categories.py
Normal file
118
app/api/v1/endpoints/categories.py
Normal file
@ -0,0 +1,118 @@
|
||||
from typing import Any, List
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app import crud
|
||||
from app.db.session import get_db
|
||||
from app.schemas.category import Category, CategoryCreate, CategoryUpdate
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/", response_model=List[Category])
|
||||
def read_categories(
|
||||
db: Session = Depends(get_db),
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve categories.
|
||||
"""
|
||||
categories = crud.category.get_multi(db, skip=skip, limit=limit)
|
||||
return categories
|
||||
|
||||
|
||||
@router.post("/", response_model=Category)
|
||||
def create_category(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
category_in: CategoryCreate,
|
||||
) -> Any:
|
||||
"""
|
||||
Create new category.
|
||||
"""
|
||||
category = crud.category.get_by_name(db, name=category_in.name)
|
||||
if category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Category with this name already exists",
|
||||
)
|
||||
category = crud.category.create(db, obj_in=category_in)
|
||||
return category
|
||||
|
||||
|
||||
@router.get("/{category_id}", response_model=Category)
|
||||
def read_category(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
category_id: int,
|
||||
) -> Any:
|
||||
"""
|
||||
Get category by ID.
|
||||
"""
|
||||
category = crud.category.get(db, id=category_id)
|
||||
if not category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Category not found",
|
||||
)
|
||||
return category
|
||||
|
||||
|
||||
@router.put("/{category_id}", response_model=Category)
|
||||
def update_category(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
category_id: int,
|
||||
category_in: CategoryUpdate,
|
||||
) -> Any:
|
||||
"""
|
||||
Update a category.
|
||||
"""
|
||||
category = crud.category.get(db, id=category_id)
|
||||
if not category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Category not found",
|
||||
)
|
||||
|
||||
# Check if updating name and if it already exists
|
||||
if category_in.name and category_in.name != category.name:
|
||||
existing_category = crud.category.get_by_name(db, name=category_in.name)
|
||||
if existing_category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Category with this name already exists",
|
||||
)
|
||||
|
||||
category = crud.category.update(db, db_obj=category, obj_in=category_in)
|
||||
return category
|
||||
|
||||
|
||||
@router.delete("/{category_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
|
||||
def delete_category(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
category_id: int,
|
||||
) -> None:
|
||||
"""
|
||||
Delete a category.
|
||||
"""
|
||||
category = crud.category.get(db, id=category_id)
|
||||
if not category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Category not found",
|
||||
)
|
||||
|
||||
# Check if category has items
|
||||
items = crud.item.get_by_category(db, category_id=category_id)
|
||||
if items:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Cannot delete category with associated items",
|
||||
)
|
||||
|
||||
crud.category.remove(db, id=category_id)
|
||||
return None
|
31
app/api/v1/endpoints/health.py
Normal file
31
app/api/v1/endpoints/health.py
Normal file
@ -0,0 +1,31 @@
|
||||
from typing import Dict
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.db.session import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/")
|
||||
def health_check(
|
||||
db: Session = Depends(get_db),
|
||||
) -> Dict:
|
||||
"""
|
||||
Perform health check of the application.
|
||||
|
||||
Checks:
|
||||
- Database connection
|
||||
"""
|
||||
# Test database connection
|
||||
db_status = "healthy"
|
||||
try:
|
||||
db.execute("SELECT 1")
|
||||
except Exception:
|
||||
db_status = "unhealthy"
|
||||
|
||||
return {
|
||||
"status": "healthy" if db_status == "healthy" else "unhealthy",
|
||||
"database": db_status
|
||||
}
|
227
app/api/v1/endpoints/items.py
Normal file
227
app/api/v1/endpoints/items.py
Normal file
@ -0,0 +1,227 @@
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app import crud
|
||||
from app.db.session import get_db
|
||||
from app.schemas.item import (
|
||||
Item, ItemCreate, ItemUpdate,
|
||||
InventoryTransaction, InventoryTransactionCreate
|
||||
)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/", response_model=List[Item])
|
||||
def read_items(
|
||||
db: Session = Depends(get_db),
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
category_id: Optional[int] = None,
|
||||
search: Optional[str] = None,
|
||||
low_stock: Optional[bool] = False,
|
||||
low_stock_threshold: Optional[int] = 10,
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve items with filtering options.
|
||||
"""
|
||||
if category_id:
|
||||
# Filter by category
|
||||
items = crud.item.get_by_category(db, category_id=category_id, skip=skip, limit=limit)
|
||||
elif search:
|
||||
# Search by name or description
|
||||
items = crud.item.search_items(db, query=search, skip=skip, limit=limit)
|
||||
elif low_stock:
|
||||
# Get low stock items
|
||||
items = crud.item.get_low_stock_items(db, threshold=low_stock_threshold, skip=skip, limit=limit)
|
||||
else:
|
||||
# Get all items
|
||||
items = crud.item.get_multi(db, skip=skip, limit=limit)
|
||||
return items
|
||||
|
||||
|
||||
@router.post("/", response_model=Item)
|
||||
def create_item(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_in: ItemCreate,
|
||||
) -> Any:
|
||||
"""
|
||||
Create new item.
|
||||
"""
|
||||
# Check if SKU exists if provided
|
||||
if item_in.sku:
|
||||
existing_item = crud.item.get_by_sku(db, sku=item_in.sku)
|
||||
if existing_item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Item with this SKU already exists",
|
||||
)
|
||||
|
||||
# Check if category exists if provided
|
||||
if item_in.category_id:
|
||||
category = crud.category.get(db, id=item_in.category_id)
|
||||
if not category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Category not found",
|
||||
)
|
||||
|
||||
# Create item with transaction if initial quantity > 0
|
||||
item = crud.item.create_with_transaction(db, obj_in=item_in)
|
||||
return item
|
||||
|
||||
|
||||
@router.get("/{item_id}", response_model=Item)
|
||||
def read_item(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_id: int,
|
||||
) -> Any:
|
||||
"""
|
||||
Get item by ID.
|
||||
"""
|
||||
item = crud.item.get(db, id=item_id)
|
||||
if not item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Item not found",
|
||||
)
|
||||
return item
|
||||
|
||||
|
||||
@router.put("/{item_id}", response_model=Item)
|
||||
def update_item(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_id: int,
|
||||
item_in: ItemUpdate,
|
||||
) -> Any:
|
||||
"""
|
||||
Update an item.
|
||||
"""
|
||||
item = crud.item.get(db, id=item_id)
|
||||
if not item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Item not found",
|
||||
)
|
||||
|
||||
# Check if SKU is being updated and if it already exists
|
||||
if item_in.sku and item_in.sku != item.sku:
|
||||
existing_item = crud.item.get_by_sku(db, sku=item_in.sku)
|
||||
if existing_item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Item with this SKU already exists",
|
||||
)
|
||||
|
||||
# Check if category exists if being updated
|
||||
if item_in.category_id and item_in.category_id != item.category_id:
|
||||
category = crud.category.get(db, id=item_in.category_id)
|
||||
if not category:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Category not found",
|
||||
)
|
||||
|
||||
# If quantity is being updated, create a transaction
|
||||
if item_in.quantity is not None and item_in.quantity != item.quantity:
|
||||
quantity_change = item_in.quantity - item.quantity
|
||||
if item.quantity + quantity_change < 0:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Cannot reduce quantity below zero",
|
||||
)
|
||||
|
||||
# Update quantity with transaction
|
||||
return crud.item.update_quantity(
|
||||
db,
|
||||
db_obj=item,
|
||||
quantity_change=quantity_change,
|
||||
transaction_type="adjustment",
|
||||
notes="Quantity updated via API"
|
||||
)
|
||||
else:
|
||||
# Regular update without quantity change
|
||||
return crud.item.update(db, db_obj=item, obj_in=item_in)
|
||||
|
||||
|
||||
@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
|
||||
def delete_item(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_id: int,
|
||||
) -> None:
|
||||
"""
|
||||
Delete an item.
|
||||
"""
|
||||
item = crud.item.get(db, id=item_id)
|
||||
if not item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Item not found",
|
||||
)
|
||||
|
||||
crud.item.remove(db, id=item_id)
|
||||
return None
|
||||
|
||||
|
||||
@router.post("/{item_id}/transactions", response_model=Item)
|
||||
def add_inventory_transaction(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_id: int,
|
||||
transaction_in: InventoryTransactionCreate,
|
||||
) -> Any:
|
||||
"""
|
||||
Add inventory transaction (stock in/out).
|
||||
"""
|
||||
item = crud.item.get(db, id=item_id)
|
||||
if not item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Item not found",
|
||||
)
|
||||
|
||||
# Update item quantity with transaction
|
||||
try:
|
||||
updated_item = crud.item.update_quantity(
|
||||
db,
|
||||
db_obj=item,
|
||||
quantity_change=transaction_in.quantity_change,
|
||||
transaction_type=transaction_in.transaction_type,
|
||||
notes=transaction_in.notes
|
||||
)
|
||||
return updated_item
|
||||
except ValueError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{item_id}/transactions", response_model=List[InventoryTransaction])
|
||||
def get_item_transactions(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
item_id: int,
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
) -> Any:
|
||||
"""
|
||||
Get transaction history for an item.
|
||||
"""
|
||||
# Check if item exists
|
||||
item = crud.item.get(db, id=item_id)
|
||||
if not item:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Item not found",
|
||||
)
|
||||
|
||||
transactions = crud.inventory_transaction.get_by_item(
|
||||
db, item_id=item_id, skip=skip, limit=limit
|
||||
)
|
||||
return transactions
|
0
app/core/__init__.py
Normal file
0
app/core/__init__.py
Normal file
25
app/core/config.py
Normal file
25
app/core/config.py
Normal file
@ -0,0 +1,25 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
API_V1_STR: str = "/api/v1"
|
||||
PROJECT_NAME: str = "Simple Inventory Management App"
|
||||
|
||||
# Database settings
|
||||
DB_DIR: Path = Path("/app") / "storage" / "db"
|
||||
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
|
||||
|
||||
# CORS Settings
|
||||
BACKEND_CORS_ORIGINS: List[str] = ["*"]
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
case_sensitive = True
|
||||
|
||||
|
||||
# Create DB directory if it doesn't exist
|
||||
Path("/app/storage/db").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
settings = Settings()
|
2
app/crud/__init__.py
Normal file
2
app/crud/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from app.crud.crud_category import category as category
|
||||
from app.crud.crud_item import item as item, inventory_transaction as inventory_transaction
|
34
app/crud/crud_category.py
Normal file
34
app/crud/crud_category.py
Normal file
@ -0,0 +1,34 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.db.crud_base import CRUDBase
|
||||
from app.models.item import Category
|
||||
from app.schemas.category import CategoryCreate, CategoryUpdate
|
||||
|
||||
|
||||
class CRUDCategory(CRUDBase[Category, CategoryCreate, CategoryUpdate]):
|
||||
"""
|
||||
CRUD operations for Category
|
||||
"""
|
||||
def get_by_name(self, db: Session, *, name: str) -> Optional[Category]:
|
||||
"""
|
||||
Get a category by name
|
||||
"""
|
||||
return db.query(self.model).filter(self.model.name == name).first()
|
||||
|
||||
def get_multi_with_items(
|
||||
self, db: Session, *, skip: int = 0, limit: int = 100
|
||||
) -> List[Category]:
|
||||
"""
|
||||
Get multiple categories with their associated items
|
||||
"""
|
||||
return (
|
||||
db.query(self.model)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
category = CRUDCategory(Category)
|
137
app/crud/crud_item.py
Normal file
137
app/crud/crud_item.py
Normal file
@ -0,0 +1,137 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.db.crud_base import CRUDBase
|
||||
from app.models.item import Item, InventoryTransaction
|
||||
from app.schemas.item import ItemCreate, ItemUpdate, InventoryTransactionCreate
|
||||
|
||||
|
||||
class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
|
||||
"""
|
||||
CRUD operations for Item
|
||||
"""
|
||||
def get_by_sku(self, db: Session, *, sku: str) -> Optional[Item]:
|
||||
"""
|
||||
Get an item by SKU
|
||||
"""
|
||||
return db.query(self.model).filter(self.model.sku == sku).first()
|
||||
|
||||
def get_by_category(
|
||||
self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100
|
||||
) -> List[Item]:
|
||||
"""
|
||||
Get items by category ID
|
||||
"""
|
||||
return (
|
||||
db.query(self.model)
|
||||
.filter(self.model.category_id == category_id)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
def search_items(
|
||||
self, db: Session, *, query: str, skip: int = 0, limit: int = 100
|
||||
) -> List[Item]:
|
||||
"""
|
||||
Search items by name or description
|
||||
"""
|
||||
search_pattern = f"%{query}%"
|
||||
return (
|
||||
db.query(self.model)
|
||||
.filter(
|
||||
(self.model.name.ilike(search_pattern)) |
|
||||
(self.model.description.ilike(search_pattern))
|
||||
)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
def create_with_transaction(
|
||||
self, db: Session, *, obj_in: ItemCreate
|
||||
) -> Item:
|
||||
"""
|
||||
Create an item and record it as an inventory transaction if quantity > 0
|
||||
"""
|
||||
# Create the item
|
||||
db_obj = self.create(db, obj_in=obj_in)
|
||||
|
||||
# If initial quantity > 0, create a transaction record
|
||||
if db_obj.quantity > 0:
|
||||
transaction = InventoryTransaction(
|
||||
item_id=db_obj.id,
|
||||
quantity_change=db_obj.quantity,
|
||||
transaction_type="initial",
|
||||
notes="Initial inventory"
|
||||
)
|
||||
db.add(transaction)
|
||||
db.commit()
|
||||
|
||||
return db_obj
|
||||
|
||||
def update_quantity(
|
||||
self, db: Session, *, db_obj: Item, quantity_change: int, transaction_type: str, notes: Optional[str] = None
|
||||
) -> Item:
|
||||
"""
|
||||
Update item quantity and record the transaction
|
||||
"""
|
||||
# Update item quantity
|
||||
new_quantity = db_obj.quantity + quantity_change
|
||||
if new_quantity < 0:
|
||||
raise ValueError("Cannot reduce quantity below zero")
|
||||
|
||||
db_obj.quantity = new_quantity
|
||||
db.add(db_obj)
|
||||
|
||||
# Create transaction record
|
||||
transaction = InventoryTransaction(
|
||||
item_id=db_obj.id,
|
||||
quantity_change=quantity_change,
|
||||
transaction_type=transaction_type,
|
||||
notes=notes
|
||||
)
|
||||
db.add(transaction)
|
||||
db.commit()
|
||||
db.refresh(db_obj)
|
||||
|
||||
return db_obj
|
||||
|
||||
def get_low_stock_items(
|
||||
self, db: Session, *, threshold: int = 10, skip: int = 0, limit: int = 100
|
||||
) -> List[Item]:
|
||||
"""
|
||||
Get items with quantity below the specified threshold
|
||||
"""
|
||||
return (
|
||||
db.query(self.model)
|
||||
.filter(self.model.quantity < threshold)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
class CRUDInventoryTransaction(CRUDBase[InventoryTransaction, InventoryTransactionCreate, InventoryTransactionCreate]):
|
||||
"""
|
||||
CRUD operations for InventoryTransaction
|
||||
"""
|
||||
def get_by_item(
|
||||
self, db: Session, *, item_id: int, skip: int = 0, limit: int = 100
|
||||
) -> List[InventoryTransaction]:
|
||||
"""
|
||||
Get transactions for a specific item
|
||||
"""
|
||||
return (
|
||||
db.query(self.model)
|
||||
.filter(self.model.item_id == item_id)
|
||||
.order_by(self.model.timestamp.desc())
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
item = CRUDItem(Item)
|
||||
inventory_transaction = CRUDInventoryTransaction(InventoryTransaction)
|
0
app/db/__init__.py
Normal file
0
app/db/__init__.py
Normal file
3
app/db/base.py
Normal file
3
app/db/base.py
Normal file
@ -0,0 +1,3 @@
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
Base = declarative_base()
|
0
app/db/base_model.py
Normal file
0
app/db/base_model.py
Normal file
79
app/db/crud_base.py
Normal file
79
app/db/crud_base.py
Normal file
@ -0,0 +1,79 @@
|
||||
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
|
||||
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.db.base import Base
|
||||
|
||||
ModelType = TypeVar("ModelType", bound=Base)
|
||||
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
|
||||
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
|
||||
|
||||
|
||||
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
||||
"""
|
||||
CRUD base class with default methods for Create, Read, Update, Delete operations
|
||||
"""
|
||||
def __init__(self, model: Type[ModelType]):
|
||||
"""
|
||||
Initialize CRUD object with SQLAlchemy model
|
||||
"""
|
||||
self.model = model
|
||||
|
||||
def get(self, db: Session, id: Any) -> Optional[ModelType]:
|
||||
"""
|
||||
Get a single record by ID
|
||||
"""
|
||||
return db.query(self.model).filter(self.model.id == id).first()
|
||||
|
||||
def get_multi(
|
||||
self, db: Session, *, skip: int = 0, limit: int = 100
|
||||
) -> List[ModelType]:
|
||||
"""
|
||||
Get multiple records with pagination
|
||||
"""
|
||||
return db.query(self.model).offset(skip).limit(limit).all()
|
||||
|
||||
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
|
||||
"""
|
||||
Create a new record
|
||||
"""
|
||||
obj_in_data = jsonable_encoder(obj_in)
|
||||
db_obj = self.model(**obj_in_data)
|
||||
db.add(db_obj)
|
||||
db.commit()
|
||||
db.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
def update(
|
||||
self,
|
||||
db: Session,
|
||||
*,
|
||||
db_obj: ModelType,
|
||||
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
|
||||
) -> ModelType:
|
||||
"""
|
||||
Update a record
|
||||
"""
|
||||
obj_data = jsonable_encoder(db_obj)
|
||||
if isinstance(obj_in, dict):
|
||||
update_data = obj_in
|
||||
else:
|
||||
update_data = obj_in.dict(exclude_unset=True)
|
||||
for field in obj_data:
|
||||
if field in update_data:
|
||||
setattr(db_obj, field, update_data[field])
|
||||
db.add(db_obj)
|
||||
db.commit()
|
||||
db.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
def remove(self, db: Session, *, id: int) -> ModelType:
|
||||
"""
|
||||
Delete a record
|
||||
"""
|
||||
obj = db.query(self.model).get(id)
|
||||
db.delete(obj)
|
||||
db.commit()
|
||||
return obj
|
27
app/db/session.py
Normal file
27
app/db/session.py
Normal file
@ -0,0 +1,27 @@
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
# Create DB directory if it doesn't exist
|
||||
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
engine = create_engine(
|
||||
settings.SQLALCHEMY_DATABASE_URL,
|
||||
connect_args={"check_same_thread": False} # Only needed for SQLite
|
||||
)
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
|
||||
def get_db():
|
||||
"""
|
||||
Dependency function to get a database session.
|
||||
|
||||
Yields:
|
||||
Session: SQLAlchemy session
|
||||
"""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
0
app/models/__init__.py
Normal file
0
app/models/__init__.py
Normal file
56
app/models/item.py
Normal file
56
app/models/item.py
Normal file
@ -0,0 +1,56 @@
|
||||
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, Text
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
from app.db.base import Base
|
||||
|
||||
|
||||
class Category(Base):
|
||||
"""
|
||||
Model for item categories in inventory
|
||||
"""
|
||||
__tablename__ = "categories"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
name = Column(String, unique=True, index=True, nullable=False)
|
||||
description = Column(Text, nullable=True)
|
||||
|
||||
# Relationships
|
||||
items = relationship("Item", back_populates="category")
|
||||
|
||||
|
||||
class Item(Base):
|
||||
"""
|
||||
Model for inventory items
|
||||
"""
|
||||
__tablename__ = "items"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
name = Column(String, index=True, nullable=False)
|
||||
description = Column(Text, nullable=True)
|
||||
quantity = Column(Integer, default=0, nullable=False)
|
||||
price = Column(Float, nullable=True)
|
||||
sku = Column(String, unique=True, index=True, nullable=True)
|
||||
category_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
|
||||
|
||||
# Relationships
|
||||
category = relationship("Category", back_populates="items")
|
||||
|
||||
|
||||
class InventoryTransaction(Base):
|
||||
"""
|
||||
Model for tracking inventory transactions (additions/removals)
|
||||
"""
|
||||
__tablename__ = "inventory_transactions"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
item_id = Column(Integer, ForeignKey("items.id"), nullable=False)
|
||||
quantity_change = Column(Integer, nullable=False) # Positive for additions, negative for removals
|
||||
transaction_type = Column(String, nullable=False) # "addition", "removal", "adjustment"
|
||||
notes = Column(Text, nullable=True)
|
||||
timestamp = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
|
||||
# Relationships
|
||||
item = relationship("Item")
|
0
app/schemas/__init__.py
Normal file
0
app/schemas/__init__.py
Normal file
47
app/schemas/category.py
Normal file
47
app/schemas/category.py
Normal file
@ -0,0 +1,47 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, List, ForwardRef
|
||||
|
||||
|
||||
# Shared properties
|
||||
class CategoryBase(BaseModel):
|
||||
name: str = Field(..., description="Category name", example="Electronics")
|
||||
description: Optional[str] = Field(None, description="Category description", example="Electronic devices and accessories")
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class CategoryCreate(CategoryBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update
|
||||
class CategoryUpdate(BaseModel):
|
||||
name: Optional[str] = Field(None, description="Category name", example="Electronics")
|
||||
description: Optional[str] = Field(None, description="Category description", example="Electronic devices and accessories")
|
||||
|
||||
|
||||
# Properties shared by models stored in DB
|
||||
class CategoryInDBBase(CategoryBase):
|
||||
id: int = Field(..., description="The unique identifier of the category")
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
# Properties to return to client
|
||||
class Category(CategoryInDBBase):
|
||||
pass
|
||||
|
||||
|
||||
# Forward reference for Item
|
||||
Item = ForwardRef("Item")
|
||||
|
||||
|
||||
# Properties to return to client with relationships
|
||||
class CategoryWithItems(CategoryInDBBase):
|
||||
items: List[Item] = Field([], description="Items in this category")
|
||||
|
||||
|
||||
# Import here to avoid circular import (moved down to fix linting)
|
||||
from app.schemas.item import Item # noqa: E402
|
||||
|
||||
CategoryWithItems.model_rebuild() # Update forward references
|
77
app/schemas/item.py
Normal file
77
app/schemas/item.py
Normal file
@ -0,0 +1,77 @@
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
# Shared properties
|
||||
class ItemBase(BaseModel):
|
||||
name: str = Field(..., description="Item name", example="Laptop")
|
||||
description: Optional[str] = Field(None, description="Item description", example="High-performance laptop")
|
||||
quantity: int = Field(0, description="Current quantity in stock", example=10, ge=0)
|
||||
price: Optional[float] = Field(None, description="Item price", example=999.99, ge=0)
|
||||
sku: Optional[str] = Field(None, description="Stock Keeping Unit", example="LAPTOP-001")
|
||||
category_id: Optional[int] = Field(None, description="Category ID", example=1)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class ItemCreate(ItemBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update
|
||||
class ItemUpdate(BaseModel):
|
||||
name: Optional[str] = Field(None, description="Item name", example="Laptop")
|
||||
description: Optional[str] = Field(None, description="Item description", example="High-performance laptop")
|
||||
quantity: Optional[int] = Field(None, description="Current quantity in stock", example=10)
|
||||
price: Optional[float] = Field(None, description="Item price", example=999.99)
|
||||
sku: Optional[str] = Field(None, description="Stock Keeping Unit", example="LAPTOP-001")
|
||||
category_id: Optional[int] = Field(None, description="Category ID", example=1)
|
||||
|
||||
@field_validator("quantity", "price")
|
||||
@classmethod
|
||||
def validate_non_negative(cls, v):
|
||||
if v is not None and v < 0:
|
||||
raise ValueError("Value cannot be negative")
|
||||
return v
|
||||
|
||||
|
||||
# Properties shared by models stored in DB
|
||||
class ItemInDBBase(ItemBase):
|
||||
id: int = Field(..., description="The unique identifier of the item")
|
||||
created_at: datetime = Field(..., description="Item creation timestamp")
|
||||
updated_at: datetime = Field(..., description="Item last update timestamp")
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
# Properties to return to client
|
||||
class Item(ItemInDBBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties for transaction
|
||||
class InventoryTransactionBase(BaseModel):
|
||||
item_id: int = Field(..., description="Item ID", example=1)
|
||||
quantity_change: int = Field(..., description="Quantity change (positive for additions, negative for removals)", example=5)
|
||||
transaction_type: str = Field(..., description="Transaction type", example="addition")
|
||||
notes: Optional[str] = Field(None, description="Transaction notes", example="Received new shipment")
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class InventoryTransactionCreate(InventoryTransactionBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties shared by models stored in DB
|
||||
class InventoryTransactionInDBBase(InventoryTransactionBase):
|
||||
id: int = Field(..., description="The unique identifier of the transaction")
|
||||
timestamp: datetime = Field(..., description="Transaction timestamp")
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
# Properties to return to client
|
||||
class InventoryTransaction(InventoryTransactionInDBBase):
|
||||
pass
|
46
main.py
Normal file
46
main.py
Normal file
@ -0,0 +1,46 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from app.api.v1.api import api_router
|
||||
from app.core.config import settings
|
||||
|
||||
app = FastAPI(
|
||||
title=settings.PROJECT_NAME,
|
||||
openapi_url="/openapi.json",
|
||||
docs_url="/docs",
|
||||
redoc_url="/redoc",
|
||||
)
|
||||
|
||||
# Set up CORS middleware
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Include the API router
|
||||
app.include_router(api_router, prefix=settings.API_V1_STR)
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""
|
||||
Root endpoint that returns basic information about the API.
|
||||
"""
|
||||
return {
|
||||
"title": settings.PROJECT_NAME,
|
||||
"docs": f"{settings.API_V1_STR}/docs",
|
||||
"health": f"{settings.API_V1_STR}/health"
|
||||
}
|
||||
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
"""
|
||||
Health check endpoint to verify the API is functioning.
|
||||
"""
|
||||
return {"status": "healthy"}
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
|
9
requirements.txt
Normal file
9
requirements.txt
Normal file
@ -0,0 +1,9 @@
|
||||
fastapi>=0.104.0
|
||||
uvicorn>=0.23.2
|
||||
sqlalchemy>=2.0.0
|
||||
alembic>=1.12.0
|
||||
pydantic>=2.4.2
|
||||
pydantic-settings>=2.0.3
|
||||
python-dotenv>=1.0.0
|
||||
ruff>=0.1.4
|
||||
pathlib>=1.0.1
|
Loading…
x
Reference in New Issue
Block a user