97 lines
3.6 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.item import Item, Transaction, TransactionItem, TransactionType
from app.schemas.item import ItemCreate, ItemUpdate, TransactionCreate, TransactionUpdate, InventoryAdjustment
class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
    """Item-specific queries and inventory adjustment on top of ``CRUDBase``."""

    def get_by_sku(self, db: Session, *, sku: str) -> Optional[Item]:
        """Return the first item whose ``sku`` equals ``sku``, or ``None``."""
        return db.query(Item).filter(Item.sku == sku).first()

    def get_by_category(
        self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return up to ``limit`` items of the given category, skipping ``skip`` rows."""
        return (
            db.query(Item)
            .filter(Item.category_id == category_id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_by_supplier(
        self, db: Session, *, supplier_id: int, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return up to ``limit`` items of the given supplier, skipping ``skip`` rows."""
        return (
            db.query(Item)
            .filter(Item.supplier_id == supplier_id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_low_stock_items(
        self, db: Session, *, skip: int = 0, limit: int = 100
    ) -> List[Item]:
        """Return a page of items whose quantity is at or below their reorder level."""
        return (
            db.query(Item)
            .filter(Item.quantity <= Item.reorder_level)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def adjust_inventory(
        self, db: Session, *, item_id: int, adjustment: InventoryAdjustment
    ) -> Optional[Item]:
        """Apply a signed quantity adjustment to an item and record it.

        Adds ``adjustment.quantity`` to the item's quantity, then stores an
        ``ADJUSTMENT`` transaction with a single transaction item for it.

        Args:
            db: Session used for the lookup, the writes and the commit.
            item_id: Identifier passed to the inherited ``get`` lookup.
            adjustment: Carries the signed ``quantity`` delta and an optional
                ``reason`` used as the transaction notes.

        Returns:
            The refreshed item, or ``None`` when the lookup finds no item
            (nothing is written in that case).
        """
        item = self.get(db, id=item_id)
        if item is None:
            return None

        # Update item quantity (the delta may be negative).
        item.quantity += adjustment.quantity

        # Create an adjustment transaction; fall back to a generated note
        # when no reason was supplied.
        transaction = Transaction(
            transaction_type=TransactionType.ADJUSTMENT,
            notes=adjustment.reason or f"Quantity adjusted by {adjustment.quantity}",
        )
        db.add(transaction)
        # Flush before the commit so transaction.id can be used below
        # (assumes the primary key is generated on insert -- TODO confirm).
        db.flush()

        # Create transaction item. Only the magnitude is stored here; the
        # sign of the adjustment is not recorded on the transaction item.
        transaction_item = TransactionItem(
            transaction_id=transaction.id,
            item_id=item_id,
            quantity=abs(adjustment.quantity),
            unit_price=item.unit_price,
        )
        db.add(transaction_item)

        db.commit()
        db.refresh(item)
        return item
class CRUDTransaction(CRUDBase[Transaction, TransactionCreate, TransactionUpdate]):
    """Transaction persistence that also keeps item quantities in step."""

    def create(self, db: Session, *, obj_in: TransactionCreate) -> Transaction:
        """Persist a transaction with its line items and update stock levels.

        For every entry in ``obj_in.items`` a ``TransactionItem`` is stored.
        The referenced item's quantity is increased for ``PURCHASE``
        transactions and decreased for ``SALE`` transactions; other
        transaction types leave quantities untouched.

        Args:
            db: Session used for all writes and the final commit.
            obj_in: Transaction header fields plus the list of line items.

        Returns:
            The refreshed, committed transaction.

        Raises:
            ValueError: If a line item references an item id that cannot be
                found. The session is rolled back first, so nothing from
                this call is persisted.
        """
        # Create transaction
        transaction = Transaction(
            transaction_type=obj_in.transaction_type,
            reference_number=obj_in.reference_number,
            notes=obj_in.notes,
        )
        db.add(transaction)
        # Flush before the commit so transaction.id can be used below
        # (assumes the primary key is generated on insert -- TODO confirm).
        db.flush()

        # Create transaction items
        for item_data in obj_in.items:
            transaction_item = TransactionItem(
                transaction_id=transaction.id,
                item_id=item_data.item_id,
                quantity=item_data.quantity,
                unit_price=item_data.unit_price,
            )
            db.add(transaction_item)

            # Update item quantity based on transaction type
            item = db.query(Item).get(item_data.item_id)
            if item is None:
                # Without this guard the quantity update below would fail
                # with an AttributeError on None and leave the flushed
                # transaction pending in the session.
                db.rollback()
                raise ValueError(
                    f"Item with id {item_data.item_id} does not exist"
                )

            if obj_in.transaction_type == TransactionType.PURCHASE:
                item.quantity += item_data.quantity
            elif obj_in.transaction_type == TransactionType.SALE:
                item.quantity -= item_data.quantity

        db.commit()
        db.refresh(transaction)
        return transaction

    def get_transactions_by_item(
        self, db: Session, *, item_id: int, skip: int = 0, limit: int = 100
    ) -> List[Transaction]:
        """Return a page of transactions that have a line item for ``item_id``.

        Transactions are joined to their transaction items and filtered on
        ``TransactionItem.item_id``; ``skip`` and ``limit`` are applied as
        the query's offset and limit.
        """
        return (
            db.query(Transaction)
            .join(TransactionItem)
            .filter(TransactionItem.item_id == item_id)
            .offset(skip)
            .limit(limit)
            .all()
        )
# Module-level instances of the CRUD helpers, each bound to its model class.
item = CRUDItem(Item)
transaction = CRUDTransaction(Transaction)