
- Set up FastAPI application structure - Create database models for User, Product, Cart, CartItem, Order, and OrderItem - Set up Alembic for database migrations - Create Pydantic schemas for request/response models - Implement API endpoints for products, cart operations, and checkout process - Add health endpoint - Update README with project details and documentation
105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
from typing import List, Optional, Any
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.models.order import Order, OrderItem, OrderStatus
|
|
from app.models.cart import Cart, CartItem
|
|
from app.schemas.order import OrderCreate, OrderUpdate, OrderItemCreate
|
|
from app.crud.base import CRUDBase
|
|
|
|
|
|
class CRUDOrderItem(CRUDBase[OrderItem, OrderItemCreate, Any]):
|
|
"""CRUD operations for OrderItem model."""
|
|
|
|
def create_from_cart_item(
|
|
self, db: Session, *, order_id: int, cart_item: CartItem
|
|
) -> OrderItem:
|
|
"""Create a new order item from a cart item."""
|
|
order_item = OrderItem(
|
|
order_id=order_id,
|
|
product_id=cart_item.product_id,
|
|
quantity=cart_item.quantity,
|
|
unit_price=cart_item.unit_price
|
|
)
|
|
db.add(order_item)
|
|
db.commit()
|
|
db.refresh(order_item)
|
|
return order_item
|
|
|
|
def get_by_order(self, db: Session, *, order_id: int) -> List[OrderItem]:
|
|
"""Get all items in an order."""
|
|
return db.query(OrderItem).filter(OrderItem.order_id == order_id).all()
|
|
|
|
|
|
class CRUDOrder(CRUDBase[Order, OrderCreate, OrderUpdate]):
|
|
"""CRUD operations for Order model."""
|
|
|
|
def create_from_cart(
|
|
self, db: Session, *, obj_in: OrderCreate, user_id: int, cart: Cart
|
|
) -> Order:
|
|
"""Create a new order from a cart."""
|
|
# Calculate total amount from cart items
|
|
total_amount = sum(item.quantity * item.unit_price for item in cart.items)
|
|
|
|
# Create order
|
|
order = Order(
|
|
user_id=user_id,
|
|
status=OrderStatus.PENDING,
|
|
total_amount=total_amount,
|
|
shipping_address=obj_in.shipping_address,
|
|
payment_method=obj_in.payment_method,
|
|
notes=obj_in.notes,
|
|
tracking_number=None,
|
|
payment_id=None,
|
|
paid_at=None
|
|
)
|
|
db.add(order)
|
|
db.commit()
|
|
db.refresh(order)
|
|
|
|
# Create order items from cart items
|
|
order_items = []
|
|
for cart_item in cart.items:
|
|
order_item = OrderItem(
|
|
order_id=order.id,
|
|
product_id=cart_item.product_id,
|
|
quantity=cart_item.quantity,
|
|
unit_price=cart_item.unit_price
|
|
)
|
|
db.add(order_item)
|
|
order_items.append(order_item)
|
|
|
|
db.commit()
|
|
|
|
return order
|
|
|
|
def get_by_user(self, db: Session, *, user_id: int, skip: int = 0, limit: int = 100) -> List[Order]:
|
|
"""Get all orders for a user."""
|
|
return db.query(Order).filter(Order.user_id == user_id).offset(skip).limit(limit).all()
|
|
|
|
def get_with_items(self, db: Session, *, order_id: int) -> Optional[Order]:
|
|
"""Get an order with all its items and product details."""
|
|
return db.query(Order).options(
|
|
joinedload(Order.items).joinedload(OrderItem.product)
|
|
).filter(Order.id == order_id).first()
|
|
|
|
def process_payment(self, db: Session, *, order_id: int, payment_id: str) -> Order:
|
|
"""Process a payment for an order."""
|
|
order = self.get(db=db, id=order_id)
|
|
if not order:
|
|
return None
|
|
|
|
order.status = OrderStatus.PAID
|
|
order.payment_id = payment_id
|
|
order.paid_at = datetime.utcnow()
|
|
|
|
db.add(order)
|
|
db.commit()
|
|
db.refresh(order)
|
|
|
|
return order
|
|
|
|
|
|
order = CRUDOrder(Order)
|
|
order_item = CRUDOrderItem(OrderItem) |