"""Product helpers: validation, duplicate lookup, creation, response formatting and stock updates."""
from typing import Dict, Optional, Union, Any
|
|
from datetime import datetime
|
|
from fastapi import HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from models.product import Product
|
|
from schemas.product import ProductCreate
|
|
from decimal import Decimal
|
|
import logging
|
|
|
|
def validate_product_data(product_data: ProductCreate) -> bool:
    """
    Validate product data before creation.

    A product is valid when its name is a non-blank string, its price is a
    finite int/float/Decimal strictly greater than zero, and its quantity is
    present and not negative.

    Args:
        product_data: Product data to validate

    Returns:
        bool: True if valid, False otherwise
    """
    name = product_data.name
    # Reject non-string names up front instead of failing on .strip().
    if not isinstance(name, str) or not name.strip():
        return False

    price = product_data.price
    if not isinstance(price, (int, float, Decimal)):
        return False
    if isinstance(price, Decimal):
        # Ordering comparisons against a Decimal NaN raise InvalidOperation,
        # so finiteness has to be established before comparing with zero.
        if not price.is_finite():
            return False
    elif price != price or price == float("inf"):
        # NaN is the only value unequal to itself; `nan <= 0` is False, so
        # without this check NaN (and +inf) would be accepted as a price.
        return False
    if price <= 0:
        return False

    quantity = product_data.quantity
    # A missing quantity is reported as invalid rather than raising TypeError.
    if quantity is None or quantity < 0:
        return False

    return True
|
|
|
|
def check_duplicate_product(db: Session, name: str) -> Optional[Product]:
    """
    Look up an existing product whose name equals the given name.

    Args:
        db: Database session
        name: Product name to check

    Returns:
        The matching Product if one exists, None otherwise
    """
    products = db.query(Product)
    same_name = products.filter(Product.name == name)
    return same_name.first()
|
|
|
|
def create_product_safely(db: Session, product_data: ProductCreate) -> Union[Product, Dict[str, str]]:
    """
    Create new product with validation and error handling.

    Args:
        db: Database session
        product_data: Product data for creation

    Returns:
        The newly created Product. Failures are reported by raising
        HTTPException; no error dict is returned.

    Raises:
        HTTPException: 400 if the data fails validation or a product with the
            same name already exists; 500 if anything else goes wrong while
            creating the product.
    """
    try:
        if not validate_product_data(product_data):
            raise HTTPException(status_code=400, detail="Invalid product data")

        if check_duplicate_product(db, product_data.name):
            raise HTTPException(status_code=400, detail="Product already exists")

        db_product = Product(
            name=product_data.name,
            description=product_data.description,
            price=product_data.price,
            quantity=product_data.quantity,
            created_at=datetime.utcnow(),
        )

        db.add(db_product)
        db.commit()
        db.refresh(db_product)

        return db_product

    except HTTPException:
        # The 400s raised above are deliberate client errors. They must reach
        # the caller with their own status code rather than being swallowed by
        # the generic handler below and rewritten as a 500.
        db.rollback()
        raise

    except Exception as e:
        db.rollback()
        # logging.exception records the traceback alongside the message.
        logging.exception("Error creating product: %s", e)
        raise HTTPException(status_code=500, detail="Error creating product") from e
|
|
|
|
def format_product_response(product: Product) -> Dict[str, Any]:
    """
    Format product object for API response.

    Args:
        product: Product object to format

    Returns:
        Dict with the keys "id", "name", "description", "price", "quantity"
        and "created_at". The price is rendered with str(); created_at is an
        ISO 8601 string, or None when the product has no creation timestamp.
    """
    created_at = product.created_at

    return {
        "id": product.id,
        "name": product.name,
        "description": product.description,
        "price": str(product.price),
        "quantity": product.quantity,
        # Guard against a missing timestamp instead of raising AttributeError.
        "created_at": created_at.isoformat() if created_at is not None else None,
    }
|
|
|
|
def update_product_stock(db: Session, product_id: int, quantity_change: int) -> Product:
    """
    Update product stock quantity.

    Args:
        db: Database session
        product_id: ID of product to update
        quantity_change: Amount to change stock by (positive or negative)

    Returns:
        Updated product object

    Raises:
        HTTPException: 404 if no product has the given ID; 400 if the change
            would make the stock negative.
        Exception: Any error raised by the commit is re-raised unchanged after
            the session has been rolled back.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # NOTE(review): the quantity is read and written without a row lock, so
    # concurrent updates may overwrite each other — confirm whether callers
    # serialize access.
    new_quantity = product.quantity + quantity_change
    if new_quantity < 0:
        raise HTTPException(status_code=400, detail="Insufficient stock")

    product.quantity = new_quantity
    try:
        db.commit()
    except Exception:
        # Roll back so a failed commit does not leave the pending change in
        # the session; the original exception is re-raised unchanged.
        db.rollback()
        logging.exception("Error updating stock for product %s", product_id)
        raise

    db.refresh(product)

    return product