
This commit implements a comprehensive inventory management system for small businesses using FastAPI and SQLAlchemy. Features include: - Product and category management - Inventory tracking across multiple locations - Supplier management - Purchase management - Transaction tracking for inventory movements - Complete API documentation generated with BackendIM... (backend.im)
450 lines
14 KiB
Python
450 lines
14 KiB
Python
from typing import List, Optional, Dict, Any
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import IntegrityError
|
|
from datetime import datetime
|
|
|
|
from app.db.session import get_db
|
|
from app.models.supplier import Supplier, Purchase, PurchaseItem
|
|
from app.models.product import Product
|
|
from app.models.inventory import (
|
|
InventoryTransaction,
|
|
InventoryTransactionItem,
|
|
Location,
|
|
TransactionType
|
|
)
|
|
from app.schemas.supplier import (
|
|
SupplierCreate,
|
|
SupplierUpdate,
|
|
SupplierInDB,
|
|
PurchaseCreate,
|
|
PurchaseUpdate,
|
|
PurchaseInDB,
|
|
PurchaseWithItems
|
|
)
|
|
from app.api.deps import get_supplier, get_product
|
|
|
|
router = APIRouter(prefix="/api/v1")
|
|
|
|
# Supplier endpoints
|
|
@router.post("/suppliers/", response_model=SupplierInDB, status_code=status.HTTP_201_CREATED)
|
|
def create_supplier(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
supplier_in: SupplierCreate
|
|
):
|
|
"""Create a new supplier"""
|
|
try:
|
|
db_supplier = Supplier(**supplier_in.model_dump())
|
|
db.add(db_supplier)
|
|
db.commit()
|
|
db.refresh(db_supplier)
|
|
return db_supplier
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not create supplier: {str(e)}"
|
|
)
|
|
|
|
@router.get("/suppliers/", response_model=List[SupplierInDB])
|
|
def read_suppliers(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
search: Optional[str] = None,
|
|
is_active: Optional[bool] = None
|
|
):
|
|
"""Get all suppliers with optional filters"""
|
|
query = db.query(Supplier)
|
|
|
|
if search:
|
|
query = query.filter(
|
|
(Supplier.name.ilike(f"%{search}%")) |
|
|
(Supplier.contact_name.ilike(f"%{search}%")) |
|
|
(Supplier.email.ilike(f"%{search}%"))
|
|
)
|
|
|
|
if is_active is not None:
|
|
query = query.filter(Supplier.is_active == is_active)
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
@router.get("/suppliers/{supplier_id}", response_model=SupplierInDB)
|
|
def read_supplier(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
supplier: Supplier = Depends(get_supplier)
|
|
):
|
|
"""Get a specific supplier by ID"""
|
|
return supplier
|
|
|
|
@router.put("/suppliers/{supplier_id}", response_model=SupplierInDB)
|
|
def update_supplier(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
supplier: Supplier = Depends(get_supplier),
|
|
supplier_in: SupplierUpdate
|
|
):
|
|
"""Update a supplier"""
|
|
update_data = supplier_in.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(supplier, field, value)
|
|
|
|
try:
|
|
db.add(supplier)
|
|
db.commit()
|
|
db.refresh(supplier)
|
|
return supplier
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not update supplier: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/suppliers/{supplier_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def delete_supplier(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
supplier: Supplier = Depends(get_supplier)
|
|
):
|
|
"""Delete a supplier"""
|
|
# Check if supplier has purchases
|
|
if supplier.purchases:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot delete supplier with purchase history"
|
|
)
|
|
|
|
db.delete(supplier)
|
|
db.commit()
|
|
return None
|
|
|
|
# Purchase endpoints
|
|
@router.post("/purchases/", response_model=PurchaseInDB, status_code=status.HTTP_201_CREATED)
|
|
def create_purchase(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
purchase_in: PurchaseCreate
|
|
):
|
|
"""Create a new purchase with items"""
|
|
# Check if supplier exists
|
|
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
|
|
if not supplier:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
|
|
)
|
|
|
|
# Set order date if not provided
|
|
if not purchase_in.order_date:
|
|
purchase_in.order_date = datetime.now()
|
|
|
|
try:
|
|
# Create purchase record
|
|
total_amount = 0
|
|
purchase_data = purchase_in.model_dump(exclude={"items"})
|
|
db_purchase = Purchase(**purchase_data)
|
|
db.add(db_purchase)
|
|
db.flush() # Get ID without committing
|
|
|
|
# Process purchase items
|
|
for item_data in purchase_in.items:
|
|
# Validate product exists
|
|
product = db.query(Product).filter(Product.id == item_data.product_id).first()
|
|
if not product:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Product with ID {item_data.product_id} not found"
|
|
)
|
|
|
|
# Calculate total price if not provided
|
|
if not item_data.total_price:
|
|
item_data.total_price = item_data.quantity * item_data.unit_price
|
|
|
|
# Create purchase item
|
|
db_item = PurchaseItem(
|
|
purchase_id=db_purchase.id,
|
|
product_id=item_data.product_id,
|
|
quantity=item_data.quantity,
|
|
unit_price=item_data.unit_price,
|
|
total_price=item_data.total_price
|
|
)
|
|
db.add(db_item)
|
|
|
|
# Update total amount
|
|
total_amount += item_data.total_price
|
|
|
|
# Update purchase with total amount
|
|
db_purchase.total_amount = total_amount
|
|
|
|
db.commit()
|
|
db.refresh(db_purchase)
|
|
return db_purchase
|
|
|
|
except IntegrityError as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Database integrity error: {str(e)}"
|
|
)
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not create purchase: {str(e)}"
|
|
)
|
|
|
|
@router.post("/purchases/{purchase_id}/receive", response_model=PurchaseInDB)
|
|
def receive_purchase(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
purchase_id: int,
|
|
location_id: int
|
|
):
|
|
"""Receive a purchase into inventory at specified location"""
|
|
# Check if purchase exists
|
|
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
|
|
if not purchase:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Purchase with ID {purchase_id} not found"
|
|
)
|
|
|
|
# Check if location exists
|
|
location = db.query(Location).filter(Location.id == location_id).first()
|
|
if not location:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Location with ID {location_id} not found"
|
|
)
|
|
|
|
try:
|
|
# Create inventory transaction
|
|
transaction = InventoryTransaction(
|
|
type=TransactionType.PURCHASE,
|
|
reference_id=f"PO-{purchase.id}",
|
|
notes=f"Receiving purchase order {purchase.reference_number}"
|
|
)
|
|
db.add(transaction)
|
|
db.flush()
|
|
|
|
# Get purchase items
|
|
purchase_items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase.id).all()
|
|
|
|
for item in purchase_items:
|
|
# Create inventory transaction item
|
|
transaction_item = InventoryTransactionItem(
|
|
transaction_id=transaction.id,
|
|
product_id=item.product_id,
|
|
quantity=item.quantity,
|
|
to_location_id=location_id
|
|
)
|
|
db.add(transaction_item)
|
|
|
|
# Update or create inventory item
|
|
inventory_item = db.query(InventoryItem).filter(
|
|
InventoryItem.product_id == item.product_id,
|
|
InventoryItem.location_id == location_id
|
|
).first()
|
|
|
|
if inventory_item:
|
|
inventory_item.quantity += item.quantity
|
|
else:
|
|
from app.models.inventory import InventoryItem
|
|
new_item = InventoryItem(
|
|
product_id=item.product_id,
|
|
location_id=location_id,
|
|
quantity=item.quantity,
|
|
minimum_stock=0
|
|
)
|
|
db.add(new_item)
|
|
|
|
# Update purchase delivery date if not set
|
|
if not purchase.delivery_date:
|
|
purchase.delivery_date = datetime.now()
|
|
|
|
db.commit()
|
|
db.refresh(purchase)
|
|
return purchase
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not receive purchase: {str(e)}"
|
|
)
|
|
|
|
@router.get("/purchases/", response_model=List[PurchaseInDB])
|
|
def read_purchases(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
supplier_id: Optional[int] = None,
|
|
reference_number: Optional[str] = None
|
|
):
|
|
"""Get all purchases with optional filters"""
|
|
query = db.query(Purchase)
|
|
|
|
if supplier_id:
|
|
query = query.filter(Purchase.supplier_id == supplier_id)
|
|
|
|
if reference_number:
|
|
query = query.filter(Purchase.reference_number.ilike(f"%{reference_number}%"))
|
|
|
|
return query.order_by(Purchase.order_date.desc()).offset(skip).limit(limit).all()
|
|
|
|
@router.get("/purchases/{purchase_id}", response_model=PurchaseWithItems)
|
|
def read_purchase(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
purchase_id: int
|
|
):
|
|
"""Get a specific purchase by ID with all items"""
|
|
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
|
|
if not purchase:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Purchase with ID {purchase_id} not found"
|
|
)
|
|
|
|
# Get purchase items with product details
|
|
items = db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).all()
|
|
|
|
result_items = []
|
|
for item in items:
|
|
product = db.query(Product).filter(Product.id == item.product_id).first()
|
|
|
|
item_dict = {
|
|
"id": item.id,
|
|
"purchase_id": item.purchase_id,
|
|
"product_id": item.product_id,
|
|
"quantity": item.quantity,
|
|
"unit_price": item.unit_price,
|
|
"total_price": item.total_price,
|
|
"created_at": item.created_at,
|
|
"updated_at": item.updated_at,
|
|
"product": {
|
|
"id": product.id,
|
|
"name": product.name,
|
|
"sku": product.sku
|
|
}
|
|
}
|
|
result_items.append(item_dict)
|
|
|
|
# Get supplier details
|
|
supplier = db.query(Supplier).filter(Supplier.id == purchase.supplier_id).first()
|
|
|
|
# Create final result
|
|
result = {
|
|
"id": purchase.id,
|
|
"supplier_id": purchase.supplier_id,
|
|
"reference_number": purchase.reference_number,
|
|
"order_date": purchase.order_date,
|
|
"delivery_date": purchase.delivery_date,
|
|
"total_amount": purchase.total_amount,
|
|
"notes": purchase.notes,
|
|
"created_at": purchase.created_at,
|
|
"updated_at": purchase.updated_at,
|
|
"items": result_items,
|
|
"supplier": {
|
|
"id": supplier.id,
|
|
"name": supplier.name,
|
|
"contact_name": supplier.contact_name,
|
|
"email": supplier.email,
|
|
"phone": supplier.phone
|
|
}
|
|
}
|
|
|
|
return result
|
|
|
|
@router.put("/purchases/{purchase_id}", response_model=PurchaseInDB)
|
|
def update_purchase(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
purchase_id: int,
|
|
purchase_in: PurchaseUpdate
|
|
):
|
|
"""Update a purchase (header only, not items)"""
|
|
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
|
|
if not purchase:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Purchase with ID {purchase_id} not found"
|
|
)
|
|
|
|
# Check if supplier exists if supplier_id is being updated
|
|
if purchase_in.supplier_id:
|
|
supplier = db.query(Supplier).filter(Supplier.id == purchase_in.supplier_id).first()
|
|
if not supplier:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Supplier with ID {purchase_in.supplier_id} not found"
|
|
)
|
|
|
|
update_data = purchase_in.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(purchase, field, value)
|
|
|
|
try:
|
|
db.add(purchase)
|
|
db.commit()
|
|
db.refresh(purchase)
|
|
return purchase
|
|
except IntegrityError as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Database integrity error: {str(e)}"
|
|
)
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not update purchase: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/purchases/{purchase_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def delete_purchase(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
purchase_id: int
|
|
):
|
|
"""Delete a purchase and its items"""
|
|
purchase = db.query(Purchase).filter(Purchase.id == purchase_id).first()
|
|
if not purchase:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Purchase with ID {purchase_id} not found"
|
|
)
|
|
|
|
# Check if this purchase has been received (has inventory transactions)
|
|
transaction = db.query(InventoryTransaction).filter(
|
|
InventoryTransaction.type == TransactionType.PURCHASE,
|
|
InventoryTransaction.reference_id == f"PO-{purchase.id}"
|
|
).first()
|
|
|
|
if transaction:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot delete purchase that has been received into inventory"
|
|
)
|
|
|
|
try:
|
|
# Delete purchase items first
|
|
db.query(PurchaseItem).filter(PurchaseItem.purchase_id == purchase_id).delete()
|
|
|
|
# Delete purchase
|
|
db.delete(purchase)
|
|
db.commit()
|
|
return None
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Could not delete purchase: {str(e)}"
|
|
) |