from typing import List, Optional

from sqlalchemy.orm import Session

from app.crud.base import CRUDBase
from app.models.item import Item, Transaction, TransactionItem, TransactionType
from app.schemas.item import (
    ItemCreate,
    ItemUpdate,
    TransactionCreate,
    TransactionUpdate,
    InventoryAdjustment,
)


class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
    """Item queries plus a stock-adjustment operation on top of ``CRUDBase``."""

    def get_by_sku(self, db: Session, *, sku: str) -> Optional[Item]:
        """Return the first item whose ``sku`` equals *sku*, or ``None``."""
        return db.query(Item).filter(Item.sku == sku).first()

    def get_by_category(
        self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return up to *limit* items with the given ``category_id``, after skipping *skip* rows."""
        return (
            db.query(Item)
            .filter(Item.category_id == category_id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_by_supplier(
        self, db: Session, *, supplier_id: int, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return up to *limit* items with the given ``supplier_id``, after skipping *skip* rows."""
        return (
            db.query(Item)
            .filter(Item.supplier_id == supplier_id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_low_stock_items(
        self, db: Session, *, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return items whose ``quantity`` is at or below their own ``reorder_level``."""
        return (
            db.query(Item)
            .filter(Item.quantity <= Item.reorder_level)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def adjust_inventory(
        self, db: Session, *, item_id: int, adjustment: InventoryAdjustment
    ) -> Optional[Item]:
        """Apply a signed quantity adjustment to an item and record it.

        ``adjustment.quantity`` is added to the item's quantity, so a negative
        value reduces stock. An ``ADJUSTMENT`` transaction with one line item
        is written in the same commit.

        Args:
            db: Session used for the lookup and the write.
            item_id: Primary key passed to ``self.get``.
            adjustment: Carries the signed ``quantity`` and an optional
                ``reason`` used as the transaction notes.

        Returns:
            The refreshed item, or ``None`` when ``self.get`` returns no item.

        Raises:
            Exception: Whatever the flush/commit raised; the session is rolled
                back first so it stays usable.
        """
        db_item = self.get(db, id=item_id)
        if db_item is None:
            return None

        try:
            db_item.quantity += adjustment.quantity

            db_transaction = Transaction(
                transaction_type=TransactionType.ADJUSTMENT,
                notes=adjustment.reason
                or f"Quantity adjusted by {adjustment.quantity}",
            )
            db.add(db_transaction)
            # Flush so the transaction's generated id is available below.
            db.flush()

            db.add(
                TransactionItem(
                    transaction_id=db_transaction.id,
                    item_id=item_id,
                    # The line item stores the magnitude only; the sign of the
                    # adjustment is not kept on this row.
                    quantity=abs(adjustment.quantity),
                    unit_price=db_item.unit_price,
                )
            )
            db.commit()
        except Exception:
            # Undo the quantity change and any flushed rows before re-raising.
            db.rollback()
            raise

        db.refresh(db_item)
        return db_item


class CRUDTransaction(CRUDBase[Transaction, TransactionCreate, TransactionUpdate]):
    """Transaction persistence that also keeps item stock levels in step."""

    def create(self, db: Session, *, obj_in: TransactionCreate) -> Transaction:
        """Persist a transaction with its line items and update stock.

        For each entry in ``obj_in.items`` a ``TransactionItem`` is added and
        the referenced item's quantity is increased for ``PURCHASE`` or
        decreased for ``SALE``; other transaction types leave stock untouched.
        Everything is committed together.

        Args:
            db: Session used for all reads and writes.
            obj_in: Transaction header fields plus the list of line items.

        Returns:
            The refreshed, committed transaction.

        Raises:
            ValueError: If a line item references an item id that does not
                exist. Nothing is committed in that case.
            Exception: Any flush/commit error, re-raised after rollback.
        """
        db_transaction = Transaction(
            transaction_type=obj_in.transaction_type,
            reference_number=obj_in.reference_number,
            notes=obj_in.notes,
        )

        try:
            db.add(db_transaction)
            # Flush so the transaction's generated id is available below.
            db.flush()

            for item_data in obj_in.items:
                # Resolve the item before adding the line item so an unknown id
                # fails with a clear error instead of an AttributeError (or a
                # constraint error from flushing a dangling row).
                db_item = db.query(Item).get(item_data.item_id)
                if db_item is None:
                    raise ValueError(
                        f"Item with id {item_data.item_id} does not exist"
                    )

                db.add(
                    TransactionItem(
                        transaction_id=db_transaction.id,
                        item_id=item_data.item_id,
                        quantity=item_data.quantity,
                        unit_price=item_data.unit_price,
                    )
                )

                # NOTE(review): a SALE larger than current stock drives the
                # quantity negative; confirm whether that is intended.
                if obj_in.transaction_type == TransactionType.PURCHASE:
                    db_item.quantity += item_data.quantity
                elif obj_in.transaction_type == TransactionType.SALE:
                    db_item.quantity -= item_data.quantity

            db.commit()
        except Exception:
            # Discard the flushed header, line items and stock changes so a
            # failed create leaves no partial state in the session.
            db.rollback()
            raise

        db.refresh(db_transaction)
        return db_transaction

    def get_transactions_by_item(
        self, db: Session, *, item_id: int, skip: int = 0, limit: int = 100
    ) -> List[Transaction]:
        """Return transactions that have a line item for *item_id*.

        NOTE(review): the query has no ORDER BY, so which rows land on a given
        skip/limit page depends on the database.
        """
        return (
            db.query(Transaction)
            .join(TransactionItem)
            .filter(TransactionItem.item_id == item_id)
            .offset(skip)
            .limit(limit)
            .all()
        )


# Module-level instances for callers to import.
item = CRUDItem(Item)
transaction = CRUDTransaction(Transaction)