from typing import Optional, Dict, Any, Tuple from sqlalchemy.orm import Session, joinedload from fastapi import HTTPException, status from app.models.cart import Cart, CartItem from app.schemas.cart import CartItemCreate, CartItemUpdate from app.services import product_service def get_active_cart(db: Session, user_id: str) -> Optional[Cart]: """ Get the active cart for a user, or create one if it doesn't exist. """ cart = db.query(Cart).filter( Cart.user_id == user_id, Cart.is_active == 1 ).first() if not cart: cart = Cart(user_id=user_id, is_active=1) db.add(cart) db.commit() db.refresh(cart) return cart def get_cart_with_items(db: Session, cart_id: int) -> Optional[Cart]: """ Get a cart by ID with all its items. """ return db.query(Cart).options( joinedload(Cart.items) ).filter(Cart.id == cart_id).first() def get_cart_detail(db: Session, cart_id: int) -> Dict[str, Any]: """ Get detailed cart information including items and total. """ cart = get_cart_with_items(db, cart_id) if not cart: return None items_with_details = [] total = 0.0 for item in cart.items: product = product_service.get_product_by_id(db, item.product_id) if product: subtotal = item.quantity * item.unit_price total += subtotal items_with_details.append({ "id": item.id, "product_id": item.product_id, "product_name": product.name, "quantity": item.quantity, "unit_price": item.unit_price, "subtotal": subtotal }) return { "id": cart.id, "user_id": cart.user_id, "is_active": cart.is_active, "created_at": cart.created_at, "items": items_with_details, "total": round(total, 2) } def add_item_to_cart( db: Session, user_id: str, item_data: CartItemCreate ) -> Tuple[CartItem, bool]: """ Add an item to a user's active cart. Returns the cart item and a boolean indicating if it's a new item (True) or an updated existing item (False). """ # Get or create active cart cart = get_active_cart(db, user_id) # Check if product exists and is available product = product_service.get_product_by_id(db, item_data.product_id) if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Product with ID {item_data.product_id} not found" ) if not product.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Product with ID {item_data.product_id} is not available" ) if product.stock < item_data.quantity: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Not enough stock available. Requested: {item_data.quantity}, Available: {product.stock}" ) # Check if the item already exists in the cart cart_item = db.query(CartItem).filter( CartItem.cart_id == cart.id, CartItem.product_id == item_data.product_id ).first() is_new = False if cart_item: # Update existing item new_quantity = cart_item.quantity + item_data.quantity if new_quantity > product.stock: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Not enough stock available. Requested total: {new_quantity}, Available: {product.stock}" ) cart_item.quantity = new_quantity else: # Create new item is_new = True cart_item = CartItem( cart_id=cart.id, product_id=item_data.product_id, quantity=item_data.quantity, unit_price=product.price ) db.add(cart_item) db.commit() db.refresh(cart_item) return cart_item, is_new def update_cart_item( db: Session, user_id: str, item_id: int, item_data: CartItemUpdate ) -> Optional[CartItem]: """ Update the quantity of an item in a user's active cart. """ # Get active cart cart = get_active_cart(db, user_id) # Find the cart item cart_item = db.query(CartItem).filter( CartItem.id == item_id, CartItem.cart_id == cart.id ).first() if not cart_item: return None # Check product stock product = product_service.get_product_by_id(db, cart_item.product_id) if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Product with ID {cart_item.product_id} not found" ) if product.stock < item_data.quantity: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Not enough stock available. Requested: {item_data.quantity}, Available: {product.stock}" ) # Update quantity cart_item.quantity = item_data.quantity db.commit() db.refresh(cart_item) return cart_item def remove_cart_item(db: Session, user_id: str, item_id: int) -> bool: """ Remove an item from a user's active cart. """ # Get active cart cart = get_active_cart(db, user_id) # Find the cart item cart_item = db.query(CartItem).filter( CartItem.id == item_id, CartItem.cart_id == cart.id ).first() if not cart_item: return False # Remove item db.delete(cart_item) db.commit() return True def clear_cart(db: Session, user_id: str) -> bool: """ Remove all items from a user's active cart. """ # Get active cart cart = get_active_cart(db, user_id) # Delete all cart items deleted = db.query(CartItem).filter(CartItem.cart_id == cart.id).delete() db.commit() return deleted > 0