Automated Action a28e115e4f Implement simple cart system with FastAPI and SQLite
- Set up project structure with FastAPI and SQLite
- Create models for products and cart items
- Implement schemas for API request/response validation
- Add database migrations with Alembic
- Create service layer for business logic
- Implement API endpoints for cart operations
- Add health endpoint for monitoring
- Update documentation
- Fix linting issues
2025-05-18 23:36:40 +00:00

105 lines
3.1 KiB
Python

from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.schemas.product import Product, ProductCreate, ProductUpdate
from app.schemas.common import PaginatedResponse, DataResponse
from app.services import product_service
# Router collecting the product endpoints below; every route is mounted under
# the "/products" prefix and tagged "products".
router = APIRouter(prefix="/products", tags=["products"])
@router.get("", response_model=PaginatedResponse[Product])
def get_products(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    is_active: Optional[bool] = None,
    db: Session = Depends(get_db),
):
    """
    Return one page of products together with pagination metadata.

    Args:
        skip: Offset forwarded to the product service (validated as >= 0).
        limit: Page size forwarded to the product service (validated as 1..100).
        is_active: Optional filter forwarded to both the listing and the
            count query; ``None`` is passed through unchanged.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success``, ``message``, ``data``, ``total``, ``page``,
        ``size`` and ``pages`` keys.
    """
    items = product_service.get_products(
        db, skip=skip, limit=limit, is_active=is_active
    )
    item_count = product_service.get_product_count(db, is_active=is_active)

    # Guard against a non-positive limit so the integer divisions below
    # can never divide by zero.
    if limit > 0:
        current_page = skip // limit + 1
        # Ceiling division: a partially filled last page still counts.
        page_count = (item_count + limit - 1) // limit
    else:
        current_page = 1
        page_count = 1

    payload = {
        "success": True,
        "message": "Products retrieved successfully",
        "data": items,
        "total": item_count,
        "page": current_page,
        "size": limit,
        "pages": page_count,
    }
    return payload
@router.get("/{product_id}", response_model=DataResponse[Product])
def get_product(product_id: int, db: Session = Depends(get_db)):
    """
    Fetch a single product identified by ``product_id``.

    Args:
        product_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success``, ``message`` and ``data`` keys, where
        ``data`` is whatever the product service returned.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    found = product_service.get_product_by_id(db, product_id)
    if found:
        return {
            "success": True,
            "message": "Product retrieved successfully",
            "data": found,
        }

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Product with ID {product_id} not found",
    )
@router.post("", response_model=DataResponse[Product], status_code=status.HTTP_201_CREATED)
def create_product(product_data: ProductCreate, db: Session = Depends(get_db)):
    """
    Create a product from the request body.

    Args:
        product_data: Validated creation payload.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success``, ``message`` and ``data`` keys, where
        ``data`` is the value returned by the product service.
    """
    created = product_service.create_product(db, product_data)
    response_body = {
        "success": True,
        "message": "Product created successfully",
        "data": created,
    }
    return response_body
@router.put("/{product_id}", response_model=DataResponse[Product])
def update_product(
    product_id: int,
    product_data: ProductUpdate,
    db: Session = Depends(get_db),
):
    """
    Apply an update payload to the product identified by ``product_id``.

    Args:
        product_id: Identifier taken from the URL path.
        product_data: Validated update payload.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``success``, ``message`` and ``data`` keys, where
        ``data`` is the value returned by the product service.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    updated = product_service.update_product(db, product_id, product_data)
    if updated:
        return {
            "success": True,
            "message": "Product updated successfully",
            "data": updated,
        }

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Product with ID {product_id} not found",
    )
@router.delete("/{product_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """
    Remove the product identified by ``product_id``.

    Args:
        product_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``None``; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 when the service reports a falsy outcome.
    """
    deleted = product_service.delete_product(db, product_id)
    if deleted:
        return None

    missing_detail = f"Product with ID {product_id} not found"
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=missing_detail,
    )