Implement Small Business Inventory Management System

This commit implements a comprehensive inventory management system for small businesses using FastAPI and SQLAlchemy. Features include:
- Product and category management
- Inventory tracking across multiple locations
- Supplier management
- Purchase management
- Transaction tracking for inventory movements
- Complete API documentation

generated with BackendIM... (backend.im)
This commit is contained in:
Automated Action 2025-05-12 12:55:31 +00:00
parent 611b9952d2
commit 54bf9880b9
28 changed files with 2403 additions and 2 deletions

132
README.md
View File

@ -1,3 +1,131 @@
# FastAPI Application
# Small Business Inventory Management System
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive inventory management API built with FastAPI and SQLAlchemy for small businesses. This API provides functionality to manage products, inventory, suppliers, and purchases.
## Features
- **Product Management**: CRUD operations for products and categories
- **Inventory Management**: Track inventory levels across multiple locations
- **Supplier Management**: Manage suppliers and their contact information
- **Purchase Management**: Create and track purchase orders
- **Transaction Tracking**: Record and track inventory movements (sales, purchases, transfers, adjustments)
- **Stock Alerts**: Monitor low-stock items
## Tech Stack
- **FastAPI**: High-performance web framework for building APIs
- **SQLAlchemy**: SQL toolkit and ORM
- **Pydantic**: Data validation and settings management
- **Alembic**: Database migration tool
- **SQLite**: Lightweight database
## Getting Started
### Prerequisites
- Python 3.8+
- pip
### Installation
1. Clone the repository
```bash
git clone <repository-url>
cd smallbusinessinventorymanagementsystem
```
2. Install dependencies
```bash
pip install -r requirements.txt
```
3. Run the application
```bash
uvicorn main:app --reload
```
4. Access the API documentation at http://localhost:8000/docs
## API Endpoints
The API is organized around the following main resources:
### Products & Categories
- `GET /api/v1/products/`: List all products
- `POST /api/v1/products/`: Create a new product
- `GET /api/v1/products/{product_id}`: Get product details
- `PUT /api/v1/products/{product_id}`: Update a product
- `DELETE /api/v1/products/{product_id}`: Delete a product
- `GET /api/v1/categories/`: List all categories
- `POST /api/v1/categories/`: Create a new category
- `GET /api/v1/categories/{category_id}`: Get category details
- `PUT /api/v1/categories/{category_id}`: Update a category
- `DELETE /api/v1/categories/{category_id}`: Delete a category
### Inventory Management
- `GET /api/v1/inventory/`: List inventory items
- `POST /api/v1/inventory/`: Create inventory item
- `GET /api/v1/inventory/{item_id}`: Get inventory item details
- `PUT /api/v1/inventory/{item_id}`: Update inventory item
- `DELETE /api/v1/inventory/{item_id}`: Delete inventory item
- `GET /api/v1/locations/`: List all locations
- `POST /api/v1/locations/`: Create a new location
- `GET /api/v1/locations/{location_id}`: Get location details
- `PUT /api/v1/locations/{location_id}`: Update a location
- `DELETE /api/v1/locations/{location_id}`: Delete a location
- `GET /api/v1/inventory-transactions/`: List inventory transactions
- `POST /api/v1/inventory-transactions/`: Create inventory transaction
- `GET /api/v1/inventory-transactions/{transaction_id}`: Get transaction details
### Suppliers & Purchases
- `GET /api/v1/suppliers/`: List all suppliers
- `POST /api/v1/suppliers/`: Create a new supplier
- `GET /api/v1/suppliers/{supplier_id}`: Get supplier details
- `PUT /api/v1/suppliers/{supplier_id}`: Update a supplier
- `DELETE /api/v1/suppliers/{supplier_id}`: Delete a supplier
- `GET /api/v1/purchases/`: List all purchases
- `POST /api/v1/purchases/`: Create a new purchase
- `GET /api/v1/purchases/{purchase_id}`: Get purchase details
- `PUT /api/v1/purchases/{purchase_id}`: Update a purchase
- `DELETE /api/v1/purchases/{purchase_id}`: Delete a purchase
- `POST /api/v1/purchases/{purchase_id}/receive`: Receive purchase into inventory
### System
- `GET /health`: Check API health status
## Database Schema
The system uses the following main data models:
- **Product**: Base product information (name, SKU, price, etc.)
- **Category**: Product categories
- **Location**: Inventory storage locations (warehouses, stores)
- **InventoryItem**: Product stock at a specific location
- **InventoryTransaction**: Record of inventory movements
- **Supplier**: Supplier information
- **Purchase**: Purchase orders from suppliers
## Development
### Database Migrations
The project uses Alembic for database migrations:
```bash
# Generate a new migration
alembic revision -m "description"
# Run migrations
alembic upgrade head
# Rollback migration
alembic downgrade -1
```

84
alembic.ini Normal file
View File

@ -0,0 +1,84 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to ${script_location}/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat ${script_location}/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:///app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

53
app/api/deps.py Normal file
View File

@ -0,0 +1,53 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.product import Product, Category
from app.models.inventory import InventoryItem, Location
from app.models.supplier import Supplier, Purchase
# Common dependencies
def get_product(db: Session = Depends(get_db), product_id: int = None):
if product_id:
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {product_id} not found"
)
return product
return None
def get_category(db: Session = Depends(get_db), category_id: int = None):
if category_id:
category = db.query(Category).filter(Category.id == category_id).first()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Category with ID {category_id} not found"
)
return category
return None
def get_location(db: Session = Depends(get_db), location_id: int = None):
if location_id:
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {location_id} not found"
)
return location
return None
def get_supplier(db: Session = Depends(get_db), supplier_id: int = None):
if supplier_id:
supplier = db.query(Supplier).filter(Supplier.id == supplier_id).first()
if not supplier:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Supplier with ID {supplier_id} not found"
)
return supplier
return None

View File

@ -0,0 +1,3 @@
from app.api.routes.product import router as product_router
from app.api.routes.inventory import router as inventory_router
from app.api.routes.supplier import router as supplier_router

583
app/api/routes/inventory.py Normal file
View File

@ -0,0 +1,583 @@
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from app.db.session import get_db
from app.models.inventory import (
Location,
LocationType,
InventoryItem,
InventoryTransaction,
InventoryTransactionItem,
TransactionType
)
from app.models.product import Product
from app.schemas.inventory import (
LocationCreate,
LocationUpdate,
LocationInDB,
InventoryItemCreate,
InventoryItemUpdate,
InventoryItemInDB,
InventoryItemWithDetails,
InventoryTransactionCreate,
InventoryTransactionWithItems,
InventoryTransactionInDB,
TransactionTypeEnum
)
from app.api.deps import get_product, get_location
router = APIRouter(prefix="/api/v1")
# Location endpoints
@router.post("/locations/", response_model=LocationInDB, status_code=status.HTTP_201_CREATED)
def create_location(
*,
db: Session = Depends(get_db),
location_in: LocationCreate
):
"""Create a new location"""
try:
# Convert from schema enum to model enum
type_value = LocationType(location_in.type.value)
db_location = Location(
name=location_in.name,
type=type_value,
address=location_in.address,
description=location_in.description
)
db.add(db_location)
db.commit()
db.refresh(db_location)
return db_location
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create location: {str(e)}"
)
@router.get("/locations/", response_model=List[LocationInDB])
def read_locations(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = None,
location_type: Optional[str] = None
):
"""Get all locations with optional filters"""
query = db.query(Location)
if search:
query = query.filter(Location.name.ilike(f"%{search}%"))
if location_type:
try:
type_value = LocationType(location_type)
query = query.filter(Location.type == type_value)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid location type: {location_type}"
)
return query.offset(skip).limit(limit).all()
@router.get("/locations/{location_id}", response_model=LocationInDB)
def read_location(
*,
db: Session = Depends(get_db),
location: Location = Depends(get_location)
):
"""Get a specific location by ID"""
return location
@router.put("/locations/{location_id}", response_model=LocationInDB)
def update_location(
*,
db: Session = Depends(get_db),
location: Location = Depends(get_location),
location_in: LocationUpdate
):
"""Update a location"""
update_data = location_in.model_dump(exclude_unset=True)
# Convert type enum if present
if "type" in update_data and update_data["type"]:
update_data["type"] = LocationType(update_data["type"].value)
for field, value in update_data.items():
setattr(location, field, value)
try:
db.add(location)
db.commit()
db.refresh(location)
return location
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update location: {str(e)}"
)
@router.delete("/locations/{location_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_location(
*,
db: Session = Depends(get_db),
location: Location = Depends(get_location)
):
"""Delete a location"""
# Check if location has inventory items
if location.inventory_items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete location with existing inventory"
)
db.delete(location)
db.commit()
return None
# Inventory Item endpoints
@router.post("/inventory/", response_model=InventoryItemInDB, status_code=status.HTTP_201_CREATED)
def create_inventory_item(
*,
db: Session = Depends(get_db),
item_in: InventoryItemCreate
):
"""Create a new inventory item"""
# Check if product exists
product = db.query(Product).filter(Product.id == item_in.product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {item_in.product_id} not found"
)
# Check if location exists
location = db.query(Location).filter(Location.id == item_in.location_id).first()
if not location:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {item_in.location_id} not found"
)
# Check if inventory item already exists for this product and location
existing_item = db.query(InventoryItem).filter(
InventoryItem.product_id == item_in.product_id,
InventoryItem.location_id == item_in.location_id
).first()
if existing_item:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Inventory item already exists for this product and location"
)
try:
db_item = InventoryItem(**item_in.model_dump())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create inventory item: {str(e)}"
)
@router.get("/inventory/", response_model=List[InventoryItemWithDetails])
def read_inventory_items(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
product_id: Optional[int] = None,
location_id: Optional[int] = None,
low_stock: bool = False
):
"""Get inventory items with optional filters"""
query = db.query(InventoryItem)
if product_id:
query = query.filter(InventoryItem.product_id == product_id)
if location_id:
query = query.filter(InventoryItem.location_id == location_id)
if low_stock:
query = query.filter(InventoryItem.quantity <= InventoryItem.minimum_stock)
items = query.offset(skip).limit(limit).all()
# Enhance with product and location details
result = []
for item in items:
item_dict = {
"id": item.id,
"product_id": item.product_id,
"location_id": item.location_id,
"quantity": item.quantity,
"minimum_stock": item.minimum_stock,
"maximum_stock": item.maximum_stock,
"created_at": item.created_at,
"updated_at": item.updated_at,
"product": {
"id": item.product.id,
"name": item.product.name,
"sku": item.product.sku,
},
"location": {
"id": item.location.id,
"name": item.location.name,
"type": item.location.type.value,
}
}
result.append(item_dict)
return result
@router.get("/inventory/{item_id}", response_model=InventoryItemWithDetails)
def read_inventory_item(
*,
db: Session = Depends(get_db),
item_id: int
):
"""Get a specific inventory item by ID"""
item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Inventory item with ID {item_id} not found"
)
# Enhance with product and location details
item_dict = {
"id": item.id,
"product_id": item.product_id,
"location_id": item.location_id,
"quantity": item.quantity,
"minimum_stock": item.minimum_stock,
"maximum_stock": item.maximum_stock,
"created_at": item.created_at,
"updated_at": item.updated_at,
"product": {
"id": item.product.id,
"name": item.product.name,
"sku": item.product.sku,
},
"location": {
"id": item.location.id,
"name": item.location.name,
"type": item.location.type.value,
}
}
return item_dict
@router.put("/inventory/{item_id}", response_model=InventoryItemInDB)
def update_inventory_item(
*,
db: Session = Depends(get_db),
item_id: int,
item_in: InventoryItemUpdate
):
"""Update an inventory item"""
item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Inventory item with ID {item_id} not found"
)
update_data = item_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(item, field, value)
try:
db.add(item)
db.commit()
db.refresh(item)
return item
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update inventory item: {str(e)}"
)
@router.delete("/inventory/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_inventory_item(
*,
db: Session = Depends(get_db),
item_id: int
):
"""Delete an inventory item"""
item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Inventory item with ID {item_id} not found"
)
db.delete(item)
db.commit()
return None
# Inventory Transaction endpoints
@router.post("/inventory-transactions/", response_model=InventoryTransactionInDB, status_code=status.HTTP_201_CREATED)
def create_inventory_transaction(
*,
db: Session = Depends(get_db),
transaction_in: InventoryTransactionWithItems
):
"""Create a new inventory transaction with items"""
# Convert from schema enum to model enum
try:
transaction_type = TransactionType(transaction_in.type.value)
# Create transaction
db_transaction = InventoryTransaction(
type=transaction_type,
reference_id=transaction_in.reference_id,
notes=transaction_in.notes
)
db.add(db_transaction)
db.flush() # Get ID without committing
# Process transaction items
transaction_items = []
for item in transaction_in.items:
# Validate product exists
product = db.query(Product).filter(Product.id == item.product_id).first()
if not product:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {item.product_id} not found"
)
# Process based on transaction type
if transaction_type == TransactionType.TRANSFER:
# For transfers, need both locations
if not item.from_location_id or not item.to_location_id:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Transfer transactions require both from_location_id and to_location_id"
)
# Validate from_location exists
from_location = db.query(Location).filter(Location.id == item.from_location_id).first()
if not from_location:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"From location with ID {item.from_location_id} not found"
)
# Validate to_location exists
to_location = db.query(Location).filter(Location.id == item.to_location_id).first()
if not to_location:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"To location with ID {item.to_location_id} not found"
)
# Update inventory at from_location
from_inventory = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == item.from_location_id
).first()
if not from_inventory:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product {item.product_id} not found at location {item.from_location_id}"
)
if from_inventory.quantity < item.quantity:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Insufficient stock for product {item.product_id} at location {item.from_location_id}"
)
from_inventory.quantity -= item.quantity
# Update inventory at to_location
to_inventory = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == item.to_location_id
).first()
if to_inventory:
to_inventory.quantity += item.quantity
else:
# Create new inventory item if it doesn't exist
to_inventory = InventoryItem(
product_id=item.product_id,
location_id=item.to_location_id,
quantity=item.quantity,
minimum_stock=0
)
db.add(to_inventory)
elif transaction_type in [TransactionType.PURCHASE, TransactionType.RETURN]:
# For purchases, need to_location (where items are added)
if not item.to_location_id:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"{transaction_type.value} transactions require to_location_id"
)
# Validate to_location exists
to_location = db.query(Location).filter(Location.id == item.to_location_id).first()
if not to_location:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {item.to_location_id} not found"
)
# Update inventory at to_location
inventory = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == item.to_location_id
).first()
if inventory:
inventory.quantity += item.quantity
else:
# Create new inventory item if it doesn't exist
inventory = InventoryItem(
product_id=item.product_id,
location_id=item.to_location_id,
quantity=item.quantity,
minimum_stock=0
)
db.add(inventory)
elif transaction_type in [TransactionType.SALE, TransactionType.ADJUSTMENT]:
# For sales, need from_location (where items are removed)
if not item.from_location_id:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"{transaction_type.value} transactions require from_location_id"
)
# Validate from_location exists
from_location = db.query(Location).filter(Location.id == item.from_location_id).first()
if not from_location:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {item.from_location_id} not found"
)
# Update inventory at from_location
inventory = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == item.from_location_id
).first()
if not inventory:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product {item.product_id} not found at location {item.from_location_id}"
)
if inventory.quantity < item.quantity and transaction_type == TransactionType.SALE:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Insufficient stock for product {item.product_id} at location {item.from_location_id}"
)
inventory.quantity -= item.quantity
# Create transaction item
transaction_item = InventoryTransactionItem(
transaction_id=db_transaction.id,
product_id=item.product_id,
quantity=item.quantity,
from_location_id=item.from_location_id,
to_location_id=item.to_location_id
)
db.add(transaction_item)
transaction_items.append(transaction_item)
db.commit()
db.refresh(db_transaction)
# Refresh transaction items
for item in transaction_items:
db.refresh(item)
return db_transaction
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create transaction: {str(e)}"
)
@router.get("/inventory-transactions/", response_model=List[InventoryTransactionInDB])
def read_inventory_transactions(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
transaction_type: Optional[str] = None,
reference_id: Optional[str] = None
):
"""Get inventory transactions with optional filters"""
query = db.query(InventoryTransaction)
if transaction_type:
try:
type_value = TransactionType(transaction_type)
query = query.filter(InventoryTransaction.type == type_value)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid transaction type: {transaction_type}"
)
if reference_id:
query = query.filter(InventoryTransaction.reference_id == reference_id)
return query.order_by(InventoryTransaction.created_at.desc()).offset(skip).limit(limit).all()
@router.get("/inventory-transactions/{transaction_id}", response_model=InventoryTransactionInDB)
def read_inventory_transaction(
*,
db: Session = Depends(get_db),
transaction_id: int
):
"""Get a specific inventory transaction by ID"""
transaction = db.query(InventoryTransaction).filter(InventoryTransaction.id == transaction_id).first()
if not transaction:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Inventory transaction with ID {transaction_id} not found"
)
return transaction

225
app/api/routes/product.py Normal file
View File

@ -0,0 +1,225 @@
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from app.db.session import get_db
from app.models.product import Product, Category
from app.schemas.product import (
ProductCreate,
ProductUpdate,
ProductInDB,
CategoryCreate,
CategoryUpdate,
CategoryInDB
)
from app.api.deps import get_product, get_category
router = APIRouter(prefix="/api/v1")
# Category endpoints
@router.post("/categories/", response_model=CategoryInDB, status_code=status.HTTP_201_CREATED)
def create_category(
*,
db: Session = Depends(get_db),
category_in: CategoryCreate
):
"""Create a new category"""
try:
db_category = Category(**category_in.model_dump())
db.add(db_category)
db.commit()
db.refresh(db_category)
return db_category
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Category with this name already exists"
)
@router.get("/categories/", response_model=List[CategoryInDB])
def read_categories(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = None
):
"""Get all categories with optional search filter"""
query = db.query(Category)
if search:
query = query.filter(Category.name.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
@router.get("/categories/{category_id}", response_model=CategoryInDB)
def read_category(
*,
db: Session = Depends(get_db),
category: Category = Depends(get_category)
):
"""Get a specific category by ID"""
return category
@router.put("/categories/{category_id}", response_model=CategoryInDB)
def update_category(
*,
db: Session = Depends(get_db),
category: Category = Depends(get_category),
category_in: CategoryUpdate
):
"""Update a category"""
update_data = category_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(category, field, value)
try:
db.add(category)
db.commit()
db.refresh(category)
return category
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Category with this name already exists"
)
@router.delete("/categories/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_category(
*,
db: Session = Depends(get_db),
category: Category = Depends(get_category)
):
"""Delete a category"""
# Check if category has products
if category.products:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete category with associated products"
)
db.delete(category)
db.commit()
return None
# Product endpoints
@router.post("/products/", response_model=ProductInDB, status_code=status.HTTP_201_CREATED)
def create_product(
*,
db: Session = Depends(get_db),
product_in: ProductCreate
):
"""Create a new product"""
# Check if category exists
if product_in.category_id:
category = db.query(Category).filter(Category.id == product_in.category_id).first()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Category with ID {product_in.category_id} not found"
)
try:
db_product = Product(**product_in.model_dump())
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Product with this SKU or barcode already exists"
)
@router.get("/products/", response_model=List[ProductInDB])
def read_products(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = None,
category_id: Optional[int] = None
):
"""Get all products with optional filters"""
query = db.query(Product)
if search:
query = query.filter(
(Product.name.ilike(f"%{search}%")) |
(Product.sku.ilike(f"%{search}%")) |
(Product.description.ilike(f"%{search}%"))
)
if category_id:
query = query.filter(Product.category_id == category_id)
return query.offset(skip).limit(limit).all()
@router.get("/products/{product_id}", response_model=ProductInDB)
def read_product(
*,
db: Session = Depends(get_db),
product: Product = Depends(get_product)
):
"""Get a specific product by ID"""
return product
@router.put("/products/{product_id}", response_model=ProductInDB)
def update_product(
*,
db: Session = Depends(get_db),
product: Product = Depends(get_product),
product_in: ProductUpdate
):
"""Update a product"""
# Check if category exists
if product_in.category_id is not None:
category = db.query(Category).filter(Category.id == product_in.category_id).first()
if not category and product_in.category_id is not None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Category with ID {product_in.category_id} not found"
)
update_data = product_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(product, field, value)
try:
db.add(product)
db.commit()
db.refresh(product)
return product
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Product with this SKU or barcode already exists"
)
@router.delete("/products/{product_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_product(
*,
db: Session = Depends(get_db),
product: Product = Depends(get_product)
):
"""Delete a product"""
# Check if product has inventory items
if product.inventory_items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete product with existing inventory"
)
# Check if product is used in purchase items
if product.purchase_items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete product with purchase history"
)
db.delete(product)
db.commit()
return None

450
app/api/routes/supplier.py Normal file
View File

@ -0,0 +1,450 @@
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from datetime import datetime
from app.db.session import get_db
from app.models.supplier import Supplier, Purchase, PurchaseItem
from app.models.product import Product
from app.models.inventory import (
InventoryTransaction,
InventoryTransactionItem,
Location,
TransactionType
)
from app.schemas.supplier import (
SupplierCreate,
SupplierUpdate,
SupplierInDB,
PurchaseCreate,
PurchaseUpdate,
PurchaseInDB,
PurchaseWithItems
)
from app.api.deps import get_supplier, get_product
router = APIRouter(prefix="/api/v1")
# Supplier endpoints
@router.post("/suppliers/", response_model=SupplierInDB, status_code=status.HTTP_201_CREATED)
def create_supplier(
*,
db: Session = Depends(get_db),
supplier_in: SupplierCreate
):
"""Create a new supplier"""
try:
db_supplier = Supplier(**supplier_in.model_dump())
db.add(db_supplier)
db.commit()
db.refresh(db_supplier)
return db_supplier
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create supplier: {str(e)}"
)
@router.get("/suppliers/", response_model=List[SupplierInDB])
def read_suppliers(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = None,
is_active: Optional[bool] = None
):
"""Get all suppliers with optional filters"""
query = db.query(Supplier)
if search:
query = query.filter(
(Supplier.name.ilike(f"%{search}%")) |
(Supplier.contact_name.ilike(f"%{search}%")) |
(Supplier.email.ilike(f"%{search}%"))
)
if is_active is not None:
query = query.filter(Supplier.is_active == is_active)
return query.offset(skip).limit(limit).all()
@router.get("/suppliers/{supplier_id}", response_model=SupplierInDB)
def read_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier)
):
"""Get a specific supplier by ID"""
return supplier
@router.put("/suppliers/{supplier_id}", response_model=SupplierInDB)
def update_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier),
supplier_in: SupplierUpdate
):
"""Update a supplier"""
update_data = supplier_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(supplier, field, value)
try:
db.add(supplier)
db.commit()
db.refresh(supplier)
return supplier
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update supplier: {str(e)}"
)
@router.delete("/suppliers/{supplier_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier)
):
"""Delete a supplier"""
# Check if supplier has purchases
if supplier.purchases:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete supplier with purchase history"
)
db.delete(supplier)
db.commit()
return None
# Purchase endpoints
@router.post("/purchases/", response_model=PurchaseInDB, status_code=status.HTTP_201_CREATED)
def create_purchase(
*,
db: Session = Depends(get_db),
purchase_in: PurchaseCreate
):
"""Create a new purchase with items"""
# Check if supplier exists
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
if not supplier:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
)
# Set order date if not provided
if not purchase_in.order_date:
purchase_in.order_date = datetime.now()
try:
# Create purchase record
total_amount = 0
purchase_data = purchase_in.model_dump(exclude={"items"})
db_purchase = Purchase(**purchase_data)
db.add(db_purchase)
db.flush() # Get ID without committing
# Process purchase items
for item_data in purchase_in.items:
# Validate product exists
product = db.query(Product).filter(Product.id == item_data.product_id).first()
if not product:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {item_data.product_id} not found"
)
# Calculate total price if not provided
if not item_data.total_price:
item_data.total_price = item_data.quantity * item_data.unit_price
# Create purchase item
db_item = PurchaseItem(
purchase_id=db_purchase.id,
product_id=item_data.product_id,
quantity=item_data.quantity,
unit_price=item_data.unit_price,
total_price=item_data.total_price
)
db.add(db_item)
# Update total amount
total_amount += item_data.total_price
# Update purchase with total amount
db_purchase.total_amount = total_amount
db.commit()
db.refresh(db_purchase)
return db_purchase
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create purchase: {str(e)}"
)
@router.post("/purchases/{purchase_id}/receive", response_model=PurchaseInDB)
def receive_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int,
location_id: int
):
"""Receive a purchase into inventory at specified location"""
# Check if purchase exists
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if location exists
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {location_id} not found"
)
try:
# Create inventory transaction
transaction = InventoryTransaction(
type=TransactionType.PURCHASE,
reference_id=f"PO-{purchase.id}",
notes=f"Receiving purchase order {purchase.reference_number}"
)
db.add(transaction)
db.flush()
# Get purchase items
purchase_items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase.id).all()
for item in purchase_items:
# Create inventory transaction item
transaction_item = InventoryTransactionItem(
transaction_id=transaction.id,
product_id=item.product_id,
quantity=item.quantity,
to_location_id=location_id
)
db.add(transaction_item)
# Update or create inventory item
inventory_item = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == location_id
).first()
if inventory_item:
inventory_item.quantity += item.quantity
else:
from app.models.inventory import InventoryItem
new_item = InventoryItem(
product_id=item.product_id,
location_id=location_id,
quantity=item.quantity,
minimum_stock=0
)
db.add(new_item)
# Update purchase delivery date if not set
if not purchase.delivery_date:
purchase.delivery_date = datetime.now()
db.commit()
db.refresh(purchase)
return purchase
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not receive purchase: {str(e)}"
)
@router.get("/purchases/", response_model=List[PurchaseInDB])
def read_purchases(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
supplier_id: Optional[int] = None,
reference_number: Optional[str] = None
):
"""Get all purchases with optional filters"""
query = db.query(Purchase)
if supplier_id:
query = query.filter(Purchase.supplier_id == supplier_id)
if reference_number:
query = query.filter(Purchase.reference_number.ilike(f"%{reference_number}%"))
return query.order_by(Purchase.order_date.desc()).offset(skip).limit(limit).all()
@router.get("/purchases/{purchase_id}", response_model=PurchaseWithItems)
def read_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int
):
"""Get a specific purchase by ID with all items"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Get purchase items with product details
items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).all()
result_items = []
for item in items:
product = db.query(Product).filter(Product.id == item.product_id).first()
item_dict = {
"id": item.id,
"purchase_id": item.purchase_id,
"product_id": item.product_id,
"quantity": item.quantity,
"unit_price": item.unit_price,
"total_price": item.total_price,
"created_at": item.created_at,
"updated_at": item.updated_at,
"product": {
"id": product.id,
"name": product.name,
"sku": product.sku
}
}
result_items.append(item_dict)
# Get supplier details
supplier = db.query(Supplier).filter(Supplier.id == purchase.supplier_id).first()
# Create final result
result = {
"id": purchase.id,
"supplier_id": purchase.supplier_id,
"reference_number": purchase.reference_number,
"order_date": purchase.order_date,
"delivery_date": purchase.delivery_date,
"total_amount": purchase.total_amount,
"notes": purchase.notes,
"created_at": purchase.created_at,
"updated_at": purchase.updated_at,
"items": result_items,
"supplier": {
"id": supplier.id,
"name": supplier.name,
"contact_name": supplier.contact_name,
"email": supplier.email,
"phone": supplier.phone
}
}
return result
@router.put("/purchases/{purchase_id}", response_model=PurchaseInDB)
def update_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int,
purchase_in: PurchaseUpdate
):
"""Update a purchase (header only, not items)"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if supplier exists if supplier_id is being updated
if purchase_in.supplier_id:
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
if not supplier:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
)
update_data = purchase_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(purchase, field, value)
try:
db.add(purchase)
db.commit()
db.refresh(purchase)
return purchase
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update purchase: {str(e)}"
)
@router.delete("/purchases/{purchase_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int
):
"""Delete a purchase and its items"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if this purchase has been received (has inventory transactions)
transaction = db.query(InventoryTransaction).filter(
InventoryTransaction.type == TransactionType.PURCHASE,
InventoryTransaction.reference_id == f"PO-{purchase.id}"
).first()
if transaction:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete purchase that has been received into inventory"
)
try:
# Delete purchase items first
db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).delete()
# Delete purchase
db.delete(purchase)
db.commit()
return None
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not delete purchase: {str(e)}"
)

0
app/core/__init__.py Normal file
View File

0
app/db/__init__.py Normal file
View File

13
app/db/base.py Normal file
View File

@ -0,0 +1,13 @@
# Import all models here so Alembic can discover them
from app.db.session import Base
from app.models.product import Product, Category
from app.models.inventory import (
Location,
InventoryItem,
InventoryTransaction,
InventoryTransactionItem
)
from app.models.supplier import Supplier, Purchase, PurchaseItem
# Create a base that includes all models
# This allows Alembic's autogenerate to pick up your models

32
app/db/init_db.py Normal file
View File

@ -0,0 +1,32 @@
from sqlalchemy.orm import Session
from app.models.product import Category
from app.models.inventory import Location, LocationType
def init_db(db: Session) -> None:
"""Initialize database with default data"""
# Check if we already have data
if db.query(Category).first():
return # Database already initialized
# Create default categories
default_categories = [
Category(name="General", description="General products"),
Category(name="Electronics", description="Electronic products"),
Category(name="Office Supplies", description="Office supplies and stationery"),
Category(name="Furniture", description="Furniture items"),
]
db.add_all(default_categories)
# Create default locations
default_locations = [
Location(name="Main Warehouse", type=LocationType.WAREHOUSE,
description="Main storage warehouse"),
Location(name="Store Front", type=LocationType.STORE,
description="Main store front"),
]
db.add_all(default_locations)
db.commit()

31
app/db/session.py Normal file
View File

@ -0,0 +1,31 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
# Create database directory
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
# Database URL
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
# Create engine
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()
# Dependency to get db session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

11
app/models/__init__.py Normal file
View File

@ -0,0 +1,11 @@
from app.models.base import BaseModel
from app.models.product import Product, Category
from app.models.inventory import (
Location,
LocationType,
InventoryItem,
InventoryTransaction,
InventoryTransactionItem,
TransactionType
)
from app.models.supplier import Supplier, Purchase, PurchaseItem

10
app/models/base.py Normal file
View File

@ -0,0 +1,10 @@
from sqlalchemy import Column, Integer, DateTime, func
from app.db.session import Base
class BaseModel(Base):
"""Base model for all database models"""
__abstract__ = True
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

69
app/models/inventory.py Normal file
View File

@ -0,0 +1,69 @@
from sqlalchemy import Column, String, Float, Integer, ForeignKey, Enum
from sqlalchemy.orm import relationship
import enum
from app.models.base import BaseModel
class LocationType(str, enum.Enum):
WAREHOUSE = "warehouse"
STORE = "store"
SUPPLIER = "supplier"
class Location(BaseModel):
"""Location model for inventory storage"""
__tablename__ = "locations"
name = Column(String(100), nullable=False)
type = Column(Enum(LocationType), nullable=False, default=LocationType.WAREHOUSE)
address = Column(String(255), nullable=True)
description = Column(String(255), nullable=True)
# Relationships
inventory_items = relationship("InventoryItem", back_populates="location")
class TransactionType(str, enum.Enum):
PURCHASE = "purchase"
SALE = "sale"
TRANSFER = "transfer"
ADJUSTMENT = "adjustment"
RETURN = "return"
class InventoryTransaction(BaseModel):
"""Inventory transaction model to track changes"""
__tablename__ = "inventory_transactions"
type = Column(Enum(TransactionType), nullable=False)
reference_id = Column(String(100), nullable=True)
notes = Column(String(255), nullable=True)
# Relationships
transaction_items = relationship("InventoryTransactionItem", back_populates="transaction")
class InventoryTransactionItem(BaseModel):
"""Inventory transaction item model"""
__tablename__ = "inventory_transaction_items"
transaction_id = Column(Integer, ForeignKey("inventory_transactions.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity = Column(Float, nullable=False)
from_location_id = Column(Integer, ForeignKey("locations.id"), nullable=True)
to_location_id = Column(Integer, ForeignKey("locations.id"), nullable=True)
# Relationships
transaction = relationship("InventoryTransaction", back_populates="transaction_items")
product = relationship("Product")
from_location = relationship("Location", foreign_keys=[from_location_id])
to_location = relationship("Location", foreign_keys=[to_location_id])
class InventoryItem(BaseModel):
"""Inventory item model representing product stock at a location"""
__tablename__ = "inventory_items"
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
location_id = Column(Integer, ForeignKey("locations.id"), nullable=False)
quantity = Column(Float, nullable=False, default=0)
minimum_stock = Column(Float, nullable=False, default=0)
maximum_stock = Column(Float, nullable=True)
# Relationships
product = relationship("Product", back_populates="inventory_items")
location = relationship("Location", back_populates="inventory_items")

30
app/models/product.py Normal file
View File

@ -0,0 +1,30 @@
from sqlalchemy import Column, String, Float, Integer, Text, ForeignKey
from sqlalchemy.orm import relationship
from app.models.base import BaseModel
class Category(BaseModel):
"""Product category model"""
__tablename__ = "categories"
name = Column(String(100), unique=True, index=True, nullable=False)
description = Column(Text, nullable=True)
# Relationships
products = relationship("Product", back_populates="category")
class Product(BaseModel):
"""Product model for inventory items"""
__tablename__ = "products"
name = Column(String(255), nullable=False, index=True)
description = Column(Text, nullable=True)
sku = Column(String(50), unique=True, index=True, nullable=False)
category_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
price = Column(Float, nullable=False)
cost_price = Column(Float, nullable=False)
barcode = Column(String(100), unique=True, nullable=True)
# Relationships
category = relationship("Category", back_populates="products")
inventory_items = relationship("InventoryItem", back_populates="product")
purchase_items = relationship("PurchaseItem", back_populates="product")

47
app/models/supplier.py Normal file
View File

@ -0,0 +1,47 @@
from sqlalchemy import Column, String, Float, Integer, Text, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from app.models.base import BaseModel
class Supplier(BaseModel):
"""Supplier model"""
__tablename__ = "suppliers"
name = Column(String(255), nullable=False, index=True)
contact_name = Column(String(100), nullable=True)
email = Column(String(255), nullable=True)
phone = Column(String(20), nullable=True)
address = Column(Text, nullable=True)
notes = Column(Text, nullable=True)
is_active = Column(Boolean, default=True)
# Relationships
purchases = relationship("Purchase", back_populates="supplier")
class Purchase(BaseModel):
"""Purchase model for orders from suppliers"""
__tablename__ = "purchases"
supplier_id = Column(Integer, ForeignKey("suppliers.id"), nullable=False)
reference_number = Column(String(100), nullable=True, unique=True)
order_date = Column(DateTime, nullable=True)
delivery_date = Column(DateTime, nullable=True)
total_amount = Column(Float, nullable=False, default=0)
notes = Column(Text, nullable=True)
# Relationships
supplier = relationship("Supplier", back_populates="purchases")
purchase_items = relationship("PurchaseItem", back_populates="purchase")
class PurchaseItem(BaseModel):
"""Purchase item model for items in a purchase"""
__tablename__ = "purchase_items"
purchase_id = Column(Integer, ForeignKey("purchases.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity = Column(Float, nullable=False)
unit_price = Column(Float, nullable=False)
total_price = Column(Float, nullable=False)
# Relationships
purchase = relationship("Purchase", back_populates="purchase_items")
product = relationship("Product", back_populates="purchase_items")

0
app/schemas/__init__.py Normal file
View File

109
app/schemas/inventory.py Normal file
View File

@ -0,0 +1,109 @@
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
from app.models.inventory import LocationType, TransactionType
# Location schemas
class LocationTypeEnum(str, Enum):
WAREHOUSE = "warehouse"
STORE = "store"
SUPPLIER = "supplier"
class LocationBase(BaseModel):
name: str
type: LocationTypeEnum = LocationTypeEnum.WAREHOUSE
address: Optional[str] = None
description: Optional[str] = None
class LocationCreate(LocationBase):
pass
class LocationUpdate(BaseModel):
name: Optional[str] = None
type: Optional[LocationTypeEnum] = None
address: Optional[str] = None
description: Optional[str] = None
class LocationInDB(LocationBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Inventory item schemas
class InventoryItemBase(BaseModel):
product_id: int
location_id: int
quantity: float
minimum_stock: float = 0
maximum_stock: Optional[float] = None
class InventoryItemCreate(InventoryItemBase):
pass
class InventoryItemUpdate(BaseModel):
quantity: Optional[float] = None
minimum_stock: Optional[float] = None
maximum_stock: Optional[float] = None
class InventoryItemInDB(InventoryItemBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class InventoryItemWithDetails(InventoryItemInDB):
product: Dict[str, Any]
location: Dict[str, Any]
# Transaction schemas
class TransactionTypeEnum(str, Enum):
PURCHASE = "purchase"
SALE = "sale"
TRANSFER = "transfer"
ADJUSTMENT = "adjustment"
RETURN = "return"
class InventoryTransactionBase(BaseModel):
type: TransactionTypeEnum
reference_id: Optional[str] = None
notes: Optional[str] = None
class InventoryTransactionCreate(InventoryTransactionBase):
pass
class InventoryTransactionItemBase(BaseModel):
product_id: int
quantity: float
from_location_id: Optional[int] = None
to_location_id: Optional[int] = None
class InventoryTransactionItemCreate(InventoryTransactionItemBase):
pass
class InventoryTransactionWithItems(InventoryTransactionBase):
items: List[InventoryTransactionItemCreate]
class InventoryTransactionItemInDB(InventoryTransactionItemBase):
id: int
transaction_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class InventoryTransactionInDB(InventoryTransactionBase):
id: int
created_at: datetime
updated_at: datetime
transaction_items: List[InventoryTransactionItemInDB] = []
class Config:
from_attributes = True

53
app/schemas/product.py Normal file
View File

@ -0,0 +1,53 @@
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
# Category schemas
class CategoryBase(BaseModel):
name: str
description: Optional[str] = None
class CategoryCreate(CategoryBase):
pass
class CategoryUpdate(CategoryBase):
name: Optional[str] = None
class CategoryInDB(CategoryBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Product schemas
class ProductBase(BaseModel):
name: str
description: Optional[str] = None
sku: str = Field(..., description="Stock Keeping Unit, unique identifier")
price: float = Field(..., gt=0)
cost_price: float = Field(..., gt=0)
barcode: Optional[str] = None
category_id: Optional[int] = None
class ProductCreate(ProductBase):
pass
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
sku: Optional[str] = None
price: Optional[float] = Field(None, gt=0)
cost_price: Optional[float] = Field(None, gt=0)
barcode: Optional[str] = None
category_id: Optional[int] = None
class ProductInDB(ProductBase):
id: int
created_at: datetime
updated_at: datetime
category: Optional[CategoryInDB] = None
class Config:
from_attributes = True

85
app/schemas/supplier.py Normal file
View File

@ -0,0 +1,85 @@
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
# Supplier schemas
class SupplierBase(BaseModel):
name: str
contact_name: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
address: Optional[str] = None
notes: Optional[str] = None
is_active: bool = True
class SupplierCreate(SupplierBase):
pass
class SupplierUpdate(BaseModel):
name: Optional[str] = None
contact_name: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
address: Optional[str] = None
notes: Optional[str] = None
is_active: Optional[bool] = None
class SupplierInDB(SupplierBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Purchase schemas
class PurchaseItemBase(BaseModel):
product_id: int
quantity: float
unit_price: float
total_price: float = None # Will be calculated server-side
class PurchaseItemCreate(PurchaseItemBase):
pass
class PurchaseItemInDB(PurchaseItemBase):
id: int
purchase_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class PurchaseItemWithDetails(PurchaseItemInDB):
product: Dict[str, Any]
class PurchaseBase(BaseModel):
supplier_id: int
reference_number: Optional[str] = None
order_date: datetime = None
delivery_date: Optional[datetime] = None
notes: Optional[str] = None
total_amount: float = 0
class PurchaseCreate(PurchaseBase):
items: List[PurchaseItemCreate]
class PurchaseUpdate(BaseModel):
supplier_id: Optional[int] = None
reference_number: Optional[str] = None
order_date: Optional[datetime] = None
delivery_date: Optional[datetime] = None
notes: Optional[str] = None
class PurchaseInDB(PurchaseBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class PurchaseWithItems(PurchaseInDB):
items: List[PurchaseItemWithDetails] = []
supplier: Dict[str, Any]

52
main.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from pathlib import Path
from sqlalchemy.orm import Session
from app.db.session import get_db, engine
from app.db.base import Base
from app.db.init_db import init_db
# Import routers
from app.api.routes import product_router, inventory_router, supplier_router
# Create database tables
Base.metadata.create_all(bind=engine)
# Create FastAPI app
app = FastAPI(
title="Small Business Inventory Management System",
description="API for managing inventory for small businesses",
version="0.1.0"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Update with specific origins in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(product_router)
app.include_router(inventory_router)
app.include_router(supplier_router)
@app.get("/")
async def root():
return {"message": "Welcome to the Small Business Inventory Management System API"}
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.on_event("startup")
async def startup_event():
db = next(get_db())
init_db(db)
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

80
migrations/env.py Normal file
View File

@ -0,0 +1,80 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from pathlib import Path
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
from app.db.base import Base
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
# Ensure the DB directory exists
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,220 @@
"""initial migration
Revision ID: 001
Revises:
Create Date: 2025-05-12
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import Column, Integer, String, Float, DateTime, Text, Boolean, ForeignKey, Enum
import enum
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
# Enum classes for migration
class LocationType(str, enum.Enum):
WAREHOUSE = "warehouse"
STORE = "store"
SUPPLIER = "supplier"
class TransactionType(str, enum.Enum):
PURCHASE = "purchase"
SALE = "sale"
TRANSFER = "transfer"
ADJUSTMENT = "adjustment"
RETURN = "return"
def upgrade():
# Categories table
op.create_table(
'categories',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_categories_id'), 'categories', ['id'], unique=False)
op.create_index(op.f('ix_categories_name'), 'categories', ['name'], unique=True)
# Products table
op.create_table(
'products',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('sku', sa.String(length=50), nullable=False),
sa.Column('category_id', sa.Integer(), nullable=True),
sa.Column('price', sa.Float(), nullable=False),
sa.Column('cost_price', sa.Float(), nullable=False),
sa.Column('barcode', sa.String(length=100), nullable=True),
sa.ForeignKeyConstraint(['category_id'], ['categories.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_products_id'), 'products', ['id'], unique=False)
op.create_index(op.f('ix_products_name'), 'products', ['name'], unique=False)
op.create_index(op.f('ix_products_sku'), 'products', ['sku'], unique=True)
op.create_index(op.f('ix_products_barcode'), 'products', ['barcode'], unique=True)
# Locations table
op.create_table(
'locations',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('type', sa.Enum('warehouse', 'store', 'supplier', name='locationtype'), nullable=False),
sa.Column('address', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_locations_id'), 'locations', ['id'], unique=False)
# Inventory Transactions table
op.create_table(
'inventory_transactions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('type', sa.Enum('purchase', 'sale', 'transfer', 'adjustment', 'return', name='transactiontype'), nullable=False),
sa.Column('reference_id', sa.String(length=100), nullable=True),
sa.Column('notes', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_inventory_transactions_id'), 'inventory_transactions', ['id'], unique=False)
# Inventory Items table
op.create_table(
'inventory_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('location_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Float(), nullable=False),
sa.Column('minimum_stock', sa.Float(), nullable=False),
sa.Column('maximum_stock', sa.Float(), nullable=True),
sa.ForeignKeyConstraint(['location_id'], ['locations.id'], ),
sa.ForeignKeyConstraint(['product_id'], ['products.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_inventory_items_id'), 'inventory_items', ['id'], unique=False)
# Inventory Transaction Items table
op.create_table(
'inventory_transaction_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('transaction_id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Float(), nullable=False),
sa.Column('from_location_id', sa.Integer(), nullable=True),
sa.Column('to_location_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['from_location_id'], ['locations.id'], ),
sa.ForeignKeyConstraint(['product_id'], ['products.id'], ),
sa.ForeignKeyConstraint(['to_location_id'], ['locations.id'], ),
sa.ForeignKeyConstraint(['transaction_id'], ['inventory_transactions.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_inventory_transaction_items_id'), 'inventory_transaction_items', ['id'], unique=False)
# Suppliers table
op.create_table(
'suppliers',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('contact_name', sa.String(length=100), nullable=True),
sa.Column('email', sa.String(length=255), nullable=True),
sa.Column('phone', sa.String(length=20), nullable=True),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_suppliers_id'), 'suppliers', ['id'], unique=False)
op.create_index(op.f('ix_suppliers_name'), 'suppliers', ['name'], unique=False)
# Purchases table
op.create_table(
'purchases',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('supplier_id', sa.Integer(), nullable=False),
sa.Column('reference_number', sa.String(length=100), nullable=True),
sa.Column('order_date', sa.DateTime(), nullable=True),
sa.Column('delivery_date', sa.DateTime(), nullable=True),
sa.Column('total_amount', sa.Float(), nullable=False),
sa.Column('notes', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['supplier_id'], ['suppliers.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('reference_number')
)
op.create_index(op.f('ix_purchases_id'), 'purchases', ['id'], unique=False)
# Purchase Items table
op.create_table(
'purchase_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('purchase_id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Float(), nullable=False),
sa.Column('unit_price', sa.Float(), nullable=False),
sa.Column('total_price', sa.Float(), nullable=False),
sa.ForeignKeyConstraint(['product_id'], ['products.id'], ),
sa.ForeignKeyConstraint(['purchase_id'], ['purchases.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_purchase_items_id'), 'purchase_items', ['id'], unique=False)
def downgrade():
# Drop tables in reverse order (considering foreign key constraints)
op.drop_index(op.f('ix_purchase_items_id'), table_name='purchase_items')
op.drop_table('purchase_items')
op.drop_index(op.f('ix_purchases_id'), table_name='purchases')
op.drop_table('purchases')
op.drop_index(op.f('ix_suppliers_name'), table_name='suppliers')
op.drop_index(op.f('ix_suppliers_id'), table_name='suppliers')
op.drop_table('suppliers')
op.drop_index(op.f('ix_inventory_transaction_items_id'), table_name='inventory_transaction_items')
op.drop_table('inventory_transaction_items')
op.drop_index(op.f('ix_inventory_items_id'), table_name='inventory_items')
op.drop_table('inventory_items')
op.drop_index(op.f('ix_inventory_transactions_id'), table_name='inventory_transactions')
op.drop_table('inventory_transactions')
op.drop_index(op.f('ix_locations_id'), table_name='locations')
op.drop_table('locations')
op.drop_index(op.f('ix_products_barcode'), table_name='products')
op.drop_index(op.f('ix_products_sku'), table_name='products')
op.drop_index(op.f('ix_products_name'), table_name='products')
op.drop_index(op.f('ix_products_id'), table_name='products')
op.drop_table('products')
op.drop_index(op.f('ix_categories_name'), table_name='categories')
op.drop_index(op.f('ix_categories_id'), table_name='categories')
op.drop_table('categories')
# Drop enum types
op.execute("DROP TYPE locationtype")
op.execute("DROP TYPE transactiontype")

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
fastapi==0.115.0
uvicorn==0.29.0
sqlalchemy==2.0.34
pydantic==2.7.1
alembic==1.13.1
python-dotenv==1.0.1
python-multipart==0.0.9
pathlib==1.0.1
pytest==8.0.2