import logging import os import re import uuid from datetime import datetime from fastapi import APIRouter, Depends, File, HTTPException, Path, UploadFile, status from sqlalchemy import or_ from sqlalchemy.orm import Session from app.core.config import settings from app.core.database import get_db from app.dependencies.auth import get_current_active_user, get_current_seller from app.models.category import Category from app.models.product import Product, ProductImage, ProductStatus from app.models.tag import Tag from app.models.user import User, UserRole from app.schemas.product import ( Product as ProductSchema, ) from app.schemas.product import ( ProductCreate, ProductDetails, ProductUpdate, ) from app.schemas.product import ( ProductImage as ProductImageSchema, ) router = APIRouter() logger = logging.getLogger(__name__) def slugify(text): """Convert a string to a URL-friendly slug.""" # Remove non-alphanumeric characters text = re.sub(r'[^\w\s-]', '', text.lower()) # Replace spaces with hyphens text = re.sub(r'[\s]+', '-', text) # Remove consecutive hyphens text = re.sub(r'[-]+', '-', text) # Add timestamp to ensure uniqueness timestamp = int(datetime.now().timestamp()) return f"{text}-{timestamp}" @router.get("/", response_model=list[ProductSchema]) async def get_products( skip: int = 0, limit: int = 100, category_id: str | None = None, status: ProductStatus | None = None, search: str | None = None, min_price: float | None = None, max_price: float | None = None, featured: bool | None = None, db: Session = Depends(get_db) ): """ Get all products with optional filtering. """ query = db.query(Product) # Apply filters if category_id: query = query.filter(Product.category_id == category_id) if status: query = query.filter(Product.status == status) else: # By default, only show published products query = query.filter(Product.status == ProductStatus.PUBLISHED) if search: search_term = f"%{search}%" query = query.filter( or_( Product.name.ilike(search_term), Product.description.ilike(search_term), Product.sku.ilike(search_term) ) ) if min_price is not None: query = query.filter(Product.price >= min_price) if max_price is not None: query = query.filter(Product.price <= max_price) if featured is not None: query = query.filter(Product.is_featured == featured) # Apply pagination products = query.offset(skip).limit(limit).all() # Enhance products with category name and tags for product in products: if product.category: product.category_name = product.category.name product.tags = [tag.name for tag in product.tags] return products @router.get("/{product_id}", response_model=ProductDetails) async def get_product( product_id: str = Path(..., title="The ID of the product to get"), db: Session = Depends(get_db) ): """ Get a specific product by ID. """ product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Product not found" ) # Add category name if available if product.category: product.category_name = product.category.name # Add tags product.tags = [tag.name for tag in product.tags] # Calculate in_stock product.in_stock = product.stock_quantity > 0 # For seller/admin, add sales data product.total_sales = 0 product.total_revenue = 0 for order_item in product.order_items: product.total_sales += order_item.quantity product.total_revenue += order_item.subtotal return product @router.post("/", response_model=ProductSchema) async def create_product( product_in: ProductCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_seller) ): """ Create a new product (seller or admin only). """ # Check if slug already exists existing_product = db.query(Product).filter(Product.slug == product_in.slug).first() if existing_product: # If slug exists, create a unique one product_in.slug = slugify(product_in.name) # Check if category exists if product_in.category_id: category = db.query(Category).filter(Category.id == product_in.category_id).first() if not category: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Category not found" ) # Create new product db_product = Product( name=product_in.name, description=product_in.description, price=product_in.price, sku=product_in.sku, barcode=product_in.barcode, stock_quantity=product_in.stock_quantity, weight=product_in.weight, dimensions=product_in.dimensions, status=product_in.status, is_featured=product_in.is_featured, is_digital=product_in.is_digital, digital_download_link=product_in.digital_download_link, slug=product_in.slug, tax_rate=product_in.tax_rate, discount_price=product_in.discount_price, discount_start_date=product_in.discount_start_date, discount_end_date=product_in.discount_end_date, category_id=product_in.category_id, seller_id=current_user.id, ) db.add(db_product) db.commit() db.refresh(db_product) # Add images if provided if product_in.images: for image_data in product_in.images: db_image = ProductImage( product_id=db_product.id, image_url=image_data.image_url, alt_text=image_data.alt_text, is_primary=image_data.is_primary, display_order=image_data.display_order ) db.add(db_image) db.commit() # Add tags if provided if product_in.tag_ids: for tag_id in product_in.tag_ids: tag = db.query(Tag).filter(Tag.id == tag_id).first() if tag: db_product.tags.append(tag) db.commit() db.refresh(db_product) logger.info(f"Product created: {db_product.name} (ID: {db_product.id})") return db_product @router.put("/{product_id}", response_model=ProductSchema) async def update_product( product_id: str, product_in: ProductUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """ Update a product. Sellers can only update their own products. Admins can update any product. """ product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Product not found" ) # Check permissions if current_user.role != UserRole.ADMIN and current_user.id != product.seller_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to update this product" ) # Check if slug already exists (if updating slug) if product_in.slug and product_in.slug != product.slug: existing_product = db.query(Product).filter(Product.slug == product_in.slug).first() if existing_product: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Slug already exists" ) # Check if category exists (if updating category) if product_in.category_id: category = db.query(Category).filter(Category.id == product_in.category_id).first() if not category: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Category not found" ) # Update product attributes for key, value in product_in.dict(exclude_unset=True).items(): if key != "tag_ids": # Handle tags separately setattr(product, key, value) # Update tags if provided if product_in.tag_ids is not None: # Clear existing tags product.tags = [] # Add new tags for tag_id in product_in.tag_ids: tag = db.query(Tag).filter(Tag.id == tag_id).first() if tag: product.tags.append(tag) db.commit() db.refresh(product) logger.info(f"Product updated: {product.name} (ID: {product.id})") return product @router.delete("/{product_id}", response_model=dict) async def delete_product( product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """ Delete a product. Sellers can only delete their own products. Admins can delete any product. """ product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Product not found" ) # Check permissions if current_user.role != UserRole.ADMIN and current_user.id != product.seller_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to delete this product" ) # Delete the product db.delete(product) db.commit() logger.info(f"Product deleted: {product.name} (ID: {product.id})") return {"message": "Product successfully deleted"} @router.post("/{product_id}/images", response_model=ProductImageSchema) async def upload_product_image( product_id: str, file: UploadFile = File(...), is_primary: bool = False, alt_text: str = None, display_order: int = 0, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """ Upload an image for a product. Sellers can only upload images for their own products. Admins can upload images for any product. """ product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Product not found" ) # Check permissions if current_user.role != UserRole.ADMIN and current_user.id != product.seller_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to update this product" ) # Validate file content_type = file.content_type if not content_type.startswith("image/"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="File must be an image" ) # Create product images directory if it doesn't exist product_images_dir = settings.PRODUCT_IMAGES_DIR product_images_dir.mkdir(parents=True, exist_ok=True) # Generate unique filename file_extension = os.path.splitext(file.filename)[1] unique_filename = f"{uuid.uuid4()}{file_extension}" file_path = product_images_dir / unique_filename # Save the file with open(file_path, "wb") as buffer: buffer.write(await file.read()) # If this is the primary image, update other images if is_primary: db.query(ProductImage).filter( ProductImage.product_id == product_id, ProductImage.is_primary == True ).update({"is_primary": False}) # Create the image record relative_path = f"/storage/product_images/{unique_filename}" db_image = ProductImage( product_id=product_id, image_url=relative_path, alt_text=alt_text, is_primary=is_primary, display_order=display_order ) db.add(db_image) db.commit() db.refresh(db_image) logger.info(f"Image uploaded for product ID {product_id}: {unique_filename}") return db_image @router.delete("/{product_id}/images/{image_id}", response_model=dict) async def delete_product_image( product_id: str, image_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """ Delete a product image. Sellers can only delete images for their own products. Admins can delete images for any product. """ product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Product not found" ) # Check permissions if current_user.role != UserRole.ADMIN and current_user.id != product.seller_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to update this product" ) # Find the image image = db.query(ProductImage).filter( ProductImage.id == image_id, ProductImage.product_id == product_id ).first() if not image: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Image not found" ) # Delete the image from the database db.delete(image) db.commit() # Try to delete the physical file (if it exists) try: file_name = os.path.basename(image.image_url) file_path = settings.PRODUCT_IMAGES_DIR / file_name if os.path.exists(file_path): os.remove(file_path) except Exception as e: # Log the error but don't fail the request logger.error(f"Error deleting image file: {str(e)}") logger.info(f"Image deleted for product ID {product_id}: {image_id}") return {"message": "Product image successfully deleted"}