99 lines
2.8 KiB
Python

from typing import Any
from fastapi import APIRouter, Depends
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_user
from app.models.user import User
from app.models.item import Item, Transaction
router = APIRouter()
@router.get("/inventory-summary")
def inventory_summary(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get inventory summary including total items, total value, and out-of-stock count.
"""
# Total number of items
total_items = db.query(func.count(Item.id)).scalar()
# Total inventory value
total_value = db.query(func.sum(Item.quantity * Item.unit_price)).scalar() or 0
# Out of stock items
out_of_stock_count = db.query(func.count(Item.id)).filter(Item.quantity == 0).scalar()
# Low stock items (below reorder level)
low_stock_count = db.query(func.count(Item.id)).filter(
Item.quantity > 0, Item.quantity <= Item.reorder_level
).scalar()
return {
"total_items": total_items,
"total_value": total_value,
"out_of_stock_count": out_of_stock_count,
"low_stock_count": low_stock_count,
}
@router.get("/category-summary")
def category_summary(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get inventory summary by category.
"""
query = db.query(
Item.category_id,
func.count(Item.id).label("item_count"),
func.sum(Item.quantity).label("total_quantity"),
func.sum(Item.quantity * Item.unit_price).label("total_value")
).group_by(Item.category_id).all()
return [
{
"category_id": category_id,
"item_count": item_count,
"total_quantity": total_quantity or 0,
"total_value": total_value or 0
}
for category_id, item_count, total_quantity, total_value in query
]
@router.get("/transaction-summary")
def transaction_summary(
db: Session = Depends(get_db),
start_date: str = None,
end_date: str = None,
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get transaction summary by type, optionally filtered by date range.
"""
query = db.query(
Transaction.transaction_type,
func.count(Transaction.id).label("transaction_count")
).group_by(Transaction.transaction_type)
# Apply date filters if provided
if start_date:
query = query.filter(Transaction.created_at >= start_date)
if end_date:
query = query.filter(Transaction.created_at <= end_date)
result = query.all()
return [
{
"transaction_type": transaction_type,
"transaction_count": transaction_count
}
for transaction_type, transaction_count in result
]