Automated Action 538a985c8e Build e-commerce API with FastAPI and SQLite
Create a complete e-commerce application with the following features:
- User authentication and authorization
- Product and category management
- Shopping cart functionality
- Order processing
- Database migrations with Alembic
- Comprehensive documentation
2025-05-26 11:05:27 +00:00

128 lines
3.7 KiB
Python

from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.core.deps import get_current_active_superuser, get_current_active_user, get_db
from app.models.user import User
from app.schemas.product import Product, ProductCreate, ProductList, ProductUpdate
from app.services import product as product_service
from app.services import category as category_service
router = APIRouter()
@router.get("/", response_model=List[ProductList])
def read_products(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
category_id: str = Query(None, description="Filter by category ID"),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve products.
"""
if category_id:
category = category_service.get_by_id(db, category_id=category_id)
if not category:
raise HTTPException(
status_code=404,
detail="Category not found",
)
products = product_service.get_by_category(
db, category_id=category_id, skip=skip, limit=limit
)
else:
products = product_service.get_multi(db, skip=skip, limit=limit)
return products
@router.post("/", response_model=Product)
def create_product(
*,
db: Session = Depends(get_db),
product_in: ProductCreate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new product.
"""
category = category_service.get_by_id(db, category_id=product_in.category_id)
if not category:
raise HTTPException(
status_code=404,
detail="Category not found",
)
product = product_service.create(db, obj_in=product_in)
return product
@router.get("/{product_id}", response_model=Product)
def read_product(
*,
db: Session = Depends(get_db),
product_id: str,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get product by ID.
"""
product = product_service.get_by_id(db, product_id=product_id)
if not product:
raise HTTPException(
status_code=404,
detail="Product not found",
)
return product
@router.put("/{product_id}", response_model=Product)
def update_product(
*,
db: Session = Depends(get_db),
product_id: str,
product_in: ProductUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update a product.
"""
product = product_service.get_by_id(db, product_id=product_id)
if not product:
raise HTTPException(
status_code=404,
detail="Product not found",
)
# Verify category exists if updating category
if product_in.category_id and product_in.category_id != product.category_id:
category = category_service.get_by_id(db, category_id=product_in.category_id)
if not category:
raise HTTPException(
status_code=404,
detail="Category not found",
)
product = product_service.update(db, db_obj=product, obj_in=product_in)
return product
@router.delete("/{product_id}", response_model=Product)
def delete_product(
*,
db: Session = Depends(get_db),
product_id: str,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete a product.
"""
product = product_service.get_by_id(db, product_id=product_id)
if not product:
raise HTTPException(
status_code=404,
detail="Product not found",
)
product = product_service.delete(db, product_id=product_id)
return product