2025-06-02 11:37:11 +00:00

142 lines
4.2 KiB
Python

import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from sqlalchemy.orm import Session
from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
from app.schemas.order import OrderCreate, OrderUpdate
def generate_order_number() -> str:
    """Return a freshly generated order number of the form ``ORD-XXXXXXXX``.

    The suffix is the first eight hexadecimal digits of a random UUID4,
    upper-cased.
    """
    suffix = uuid.uuid4().hex[:8].upper()
    return "ORD-" + suffix
def get(db: Session, order_id: int) -> Optional[Order]:
    """Look up a single order by its ``id``.

    Returns the first ``Order`` whose ``id`` equals ``order_id``, or ``None``
    when the query yields no row.
    """
    matching = db.query(Order).filter(Order.id == order_id)
    return matching.first()
def get_by_order_number(db: Session, order_number: str) -> Optional[Order]:
    """Look up a single order by its ``order_number``.

    Returns the first ``Order`` whose ``order_number`` equals the given
    value, or ``None`` when the query yields no row.
    """
    matching = db.query(Order).filter(Order.order_number == order_number)
    return matching.first()
def get_multi(
    db: Session, *, skip: int = 0, limit: int = 100, user_id: Optional[int] = None
) -> List[Order]:
    """Return one page of orders, newest first.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        user_id: When given, only orders whose ``user_id`` matches are
            returned; ``None`` (the default) applies no user filter.

    Returns:
        The selected orders, sorted by ``created_at`` descending.
    """
    query = db.query(Order)
    # Compare against None instead of relying on truthiness: a falsy id such
    # as 0 must still restrict the result to that user rather than silently
    # returning every user's orders.
    if user_id is not None:
        query = query.filter(Order.user_id == user_id)
    return query.order_by(Order.created_at.desc()).offset(skip).limit(limit).all()
def create(db: Session, *, obj_in: OrderCreate) -> Order:
    """Create an order together with its line items and commit them.

    Every referenced product is looked up before anything is added to the
    session, so an unknown product id aborts the call with no pending writes.

    Args:
        db: Session used for the product lookups and the inserts.
        obj_in: Order payload. The code reads ``user_id``, ``status``,
            ``shipping_address`` and ``items``; each item supplies
            ``product_id``, ``quantity`` and an optional ``unit_price``.

    Returns:
        The committed ``Order``, refreshed from the database.

    Raises:
        ValueError: If an item references a product that cannot be found.
        Exception: Any error raised while flushing or committing is re-raised
            after the session has been rolled back.
    """
    # Price every line first so validation failures happen before any write.
    total_amount = 0.0
    line_items = []
    for item in obj_in.items:
        product = db.query(Product).get(item.product_id)
        if product is None:
            raise ValueError(f"Product with ID {item.product_id} not found")

        # An explicit unit price on the item overrides the product's price.
        # NOTE(review): the supplied unit_price is used as-is here; confirm
        # that callers are allowed to set it.
        unit_price = item.unit_price if item.unit_price is not None else product.price
        item_total = unit_price * item.quantity
        total_amount += item_total
        line_items.append(
            {
                "product_id": item.product_id,
                "quantity": item.quantity,
                "unit_price": unit_price,
                "total_price": item_total,
            }
        )

    db_obj = Order(
        user_id=obj_in.user_id,
        order_number=generate_order_number(),
        status=obj_in.status or OrderStatus.PENDING,
        total_amount=total_amount,
        shipping_address=obj_in.shipping_address,
        created_at=datetime.now(),
    )

    try:
        db.add(db_obj)
        # Flush (without committing) so db_obj.id is populated for the items.
        db.flush()
        for line in line_items:
            db.add(
                OrderItem(
                    order_id=db_obj.id,
                    product_id=line["product_id"],
                    quantity=line["quantity"],
                    unit_price=line["unit_price"],
                    total_price=line["total_price"],
                )
            )
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction
        # with the half-written order still pending; roll back so the session
        # stays usable, then let the caller see the original error.
        db.rollback()
        raise

    db.refresh(db_obj)
    return db_obj
def update(db: Session, *, db_obj: Order, obj_in: Union[OrderUpdate, Dict[str, Any]]) -> Order:
    """Apply a partial update to an existing order and commit it.

    Args:
        db: Session used to persist the change.
        db_obj: The order instance to modify.
        obj_in: Either an ``OrderUpdate`` schema (only explicitly set fields
            are applied) or a plain dict of field names to new values. A dict
            argument is not modified.

    Returns:
        The same ``db_obj``, committed and refreshed from the database.
    """
    if isinstance(obj_in, dict):
        # Work on a copy: the pops and the updated_at stamp below must not
        # leak into the dict the caller passed in.
        update_data = dict(obj_in)
    else:
        update_data = obj_in.dict(exclude_unset=True)

    # Don't allow changing order_number or created_at.
    update_data.pop("order_number", None)
    update_data.pop("created_at", None)

    # Stamp the modification time on every update.
    update_data["updated_at"] = datetime.now()

    for field, value in update_data.items():
        setattr(db_obj, field, value)

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj
def update_status(db: Session, *, order_id: int, status: OrderStatus) -> Optional[Order]:
    """Set a new status on the order with the given id and commit it.

    Returns the refreshed order, or ``None`` when no order with ``order_id``
    is found.
    """
    target = get(db, order_id)
    if target:
        target.status = status
        target.updated_at = datetime.now()
        db.add(target)
        db.commit()
        db.refresh(target)
        return target
    return None
def cancel_order(db: Session, *, order_id: int) -> Optional[Order]:
    """Mark an order as cancelled unless it is already shipped or delivered.

    Returns the refreshed order, or ``None`` when no order with ``order_id``
    is found.

    Raises:
        ValueError: If the order's status is ``SHIPPED`` or ``DELIVERED``.
    """
    existing = get(db, order_id)
    if not existing:
        return None

    non_cancellable = (OrderStatus.SHIPPED, OrderStatus.DELIVERED)
    if existing.status in non_cancellable:
        raise ValueError("Cannot cancel an order that has already been shipped or delivered")

    existing.status = OrderStatus.CANCELLED
    existing.updated_at = datetime.now()

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
def get_order_items(db: Session, *, order_id: int) -> List[OrderItem]:
    """Return every ``OrderItem`` whose ``order_id`` matches the given order."""
    items_query = db.query(OrderItem).filter(OrderItem.order_id == order_id)
    return items_query.all()