
- Fix unused imports in API endpoints - Add proper __all__ exports in model and schema modules - Add proper TYPE_CHECKING imports in models to prevent circular imports - Fix import order in migrations - Fix long lines in migration scripts - All ruff checks passing
105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
import uuid
|
|
from datetime import datetime
|
|
from typing import List
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models.inventory_transaction import InventoryTransaction, TransactionType
|
|
from app.models.product import Product
|
|
from app.schemas.inventory_transaction import (
|
|
InventoryTransactionCreate,
|
|
InventoryTransactionUpdate,
|
|
)
|
|
|
|
|
|
class CRUDInventoryTransaction(
|
|
CRUDBase[InventoryTransaction, InventoryTransactionCreate, InventoryTransactionUpdate]
|
|
):
|
|
"""
|
|
CRUD operations for InventoryTransaction model.
|
|
"""
|
|
def create_with_product_update(
|
|
self, db: Session, *, obj_in: InventoryTransactionCreate
|
|
) -> InventoryTransaction:
|
|
"""
|
|
Create a new inventory transaction and update product quantity.
|
|
"""
|
|
# Get the product
|
|
product = db.query(Product).filter(Product.id == obj_in.product_id).first()
|
|
if not product:
|
|
raise ValueError("Product not found")
|
|
|
|
# Calculate the quantity change based on transaction type
|
|
if obj_in.transaction_type == TransactionType.PURCHASE:
|
|
product.quantity += obj_in.quantity
|
|
elif obj_in.transaction_type == TransactionType.SALE:
|
|
if product.quantity < obj_in.quantity:
|
|
raise ValueError("Not enough inventory for sale")
|
|
product.quantity -= obj_in.quantity
|
|
elif obj_in.transaction_type == TransactionType.ADJUSTMENT:
|
|
product.quantity += obj_in.quantity # Can be negative for reduction
|
|
elif obj_in.transaction_type == TransactionType.RETURN:
|
|
product.quantity += obj_in.quantity
|
|
elif obj_in.transaction_type == TransactionType.TRANSFER:
|
|
if product.quantity < obj_in.quantity:
|
|
raise ValueError("Not enough inventory for transfer")
|
|
product.quantity -= obj_in.quantity
|
|
|
|
# Create the transaction
|
|
db_obj = InventoryTransaction(
|
|
id=str(uuid.uuid4()),
|
|
transaction_type=obj_in.transaction_type,
|
|
quantity=obj_in.quantity,
|
|
transaction_date=obj_in.transaction_date or datetime.utcnow(),
|
|
notes=obj_in.notes,
|
|
unit_price=obj_in.unit_price,
|
|
reference_number=obj_in.reference_number,
|
|
product_id=obj_in.product_id,
|
|
)
|
|
|
|
# Update the database
|
|
db.add(db_obj)
|
|
db.add(product)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
|
|
return db_obj
|
|
|
|
def get_by_product(
|
|
self, db: Session, *, product_id: str, skip: int = 0, limit: int = 100
|
|
) -> List[InventoryTransaction]:
|
|
"""
|
|
Get transactions by product ID.
|
|
"""
|
|
return (
|
|
db.query(InventoryTransaction)
|
|
.filter(InventoryTransaction.product_id == product_id)
|
|
.order_by(InventoryTransaction.transaction_date.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
def get_by_transaction_type(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
transaction_type: TransactionType,
|
|
skip: int = 0,
|
|
limit: int = 100
|
|
) -> List[InventoryTransaction]:
|
|
"""
|
|
Get transactions by transaction type.
|
|
"""
|
|
return (
|
|
db.query(InventoryTransaction)
|
|
.filter(InventoryTransaction.transaction_type == transaction_type)
|
|
.order_by(InventoryTransaction.transaction_date.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
|
|
inventory_transaction = CRUDInventoryTransaction(InventoryTransaction) |