Automated Action a28e115e4f Implement simple cart system with FastAPI and SQLite
- Set up project structure with FastAPI and SQLite
- Create models for products and cart items
- Implement schemas for API request/response validation
- Add database migrations with Alembic
- Create service layer for business logic
- Implement API endpoints for cart operations
- Add health endpoint for monitoring
- Update documentation
- Fix linting issues
2025-05-18 23:36:40 +00:00

213 lines
5.9 KiB
Python

from typing import Optional, Dict, Any, Tuple
from sqlalchemy.orm import Session, joinedload
from fastapi import HTTPException, status
from app.models.cart import Cart, CartItem
from app.schemas.cart import CartItemCreate, CartItemUpdate
from app.services import product_service
def get_active_cart(db: Session, user_id: str) -> Optional[Cart]:
"""
Get the active cart for a user, or create one if it doesn't exist.
"""
cart = db.query(Cart).filter(
Cart.user_id == user_id,
Cart.is_active == 1
).first()
if not cart:
cart = Cart(user_id=user_id, is_active=1)
db.add(cart)
db.commit()
db.refresh(cart)
return cart
def get_cart_with_items(db: Session, cart_id: int) -> Optional[Cart]:
"""
Get a cart by ID with all its items.
"""
return db.query(Cart).options(
joinedload(Cart.items)
).filter(Cart.id == cart_id).first()
def get_cart_detail(db: Session, cart_id: int) -> Dict[str, Any]:
"""
Get detailed cart information including items and total.
"""
cart = get_cart_with_items(db, cart_id)
if not cart:
return None
items_with_details = []
total = 0.0
for item in cart.items:
product = product_service.get_product_by_id(db, item.product_id)
if product:
subtotal = item.quantity * item.unit_price
total += subtotal
items_with_details.append({
"id": item.id,
"product_id": item.product_id,
"product_name": product.name,
"quantity": item.quantity,
"unit_price": item.unit_price,
"subtotal": subtotal
})
return {
"id": cart.id,
"user_id": cart.user_id,
"is_active": cart.is_active,
"created_at": cart.created_at,
"items": items_with_details,
"total": round(total, 2)
}
def add_item_to_cart(
db: Session,
user_id: str,
item_data: CartItemCreate
) -> Tuple[CartItem, bool]:
"""
Add an item to a user's active cart. Returns the cart item and a boolean
indicating if it's a new item (True) or an updated existing item (False).
"""
# Get or create active cart
cart = get_active_cart(db, user_id)
# Check if product exists and is available
product = product_service.get_product_by_id(db, item_data.product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {item_data.product_id} not found"
)
if not product.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Product with ID {item_data.product_id} is not available"
)
if product.stock < item_data.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock available. Requested: {item_data.quantity}, Available: {product.stock}"
)
# Check if the item already exists in the cart
cart_item = db.query(CartItem).filter(
CartItem.cart_id == cart.id,
CartItem.product_id == item_data.product_id
).first()
is_new = False
if cart_item:
# Update existing item
new_quantity = cart_item.quantity + item_data.quantity
if new_quantity > product.stock:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock available. Requested total: {new_quantity}, Available: {product.stock}"
)
cart_item.quantity = new_quantity
else:
# Create new item
is_new = True
cart_item = CartItem(
cart_id=cart.id,
product_id=item_data.product_id,
quantity=item_data.quantity,
unit_price=product.price
)
db.add(cart_item)
db.commit()
db.refresh(cart_item)
return cart_item, is_new
def update_cart_item(
db: Session,
user_id: str,
item_id: int,
item_data: CartItemUpdate
) -> Optional[CartItem]:
"""
Update the quantity of an item in a user's active cart.
"""
# Get active cart
cart = get_active_cart(db, user_id)
# Find the cart item
cart_item = db.query(CartItem).filter(
CartItem.id == item_id,
CartItem.cart_id == cart.id
).first()
if not cart_item:
return None
# Check product stock
product = product_service.get_product_by_id(db, cart_item.product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Product with ID {cart_item.product_id} not found"
)
if product.stock < item_data.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock available. Requested: {item_data.quantity}, Available: {product.stock}"
)
# Update quantity
cart_item.quantity = item_data.quantity
db.commit()
db.refresh(cart_item)
return cart_item
def remove_cart_item(db: Session, user_id: str, item_id: int) -> bool:
"""
Remove an item from a user's active cart.
"""
# Get active cart
cart = get_active_cart(db, user_id)
# Find the cart item
cart_item = db.query(CartItem).filter(
CartItem.id == item_id,
CartItem.cart_id == cart.id
).first()
if not cart_item:
return False
# Remove item
db.delete(cart_item)
db.commit()
return True
def clear_cart(db: Session, user_id: str) -> bool:
"""
Remove all items from a user's active cart.
"""
# Get active cart
cart = get_active_cart(db, user_id)
# Delete all cart items
deleted = db.query(CartItem).filter(CartItem.cart_id == cart.id).delete()
db.commit()
return deleted > 0