smallbusinessinventorymanag.../app/crud/crud_purchase_order.py
Automated Action 5935f302dc Create Small Business Inventory Management System with FastAPI and SQLite
- Set up project structure and FastAPI application
- Create database models with SQLAlchemy
- Implement authentication with JWT
- Add CRUD operations for products, inventory, categories
- Implement purchase order and sales functionality
- Create reporting endpoints
- Set up Alembic for database migrations
- Add comprehensive documentation in README.md
2025-05-16 08:53:15 +00:00

129 lines
4.6 KiB
Python

from typing import List, Optional, Dict, Any
from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload
from decimal import Decimal
from app.crud.base import CRUDBase
from app.models.purchase_order import PurchaseOrder, PurchaseOrderItem
from app.models.inventory import Inventory, InventoryTransaction
from app.schemas.purchase_order import PurchaseOrderCreate, PurchaseOrderUpdate
class CRUDPurchaseOrder(CRUDBase[PurchaseOrder, PurchaseOrderCreate, PurchaseOrderUpdate]):
    """CRUD operations for purchase orders and their line items.

    Extends the generic ``CRUDBase`` with item-aware creation and loading,
    plus the "receive" and "cancel" status transitions. Every method that
    writes commits its own transaction and rolls the session back if the
    write fails, so the caller's session stays usable.
    """

    def create_with_items(
        self, db: Session, *, obj_in: PurchaseOrderCreate, user_id: int
    ) -> PurchaseOrder:
        """Create a purchase order together with its line items.

        Args:
            db: Active database session.
            obj_in: Payload supplying ``supplier_name``, ``notes``, ``status``
                and the ``items`` to attach.
            user_id: Stored on the order as ``created_by``.

        Returns:
            The persisted purchase order, refreshed from the database.

        Raises:
            Exception: Any error raised while flushing or committing is
                re-raised after the session has been rolled back.
        """
        try:
            db_obj = PurchaseOrder(
                supplier_name=obj_in.supplier_name,
                notes=obj_in.notes,
                status=obj_in.status,
                created_by=user_id,
            )
            db.add(db_obj)
            # Flush before reading db_obj.id for the items' foreign key.
            db.flush()

            db.add_all(
                [
                    PurchaseOrderItem(
                        purchase_order_id=db_obj.id,
                        product_id=item.product_id,
                        quantity=item.quantity,
                        unit_price=item.unit_price,
                    )
                    for item in obj_in.items
                ]
            )
            db.commit()
        except Exception:
            # Header and items are one unit of work: undo both on failure.
            db.rollback()
            raise

        db.refresh(db_obj)
        return db_obj

    def get_with_items(self, db: Session, id: int) -> Optional[PurchaseOrder]:
        """Return one purchase order with items and their products eager-loaded.

        Args:
            db: Active database session.
            id: Primary key of the purchase order.

        Returns:
            The matching purchase order, or ``None`` if there is none.
        """
        return (
            db.query(PurchaseOrder)
            .options(
                joinedload(PurchaseOrder.items).joinedload(PurchaseOrderItem.product)
            )
            .filter(PurchaseOrder.id == id)
            .first()
        )

    def get_multi_with_items(
        self, db: Session, *, skip: int = 0, limit: int = 100
    ) -> List[PurchaseOrder]:
        """Return a page of purchase orders with items and products eager-loaded.

        Args:
            db: Active database session.
            skip: Number of purchase orders to skip.
            limit: Maximum number of purchase orders to return.

        Returns:
            Purchase orders ordered by ascending id.
        """
        return (
            db.query(PurchaseOrder)
            .options(
                joinedload(PurchaseOrder.items).joinedload(PurchaseOrderItem.product)
            )
            # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
            # consecutive pages could overlap or skip rows.
            .order_by(PurchaseOrder.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def receive_order(self, db: Session, *, id: int) -> Optional[PurchaseOrder]:
        """Mark a pending purchase order as received and add its items to stock.

        For each line item, the inventory row for that product with a NULL
        ``location`` (the default location) is incremented by the item
        quantity, created first if it does not exist, and an
        ``InventoryTransaction`` of type ``"purchase"`` is recorded.

        Args:
            db: Active database session.
            id: Primary key of the purchase order.

        Returns:
            ``None`` if the order does not exist. The unchanged order if its
            status is not ``"pending"``. Otherwise the refreshed order with
            status ``"received"``.

        Raises:
            Exception: Any error raised while flushing or committing is
                re-raised after the session has been rolled back.
        """
        order = self.get_with_items(db, id)
        if not order:
            return None
        if order.status != "pending":
            return order  # Already processed or cancelled

        try:
            order.status = "received"

            for item in order.items:
                # Find or create the default-location inventory row.
                inventory = (
                    db.query(Inventory)
                    .filter(
                        Inventory.product_id == item.product_id,
                        Inventory.location.is_(None),  # Default location
                    )
                    .first()
                )
                if not inventory:
                    inventory = Inventory(
                        product_id=item.product_id,
                        quantity=0,
                        location=None,  # Default location
                    )
                    db.add(inventory)
                    db.flush()

                inventory.quantity += item.quantity

                # Record the stock movement against this purchase order.
                db.add(
                    InventoryTransaction(
                        product_id=item.product_id,
                        quantity=item.quantity,
                        transaction_type="purchase",
                        reference_id=order.id,
                        reason=f"Received from {order.supplier_name}",
                    )
                )

            db.commit()
        except Exception:
            # Status change, stock updates and transactions succeed or fail together.
            db.rollback()
            raise

        db.refresh(order)
        return order

    def cancel_order(self, db: Session, *, id: int) -> Optional[PurchaseOrder]:
        """Cancel a pending purchase order.

        Args:
            db: Active database session.
            id: Primary key of the purchase order.

        Returns:
            ``None`` if the order does not exist. The unchanged order if its
            status is not ``"pending"``. Otherwise the refreshed order with
            status ``"cancelled"``.

        Raises:
            Exception: Any error raised while committing is re-raised after
                the session has been rolled back.
        """
        order = db.query(PurchaseOrder).filter(PurchaseOrder.id == id).first()
        if not order:
            return None
        if order.status != "pending":
            return order  # Already processed or cancelled

        try:
            order.status = "cancelled"
            db.commit()
        except Exception:
            db.rollback()
            raise

        db.refresh(order)
        return order

    def get_total_amount(self, db: Session, *, id: int) -> Decimal:
        """Return the sum of ``quantity * unit_price`` over an order's items.

        Args:
            db: Active database session.
            id: Primary key of the purchase order.

        Returns:
            The total as a ``Decimal``. ``Decimal("0.00")`` when the order has
            no items or the sum is zero.
        """
        total = (
            db.query(
                func.sum(
                    PurchaseOrderItem.quantity * PurchaseOrderItem.unit_price
                ).label("total")
            )
            .filter(PurchaseOrderItem.purchase_order_id == id)
            .scalar()
        )
        if not total:
            # SUM over zero rows is NULL; a zero sum is normalised the same way.
            return Decimal("0.00")
        if isinstance(total, Decimal):
            return total
        # The driver may return a float or int; go through str() so a float's
        # binary representation error is not carried into the Decimal.
        return Decimal(str(total))
# Module-level CRUDPurchaseOrder instance bound to the PurchaseOrder model.
purchase_order = CRUDPurchaseOrder(PurchaseOrder)