Automated Action d2b80dacc4 Implement Shopping Cart and Checkout API
- Set up FastAPI application structure
- Create database models for User, Product, Cart, CartItem, Order, and OrderItem
- Set up Alembic for database migrations
- Create Pydantic schemas for request/response models
- Implement API endpoints for products, cart operations, and checkout process
- Add health endpoint
- Update README with project details and documentation
2025-05-18 00:00:02 +00:00

177 lines
5.2 KiB
Python

from typing import Any, List
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.order import OrderStatus
from app.schemas.order import (
Order as OrderSchema,
OrderCreate,
OrderResponse
)
from app.crud import product, cart, order, order_item
router = APIRouter()
@router.post("", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
def create_order(
*,
db: Session = Depends(get_db),
order_in: OrderCreate,
user_id: int = 1 # Simplified: In a real app, this would come from auth
) -> Any:
"""
Create a new order from the current cart.
"""
# Get the cart with items
current_cart = cart.get_cart_with_items(db=db, cart_id=order_in.cart_id)
if not current_cart:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Cart not found"
)
# Check if cart is empty
if not current_cart.items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot checkout with an empty cart"
)
# Check if all products are available and in stock
for item in current_cart.items:
db_product = product.get(db=db, id=item.product_id)
if not db_product or not db_product.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Product {item.product_id} is not available"
)
if db_product.stock_quantity < item.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock for product {db_product.name}. Only {db_product.stock_quantity} available."
)
# Create the order
new_order = order.create_from_cart(
db=db,
obj_in=order_in,
user_id=user_id,
cart=current_cart
)
# Update product stock
for item in current_cart.items:
product.update_stock(
db=db,
product_id=item.product_id,
quantity=-item.quantity # Decrease stock
)
# Mark cart as inactive
cart_update = {"is_active": False}
cart.update(db=db, db_obj=current_cart, obj_in=cart_update)
# Return order response
return OrderResponse(
order_id=new_order.id,
message="Order created successfully",
status=new_order.status.value,
total_amount=new_order.total_amount
)
@router.get("/{order_id}", response_model=OrderSchema)
def get_order(
*,
db: Session = Depends(get_db),
order_id: int,
user_id: int = 1 # Simplified: In a real app, this would come from auth
) -> Any:
"""
Get order by ID.
"""
db_order = order.get_with_items(db=db, order_id=order_id)
if not db_order:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Order with ID {order_id} not found"
)
# Check if the order belongs to the user (simplified authorization)
if db_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to access this order"
)
return db_order
@router.get("", response_model=List[OrderSchema])
def get_user_orders(
*,
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
user_id: int = 1 # Simplified: In a real app, this would come from auth
) -> Any:
"""
Get all orders for the current user.
"""
orders = order.get_by_user(db=db, user_id=user_id, skip=skip, limit=limit)
# Fetch items for each order
for o in orders:
o.items = order_item.get_by_order(db=db, order_id=o.id)
return orders
@router.post("/{order_id}/pay", response_model=OrderResponse)
def process_payment(
*,
db: Session = Depends(get_db),
order_id: int,
user_id: int = 1 # Simplified: In a real app, this would come from auth
) -> Any:
"""
Process payment for an order (simplified simulation).
"""
db_order = order.get(db=db, id=order_id)
if not db_order:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Order with ID {order_id} not found"
)
# Check if the order belongs to the user (simplified authorization)
if db_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to access this order"
)
# Check if the order is already paid
if db_order.status != OrderStatus.PENDING:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Order is already {db_order.status.value}"
)
# Simulate payment processing
payment_id = f"PAY-{uuid.uuid4().hex[:10].upper()}"
updated_order = order.process_payment(db=db, order_id=order_id, payment_id=payment_id)
return OrderResponse(
order_id=updated_order.id,
message="Payment processed successfully",
status=updated_order.status.value,
total_amount=updated_order.total_amount
)