from typing import Dict, List, Optional, Tuple from sqlalchemy import func from sqlalchemy.orm import Session from app.models.item import Category, Item, InventoryTransaction def get_item_count(db: Session) -> Tuple[int, Dict[str, int]]: """ Get the total count of items and breakdown by category Args: db: Database session Returns: Tuple containing total count and dictionary of counts by category """ # Get total item count total_count = db.query(func.count(Item.id)).scalar() or 0 # Get count by category category_counts = {} # Query for items grouped by category category_query = ( db.query( Category.name, func.count(Item.id) ) .outerjoin(Item, Category.id == Item.category_id) .group_by(Category.id) .all() ) # Build the category counts dictionary for category_name, count in category_query: category_counts[category_name] = count # Count items with no category no_category_count = ( db.query(func.count(Item.id)) .filter(Item.category_id.is_(None)) .scalar() ) or 0 if no_category_count > 0: category_counts["Uncategorized"] = no_category_count return total_count, category_counts def get_category_count(db: Session) -> int: """ Get the total count of categories Args: db: Database session Returns: Total number of categories """ return db.query(func.count(Category.id)).scalar() or 0 def get_inventory_value(db: Session) -> Tuple[float, Dict[str, float]]: """ Get the total value of inventory and breakdown by category Args: db: Database session Returns: Tuple containing total value and dictionary of values by category """ # Query to calculate total inventory value total_value_query = ( db.query(func.sum(Item.quantity * Item.price)) .filter(Item.price.isnot(None)) .scalar() ) total_value = float(total_value_query or 0) # Query to calculate value by category category_values = {} # Get value by category category_value_query = ( db.query( Category.name, func.sum(Item.quantity * Item.price) ) .outerjoin(Item, Category.id == Item.category_id) .filter(Item.price.isnot(None)) .group_by(Category.id) .all() ) # Build the category values dictionary for category_name, value in category_value_query: if value is not None: category_values[category_name] = float(value) # Calculate value for items with no category no_category_value = ( db.query(func.sum(Item.quantity * Item.price)) .filter(Item.category_id.is_(None), Item.price.isnot(None)) .scalar() ) if no_category_value: category_values["Uncategorized"] = float(no_category_value) return total_value, category_values def get_low_stock_items(db: Session, threshold: int = 10) -> Tuple[int, List[Dict]]: """ Get items with stock below the specified threshold Args: db: Database session threshold: Stock level threshold Returns: Tuple containing count of low stock items and list of basic item information """ # Query for low stock items low_stock_query = ( db.query(Item) .filter(Item.quantity < threshold) .order_by(Item.quantity) .all() ) low_stock_count = len(low_stock_query) # Format the results low_stock_items = [ { "id": item.id, "name": item.name, "quantity": item.quantity, "category": item.category.name if item.category else "Uncategorized" } for item in low_stock_query ] return low_stock_count, low_stock_items def get_recent_transactions(db: Session, limit: int = 5) -> Tuple[int, List[Dict]]: """ Get the most recent inventory transactions Args: db: Database session limit: Maximum number of transactions to return Returns: Tuple containing count of transactions and list of transaction information """ # Query for recent transactions transactions_query = ( db.query(InventoryTransaction, Item.name.label("item_name")) .join(Item, InventoryTransaction.item_id == Item.id) .order_by(InventoryTransaction.timestamp.desc()) .limit(limit) .all() ) transaction_count = len(transactions_query) # Format the results transactions = [ { "id": transaction.InventoryTransaction.id, "item_id": transaction.InventoryTransaction.item_id, "item_name": transaction.item_name, "quantity_change": transaction.InventoryTransaction.quantity_change, "transaction_type": transaction.InventoryTransaction.transaction_type, "timestamp": transaction.InventoryTransaction.timestamp.isoformat() } for transaction in transactions_query ] return transaction_count, transactions def get_dashboard_statistics( db: Session, include_value: bool = True, include_low_stock: bool = True, low_stock_threshold: int = 10, include_transactions: bool = True, transaction_limit: int = 5 ) -> Dict: """ Get all dashboard statistics Args: db: Database session include_value: Whether to include inventory value calculations include_low_stock: Whether to include low stock information low_stock_threshold: Threshold for low stock items include_transactions: Whether to include recent transactions transaction_limit: Maximum number of transactions to include Returns: Dictionary containing all requested dashboard statistics """ # Get item counts total_items, items_by_category = get_item_count(db) # Get category count total_categories = get_category_count(db) # Initialize the result dictionary result = { "item_count": { "total": total_items, "by_category": items_by_category }, "category_count": { "total": total_categories } } # Add inventory value if requested if include_value: total_value, value_by_category = get_inventory_value(db) result["inventory_value"] = { "total": total_value, "by_category": value_by_category } # Add low stock information if requested if include_low_stock: low_stock_count, low_stock_items = get_low_stock_items(db, low_stock_threshold) result["low_stock_items"] = { "count": low_stock_count, "threshold": low_stock_threshold, "items": low_stock_items } # Add recent transactions if requested if include_transactions: transaction_count, transactions = get_recent_transactions(db, transaction_limit) result["recent_transactions"] = { "count": transaction_count, "transactions": transactions } return result