97 lines
3.6 KiB
Python
97 lines
3.6 KiB
Python
from typing import List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models.item import Item, Transaction, TransactionItem, TransactionType
|
|
from app.schemas.item import ItemCreate, ItemUpdate, TransactionCreate, TransactionUpdate, InventoryAdjustment
|
|
|
|
|
|
class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
|
|
def get_by_sku(self, db: Session, *, sku: str) -> Optional[Item]:
|
|
return db.query(Item).filter(Item.sku == sku).first()
|
|
|
|
def get_by_category(self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100) -> List[Item]:
|
|
return db.query(Item).filter(Item.category_id == category_id).offset(skip).limit(limit).all()
|
|
|
|
def get_by_supplier(self, db: Session, *, supplier_id: int, skip: int = 0, limit: int = 100) -> List[Item]:
|
|
return db.query(Item).filter(Item.supplier_id == supplier_id).offset(skip).limit(limit).all()
|
|
|
|
def get_low_stock_items(self, db: Session, *, skip: int = 0, limit: int = 100) -> List[Item]:
|
|
return db.query(Item).filter(Item.quantity <= Item.reorder_level).offset(skip).limit(limit).all()
|
|
|
|
def adjust_inventory(self, db: Session, *, item_id: int, adjustment: InventoryAdjustment) -> Item:
|
|
item = self.get(db, id=item_id)
|
|
if not item:
|
|
return None
|
|
|
|
# Update item quantity
|
|
item.quantity += adjustment.quantity
|
|
|
|
# Create an adjustment transaction
|
|
transaction = Transaction(
|
|
transaction_type=TransactionType.ADJUSTMENT,
|
|
notes=adjustment.reason or f"Quantity adjusted by {adjustment.quantity}"
|
|
)
|
|
db.add(transaction)
|
|
db.flush()
|
|
|
|
# Create transaction item
|
|
transaction_item = TransactionItem(
|
|
transaction_id=transaction.id,
|
|
item_id=item_id,
|
|
quantity=abs(adjustment.quantity),
|
|
unit_price=item.unit_price
|
|
)
|
|
db.add(transaction_item)
|
|
|
|
db.commit()
|
|
db.refresh(item)
|
|
return item
|
|
|
|
|
|
class CRUDTransaction(CRUDBase[Transaction, TransactionCreate, TransactionUpdate]):
|
|
def create(self, db: Session, *, obj_in: TransactionCreate) -> Transaction:
|
|
# Create transaction
|
|
transaction = Transaction(
|
|
transaction_type=obj_in.transaction_type,
|
|
reference_number=obj_in.reference_number,
|
|
notes=obj_in.notes
|
|
)
|
|
db.add(transaction)
|
|
db.flush()
|
|
|
|
# Create transaction items
|
|
for item_data in obj_in.items:
|
|
transaction_item = TransactionItem(
|
|
transaction_id=transaction.id,
|
|
item_id=item_data.item_id,
|
|
quantity=item_data.quantity,
|
|
unit_price=item_data.unit_price
|
|
)
|
|
db.add(transaction_item)
|
|
|
|
# Update item quantity based on transaction type
|
|
item = db.query(Item).get(item_data.item_id)
|
|
if obj_in.transaction_type == TransactionType.PURCHASE:
|
|
item.quantity += item_data.quantity
|
|
elif obj_in.transaction_type == TransactionType.SALE:
|
|
item.quantity -= item_data.quantity
|
|
|
|
db.commit()
|
|
db.refresh(transaction)
|
|
return transaction
|
|
|
|
def get_transactions_by_item(self, db: Session, *, item_id: int, skip: int = 0, limit: int = 100) -> List[Transaction]:
|
|
return (
|
|
db.query(Transaction)
|
|
.join(TransactionItem)
|
|
.filter(TransactionItem.item_id == item_id)
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
|
|
item = CRUDItem(Item)
|
|
transaction = CRUDTransaction(Transaction) |