
- Set up FastAPI application structure - Create database models for User, Product, Cart, CartItem, Order, and OrderItem - Set up Alembic for database migrations - Create Pydantic schemas for request/response models - Implement API endpoints for products, cart operations, and checkout process - Add health endpoint - Update README with project details and documentation
177 lines
5.2 KiB
Python
177 lines
5.2 KiB
Python
from typing import Any, List
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.session import get_db
|
|
from app.models.order import OrderStatus
|
|
from app.schemas.order import (
|
|
Order as OrderSchema,
|
|
OrderCreate,
|
|
OrderResponse
|
|
)
|
|
from app.crud import product, cart, order, order_item
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
|
|
def create_order(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
order_in: OrderCreate,
|
|
user_id: int = 1 # Simplified: In a real app, this would come from auth
|
|
) -> Any:
|
|
"""
|
|
Create a new order from the current cart.
|
|
"""
|
|
# Get the cart with items
|
|
current_cart = cart.get_cart_with_items(db=db, cart_id=order_in.cart_id)
|
|
if not current_cart:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Cart not found"
|
|
)
|
|
|
|
# Check if cart is empty
|
|
if not current_cart.items:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot checkout with an empty cart"
|
|
)
|
|
|
|
# Check if all products are available and in stock
|
|
for item in current_cart.items:
|
|
db_product = product.get(db=db, id=item.product_id)
|
|
if not db_product or not db_product.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Product {item.product_id} is not available"
|
|
)
|
|
|
|
if db_product.stock_quantity < item.quantity:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Not enough stock for product {db_product.name}. Only {db_product.stock_quantity} available."
|
|
)
|
|
|
|
# Create the order
|
|
new_order = order.create_from_cart(
|
|
db=db,
|
|
obj_in=order_in,
|
|
user_id=user_id,
|
|
cart=current_cart
|
|
)
|
|
|
|
# Update product stock
|
|
for item in current_cart.items:
|
|
product.update_stock(
|
|
db=db,
|
|
product_id=item.product_id,
|
|
quantity=-item.quantity # Decrease stock
|
|
)
|
|
|
|
# Mark cart as inactive
|
|
cart_update = {"is_active": False}
|
|
cart.update(db=db, db_obj=current_cart, obj_in=cart_update)
|
|
|
|
# Return order response
|
|
return OrderResponse(
|
|
order_id=new_order.id,
|
|
message="Order created successfully",
|
|
status=new_order.status.value,
|
|
total_amount=new_order.total_amount
|
|
)
|
|
|
|
|
|
@router.get("/{order_id}", response_model=OrderSchema)
|
|
def get_order(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
order_id: int,
|
|
user_id: int = 1 # Simplified: In a real app, this would come from auth
|
|
) -> Any:
|
|
"""
|
|
Get order by ID.
|
|
"""
|
|
db_order = order.get_with_items(db=db, order_id=order_id)
|
|
|
|
if not db_order:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Order with ID {order_id} not found"
|
|
)
|
|
|
|
# Check if the order belongs to the user (simplified authorization)
|
|
if db_order.user_id != user_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not authorized to access this order"
|
|
)
|
|
|
|
return db_order
|
|
|
|
|
|
@router.get("", response_model=List[OrderSchema])
|
|
def get_user_orders(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
user_id: int = 1 # Simplified: In a real app, this would come from auth
|
|
) -> Any:
|
|
"""
|
|
Get all orders for the current user.
|
|
"""
|
|
orders = order.get_by_user(db=db, user_id=user_id, skip=skip, limit=limit)
|
|
|
|
# Fetch items for each order
|
|
for o in orders:
|
|
o.items = order_item.get_by_order(db=db, order_id=o.id)
|
|
|
|
return orders
|
|
|
|
|
|
@router.post("/{order_id}/pay", response_model=OrderResponse)
|
|
def process_payment(
|
|
*,
|
|
db: Session = Depends(get_db),
|
|
order_id: int,
|
|
user_id: int = 1 # Simplified: In a real app, this would come from auth
|
|
) -> Any:
|
|
"""
|
|
Process payment for an order (simplified simulation).
|
|
"""
|
|
db_order = order.get(db=db, id=order_id)
|
|
|
|
if not db_order:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Order with ID {order_id} not found"
|
|
)
|
|
|
|
# Check if the order belongs to the user (simplified authorization)
|
|
if db_order.user_id != user_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not authorized to access this order"
|
|
)
|
|
|
|
# Check if the order is already paid
|
|
if db_order.status != OrderStatus.PENDING:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Order is already {db_order.status.value}"
|
|
)
|
|
|
|
# Simulate payment processing
|
|
payment_id = f"PAY-{uuid.uuid4().hex[:10].upper()}"
|
|
updated_order = order.process_payment(db=db, order_id=order_id, payment_id=payment_id)
|
|
|
|
return OrderResponse(
|
|
order_id=updated_order.id,
|
|
message="Payment processed successfully",
|
|
status=updated_order.status.value,
|
|
total_amount=updated_order.total_amount
|
|
) |