Automated Action 54bf9880b9 Implement Small Business Inventory Management System
This commit implements a comprehensive inventory management system for small businesses using FastAPI and SQLAlchemy. Features include:
- Product and category management
- Inventory tracking across multiple locations
- Supplier management
- Purchase management
- Transaction tracking for inventory movements
- Complete API documentation

generated with BackendIM... (backend.im)
2025-05-12 12:55:31 +00:00

450 lines
14 KiB
Python

from typing import List, Optional, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from datetime import datetime
from app.db.session import get_db
from app.models.supplier import Supplier, Purchase, PurchaseItem
from app.models.product import Product
from app.models.inventory import (
InventoryTransaction,
InventoryTransactionItem,
Location,
TransactionType
)
from app.schemas.supplier import (
SupplierCreate,
SupplierUpdate,
SupplierInDB,
PurchaseCreate,
PurchaseUpdate,
PurchaseInDB,
PurchaseWithItems
)
from app.api.deps import get_supplier, get_product
router = APIRouter(prefix="/api/v1")
# Supplier endpoints
@router.post("/suppliers/", response_model=SupplierInDB, status_code=status.HTTP_201_CREATED)
def create_supplier(
*,
db: Session = Depends(get_db),
supplier_in: SupplierCreate
):
"""Create a new supplier"""
try:
db_supplier = Supplier(**supplier_in.model_dump())
db.add(db_supplier)
db.commit()
db.refresh(db_supplier)
return db_supplier
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create supplier: {str(e)}"
)
@router.get("/suppliers/", response_model=List[SupplierInDB])
def read_suppliers(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = None,
is_active: Optional[bool] = None
):
"""Get all suppliers with optional filters"""
query = db.query(Supplier)
if search:
query = query.filter(
(Supplier.name.ilike(f"%{search}%")) |
(Supplier.contact_name.ilike(f"%{search}%")) |
(Supplier.email.ilike(f"%{search}%"))
)
if is_active is not None:
query = query.filter(Supplier.is_active == is_active)
return query.offset(skip).limit(limit).all()
@router.get("/suppliers/{supplier_id}", response_model=SupplierInDB)
def read_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier)
):
"""Get a specific supplier by ID"""
return supplier
@router.put("/suppliers/{supplier_id}", response_model=SupplierInDB)
def update_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier),
supplier_in: SupplierUpdate
):
"""Update a supplier"""
update_data = supplier_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(supplier, field, value)
try:
db.add(supplier)
db.commit()
db.refresh(supplier)
return supplier
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update supplier: {str(e)}"
)
@router.delete("/suppliers/{supplier_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_supplier(
*,
db: Session = Depends(get_db),
supplier: Supplier = Depends(get_supplier)
):
"""Delete a supplier"""
# Check if supplier has purchases
if supplier.purchases:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete supplier with purchase history"
)
db.delete(supplier)
db.commit()
return None
# Purchase endpoints
@router.post("/purchases/", response_model=PurchaseInDB, status_code=status.HTTP_201_CREATED)
def create_purchase(
*,
db: Session = Depends(get_db),
purchase_in: PurchaseCreate
):
"""Create a new purchase with items"""
# Check if supplier exists
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
if not supplier:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
)
# Set order date if not provided
if not purchase_in.order_date:
purchase_in.order_date = datetime.now()
try:
# Create purchase record
total_amount = 0
purchase_data = purchase_in.model_dump(exclude={"items"})
db_purchase = Purchase(**purchase_data)
db.add(db_purchase)
db.flush() # Get ID without committing
# Process purchase items
for item_data in purchase_in.items:
# Validate product exists
product = db.query(Product).filter(Product.id == item_data.product_id).first()
if not product:
db.rollback()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {item_data.product_id} not found"
)
# Calculate total price if not provided
if not item_data.total_price:
item_data.total_price = item_data.quantity * item_data.unit_price
# Create purchase item
db_item = PurchaseItem(
purchase_id=db_purchase.id,
product_id=item_data.product_id,
quantity=item_data.quantity,
unit_price=item_data.unit_price,
total_price=item_data.total_price
)
db.add(db_item)
# Update total amount
total_amount += item_data.total_price
# Update purchase with total amount
db_purchase.total_amount = total_amount
db.commit()
db.refresh(db_purchase)
return db_purchase
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not create purchase: {str(e)}"
)
@router.post("/purchases/{purchase_id}/receive", response_model=PurchaseInDB)
def receive_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int,
location_id: int
):
"""Receive a purchase into inventory at specified location"""
# Check if purchase exists
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if location exists
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Location with ID {location_id} not found"
)
try:
# Create inventory transaction
transaction = InventoryTransaction(
type=TransactionType.PURCHASE,
reference_id=f"PO-{purchase.id}",
notes=f"Receiving purchase order {purchase.reference_number}"
)
db.add(transaction)
db.flush()
# Get purchase items
purchase_items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase.id).all()
for item in purchase_items:
# Create inventory transaction item
transaction_item = InventoryTransactionItem(
transaction_id=transaction.id,
product_id=item.product_id,
quantity=item.quantity,
to_location_id=location_id
)
db.add(transaction_item)
# Update or create inventory item
inventory_item = db.query(InventoryItem).filter(
InventoryItem.product_id == item.product_id,
InventoryItem.location_id == location_id
).first()
if inventory_item:
inventory_item.quantity += item.quantity
else:
from app.models.inventory import InventoryItem
new_item = InventoryItem(
product_id=item.product_id,
location_id=location_id,
quantity=item.quantity,
minimum_stock=0
)
db.add(new_item)
# Update purchase delivery date if not set
if not purchase.delivery_date:
purchase.delivery_date = datetime.now()
db.commit()
db.refresh(purchase)
return purchase
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not receive purchase: {str(e)}"
)
@router.get("/purchases/", response_model=List[PurchaseInDB])
def read_purchases(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
supplier_id: Optional[int] = None,
reference_number: Optional[str] = None
):
"""Get all purchases with optional filters"""
query = db.query(Purchase)
if supplier_id:
query = query.filter(Purchase.supplier_id == supplier_id)
if reference_number:
query = query.filter(Purchase.reference_number.ilike(f"%{reference_number}%"))
return query.order_by(Purchase.order_date.desc()).offset(skip).limit(limit).all()
@router.get("/purchases/{purchase_id}", response_model=PurchaseWithItems)
def read_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int
):
"""Get a specific purchase by ID with all items"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Get purchase items with product details
items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).all()
result_items = []
for item in items:
product = db.query(Product).filter(Product.id == item.product_id).first()
item_dict = {
"id": item.id,
"purchase_id": item.purchase_id,
"product_id": item.product_id,
"quantity": item.quantity,
"unit_price": item.unit_price,
"total_price": item.total_price,
"created_at": item.created_at,
"updated_at": item.updated_at,
"product": {
"id": product.id,
"name": product.name,
"sku": product.sku
}
}
result_items.append(item_dict)
# Get supplier details
supplier = db.query(Supplier).filter(Supplier.id == purchase.supplier_id).first()
# Create final result
result = {
"id": purchase.id,
"supplier_id": purchase.supplier_id,
"reference_number": purchase.reference_number,
"order_date": purchase.order_date,
"delivery_date": purchase.delivery_date,
"total_amount": purchase.total_amount,
"notes": purchase.notes,
"created_at": purchase.created_at,
"updated_at": purchase.updated_at,
"items": result_items,
"supplier": {
"id": supplier.id,
"name": supplier.name,
"contact_name": supplier.contact_name,
"email": supplier.email,
"phone": supplier.phone
}
}
return result
@router.put("/purchases/{purchase_id}", response_model=PurchaseInDB)
def update_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int,
purchase_in: PurchaseUpdate
):
"""Update a purchase (header only, not items)"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if supplier exists if supplier_id is being updated
if purchase_in.supplier_id:
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
if not supplier:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
)
update_data = purchase_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(purchase, field, value)
try:
db.add(purchase)
db.commit()
db.refresh(purchase)
return purchase
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not update purchase: {str(e)}"
)
@router.delete("/purchases/{purchase_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_purchase(
*,
db: Session = Depends(get_db),
purchase_id: int
):
"""Delete a purchase and its items"""
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
if not purchase:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Purchase with ID {purchase_id} not found"
)
# Check if this purchase has been received (has inventory transactions)
transaction = db.query(InventoryTransaction).filter(
InventoryTransaction.type == TransactionType.PURCHASE,
InventoryTransaction.reference_id == f"PO-{purchase.id}"
).first()
if transaction:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete purchase that has been received into inventory"
)
try:
# Delete purchase items first
db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).delete()
# Delete purchase
db.delete(purchase)
db.commit()
return None
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Could not delete purchase: {str(e)}"
)