209 lines
6.5 KiB
Python

from datetime import datetime, timedelta
from typing import Any, List
from fastapi import APIRouter, Depends
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from app import models, schemas
from app.api import deps
from app.db.session import get_db
from app.models.inventory_movement import MovementType
router = APIRouter()
@router.get("/low-stock", response_model=List[schemas.LowStockProduct])
def get_low_stock_products(
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Get all products that are at or below their minimum stock level.
"""
# Query products with supplier and category names
query = (
db.query(
models.Product.id,
models.Product.name,
models.Product.sku,
models.Product.current_stock,
models.Product.min_stock_level,
models.Supplier.name.label("supplier_name"),
models.Category.name.label("category_name"),
)
.outerjoin(models.Supplier, models.Product.supplier_id == models.Supplier.id)
.outerjoin(models.Category, models.Product.category_id == models.Category.id)
.filter(
and_(
models.Product.is_active,
models.Product.min_stock_level.isnot(None),
models.Product.current_stock <= models.Product.min_stock_level,
)
)
.order_by(
# Order by criticality: largest gap between current and min stock first
(models.Product.min_stock_level - models.Product.current_stock).desc()
)
)
# Convert to list of dictionaries
results = [
{
"id": row.id,
"name": row.name,
"sku": row.sku,
"current_stock": row.current_stock,
"min_stock_level": row.min_stock_level,
"supplier_name": row.supplier_name,
"category_name": row.category_name,
}
for row in query.all()
]
return results
@router.get("/inventory-value", response_model=schemas.InventoryValueSummary)
def get_inventory_value(
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Get the total value of the inventory and breakdown by category.
"""
# Query all active products
products = (
db.query(
models.Product.id,
models.Product.name,
models.Product.sku,
models.Product.current_stock,
models.Product.price,
models.Product.category_id,
models.Category.name.label("category_name"),
)
.outerjoin(models.Category, models.Product.category_id == models.Category.id)
.filter(models.Product.is_active)
.all()
)
# Calculate overall inventory value
total_products = len(products)
total_items = sum(p.current_stock for p in products)
total_value = sum(p.current_stock * p.price for p in products)
# Calculate average item value
average_item_value = 0
if total_items > 0:
average_item_value = total_value / total_items
# Group by category
category_values = {}
for p in products:
category_id = p.category_id or "uncategorized"
category_name = p.category_name or "Uncategorized"
if category_id not in category_values:
category_values[category_id] = {
"id": category_id,
"name": category_name,
"product_count": 0,
"total_items": 0,
"total_value": 0.0,
}
category_values[category_id]["product_count"] += 1
category_values[category_id]["total_items"] += p.current_stock
category_values[category_id]["total_value"] += p.current_stock * p.price
return {
"total_products": total_products,
"total_items": total_items,
"total_value": total_value,
"average_item_value": average_item_value,
"by_category": list(category_values.values()),
}
@router.get("/movement-summary", response_model=schemas.MovementSummary)
def get_movement_summary(
period: schemas.TimePeriod = schemas.TimePeriod.MONTH,
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Get a summary of inventory movements for a specific time period.
"""
# Calculate date range based on period
end_date = datetime.utcnow()
if period == schemas.TimePeriod.DAY:
start_date = end_date - timedelta(days=1)
period_name = "Last 24 hours"
elif period == schemas.TimePeriod.WEEK:
start_date = end_date - timedelta(days=7)
period_name = "Last 7 days"
elif period == schemas.TimePeriod.MONTH:
start_date = end_date - timedelta(days=30)
period_name = "Last 30 days"
elif period == schemas.TimePeriod.YEAR:
start_date = end_date - timedelta(days=365)
period_name = "Last 365 days"
else: # ALL
start_date = datetime(1900, 1, 1)
period_name = "All time"
# Query movement statistics
stock_in = (
db.query(func.sum(models.InventoryMovement.quantity))
.filter(
models.InventoryMovement.type == MovementType.STOCK_IN,
models.InventoryMovement.created_at.between(start_date, end_date),
)
.scalar()
or 0
)
stock_out = (
db.query(func.sum(models.InventoryMovement.quantity))
.filter(
models.InventoryMovement.type == MovementType.STOCK_OUT,
models.InventoryMovement.created_at.between(start_date, end_date),
)
.scalar()
or 0
)
adjustments = (
db.query(func.sum(models.InventoryMovement.quantity))
.filter(
models.InventoryMovement.type == MovementType.ADJUSTMENT,
models.InventoryMovement.created_at.between(start_date, end_date),
)
.scalar()
or 0
)
returns = (
db.query(func.sum(models.InventoryMovement.quantity))
.filter(
models.InventoryMovement.type == MovementType.RETURN,
models.InventoryMovement.created_at.between(start_date, end_date),
)
.scalar()
or 0
)
# Calculate net change
net_change = stock_in - stock_out + adjustments + returns
return {
"period": period_name,
"stock_in": int(stock_in),
"stock_out": int(stock_out),
"adjustments": int(adjustments),
"returns": int(returns),
"net_change": int(net_change),
}