from decimal import Decimal
from typing import List, Optional, Dict, Any

from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload

from app.crud.base import CRUDBase
from app.models.inventory import Inventory, InventoryTransaction
from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem
from app.schemas.purchase_order import PurchaseOrderCreate, PurchaseOrderUpdate


class CRUDPurchaseOrder(CRUDBase[PurchaseOrder, PurchaseOrderCreate, PurchaseOrderUpdate]):
    """CRUD operations for purchase orders, their line items and the
    inventory changes made when an order is received.

    Every mutating method commits on the session it is given. If anything
    fails before the commit completes, the session is rolled back and the
    original exception is re-raised, so no half-applied changes stay pending
    on the session.
    """

    def create_with_items(
        self, db: Session, *, obj_in: PurchaseOrderCreate, user_id: int
    ) -> PurchaseOrder:
        """Create a purchase order together with its line items.

        Args:
            db: Session used for the inserts; committed by this method.
            obj_in: Payload supplying ``supplier_name``, ``notes``, ``status``
                and ``items`` (each item supplying ``product_id``,
                ``quantity`` and ``unit_price``).
            user_id: Stored on the order as ``created_by``.

        Returns:
            The newly created order, refreshed after the commit.

        Raises:
            Exception: Any error raised while flushing or committing is
                re-raised after the session has been rolled back.
        """
        db_obj = PurchaseOrder(
            supplier_name=obj_in.supplier_name,
            notes=obj_in.notes,
            status=obj_in.status,
            created_by=user_id,
        )
        try:
            db.add(db_obj)
            # Flush first so db_obj.id is populated before the items
            # reference it.
            db.flush()
            db.add_all(
                [
                    PurchaseOrderItem(
                        purchase_order_id=db_obj.id,
                        product_id=item.product_id,
                        quantity=item.quantity,
                        unit_price=item.unit_price,
                    )
                    for item in obj_in.items
                ]
            )
            db.commit()
        except Exception:
            # Do not leave a header without items (or a failed flush)
            # pending on the caller's session.
            db.rollback()
            raise
        db.refresh(db_obj)
        return db_obj

    def get_with_items(self, db: Session, id: int) -> Optional[PurchaseOrder]:
        """Return the purchase order with the given id, or ``None``.

        The order's items and each item's product are eager-loaded with
        joined loads.
        """
        return (
            db.query(PurchaseOrder)
            .options(
                joinedload(PurchaseOrder.items).joinedload(PurchaseOrderItem.product)
            )
            .filter(PurchaseOrder.id == id)
            .first()
        )

    def get_multi_with_items(
        self, db: Session, *, skip: int = 0, limit: int = 100
    ) -> List[PurchaseOrder]:
        """Return a page of purchase orders with items and products loaded.

        Args:
            db: Session used for the query.
            skip: Number of orders to skip.
            limit: Maximum number of orders to return.

        Returns:
            Up to ``limit`` orders, ordered by ``PurchaseOrder.id``.
        """
        return (
            db.query(PurchaseOrder)
            .options(
                joinedload(PurchaseOrder.items).joinedload(PurchaseOrderItem.product)
            )
            # OFFSET/LIMIT without ORDER BY yields an unspecified row order,
            # so consecutive pages could overlap or skip rows.
            .order_by(PurchaseOrder.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def _get_or_create_default_inventory(
        self, db: Session, product_id: int
    ) -> Inventory:
        """Return the default-location inventory row for a product.

        The default location is the row whose ``location`` is NULL. When no
        such row exists, one is created with quantity 0, added to the session
        and flushed.
        """
        inventory = (
            db.query(Inventory)
            .filter(
                Inventory.product_id == product_id,
                Inventory.location.is_(None),  # Default location
            )
            .first()
        )
        if inventory is None:
            inventory = Inventory(
                product_id=product_id,
                quantity=0,
                location=None,  # Default location
            )
            db.add(inventory)
            db.flush()
        return inventory

    def receive_order(self, db: Session, *, id: int) -> Optional[PurchaseOrder]:
        """Mark a pending purchase order as received and update inventory.

        For each item on the order, the item's quantity is added to the
        product's default-location inventory row (created if missing) and an
        ``InventoryTransaction`` of type ``"purchase"`` is recorded.

        Args:
            db: Session used for the update; committed by this method.
            id: Id of the purchase order to receive.

        Returns:
            ``None`` if no such order exists. The order unchanged if its
            status is not ``"pending"``. Otherwise the refreshed order with
            status ``"received"``.

        Raises:
            Exception: Any error raised while applying the changes is
                re-raised after the session has been rolled back.
        """
        order = self.get_with_items(db, id)
        if not order:
            return None
        if order.status != "pending":
            return order  # Already processed or cancelled

        try:
            order.status = "received"
            for item in order.items:
                inventory = self._get_or_create_default_inventory(
                    db, item.product_id
                )
                inventory.quantity += item.quantity
                db.add(
                    InventoryTransaction(
                        product_id=item.product_id,
                        quantity=item.quantity,
                        transaction_type="purchase",
                        reference_id=order.id,
                        reason=f"Received from {order.supplier_name}",
                    )
                )
            db.commit()
        except Exception:
            # Status change and stock updates must land together or not at
            # all.
            db.rollback()
            raise
        db.refresh(order)
        return order

    def cancel_order(self, db: Session, *, id: int) -> Optional[PurchaseOrder]:
        """Cancel a pending purchase order.

        Args:
            db: Session used for the update; committed by this method.
            id: Id of the purchase order to cancel.

        Returns:
            ``None`` if no such order exists. The order unchanged if its
            status is not ``"pending"``. Otherwise the refreshed order with
            status ``"cancelled"``.

        Raises:
            Exception: Any error raised by the commit is re-raised after the
                session has been rolled back.
        """
        order = db.query(PurchaseOrder).filter(PurchaseOrder.id == id).first()
        if not order:
            return None
        if order.status != "pending":
            return order  # Already processed or cancelled

        try:
            order.status = "cancelled"
            db.commit()
        except Exception:
            db.rollback()
            raise
        db.refresh(order)
        return order

    def get_total_amount(self, db: Session, *, id: int) -> Decimal:
        """Return the sum of ``quantity * unit_price`` over an order's items.

        The sum is computed by the database. ``Decimal("0.00")`` is returned
        when the query yields no total (for example, the order has no items)
        or a zero total.
        """
        total = (
            db.query(
                func.sum(
                    PurchaseOrderItem.quantity * PurchaseOrderItem.unit_price
                ).label("total")
            )
            .filter(PurchaseOrderItem.purchase_order_id == id)
            .scalar()
        )
        return total or Decimal("0.00")


purchase_order = CRUDPurchaseOrder(PurchaseOrder)