# 2025-05-31 12:20:02 +00:00
#
# 149 lines
# 4.6 KiB
# Python

from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.inventory import Item as ItemModel
from app.schemas.inventory import Item, ItemCreate, ItemList, ItemUpdate
# Router on which the inventory item endpoints below are registered.
router = APIRouter()
@router.post("", response_model=Item, status_code=status.HTTP_201_CREATED)
def create_item(*, db: Session = Depends(get_db), item_in: ItemCreate) -> Any:
    """
    Create a new inventory item.

    When the payload carries a SKU, an existing row with the same SKU is
    reported as a 400 before any insert is attempted. An ``IntegrityError``
    raised while persisting the row is rolled back and also reported as a
    400.

    Args:
        db: Database session injected by ``get_db``.
        item_in: Payload describing the item to create.

    Returns:
        The newly persisted item, refreshed from the database.

    Raises:
        HTTPException: 400 on a duplicate SKU or an integrity violation.
    """
    sku = item_in.sku
    # The lookup only runs when the client actually supplied a SKU.
    if sku and db.query(ItemModel).filter(ItemModel.sku == sku).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Item with SKU {item_in.sku} already exists",
        )

    # Fields the client did not set are not passed to the model constructor.
    fields = item_in.model_dump(exclude_unset=True)
    try:
        new_item = ItemModel(**fields)
        db.add(new_item)
        db.commit()
        db.refresh(new_item)
    except IntegrityError:
        # The pre-check above can race with a concurrent insert; undo the
        # failed transaction before reporting the error.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Could not create item. Check for duplicate SKU or invalid data.",
        )
    return new_item
@router.get("", response_model=ItemList)
def read_items(
    *,
    db: Session = Depends(get_db),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=0),
    name: Optional[str] = None,
    category: Optional[str] = None,
    sku: Optional[str] = None,
) -> Any:
    """
    Retrieve inventory items with optional filtering.

    Args:
        db: Database session injected by ``get_db``.
        skip: Number of matching rows to skip; must be >= 0.
        limit: Maximum number of rows to return; must be >= 0.
        name: Case-insensitive substring to match against the item name.
        category: Exact category to match.
        sku: Exact SKU to match.

    Returns:
        A dict with ``items`` (the requested page) and ``total`` (the number
        of rows matching the filters, ignoring pagination).

    Negative ``skip``/``limit`` values are rejected by request validation
    instead of being forwarded to the database as OFFSET/LIMIT.
    """
    query = db.query(ItemModel)

    # Apply filters if provided
    if name:
        # NOTE: ``%`` and ``_`` inside ``name`` are interpolated unescaped,
        # so they act as LIKE wildcards.
        query = query.filter(ItemModel.name.ilike(f"%{name}%"))
    if category:
        query = query.filter(ItemModel.category == category)
    if sku:
        query = query.filter(ItemModel.sku == sku)

    # Get total count for pagination info (before OFFSET/LIMIT are applied)
    total = query.count()

    # Apply pagination. The id is a tie-breaker so that rows sharing a name
    # keep a deterministic order and pages neither overlap nor skip rows.
    items = (
        query.order_by(ItemModel.name, ItemModel.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return {"items": items, "total": total}
@router.get("/{item_id}", response_model=Item)
def read_item(*, db: Session = Depends(get_db), item_id: int) -> Any:
    """
    Get a specific inventory item by ID.

    Args:
        db: Database session injected by ``get_db``.
        item_id: Primary key of the item to fetch.

    Returns:
        The matching item.

    Raises:
        HTTPException: 404 when no item has the given ID.
    """
    by_id = db.query(ItemModel).filter(ItemModel.id == item_id)
    found = by_id.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Item with ID {item_id} not found",
    )
@router.put("/{item_id}", response_model=Item)
def update_item(
    *, db: Session = Depends(get_db), item_id: int, item_in: ItemUpdate
) -> Any:
    """
    Update an existing inventory item.

    Only the fields explicitly set in the payload are written. Changing the
    SKU to a value already held by another row is rejected up front.

    Args:
        db: Database session injected by ``get_db``.
        item_id: Primary key of the item to update.
        item_in: Partial payload with the fields to change.

    Returns:
        The updated item, refreshed from the database.

    Raises:
        HTTPException: 404 when the item does not exist; 400 when the
            payload is empty, the new SKU is taken, or the database
            reports an integrity violation.
    """
    target = db.query(ItemModel).filter(ItemModel.id == item_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item with ID {item_id} not found",
        )

    # An empty payload is a client error rather than a silent no-op.
    changes = item_in.model_dump(exclude_unset=True)
    if not changes:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No fields to update were provided",
        )

    # Uniqueness only needs checking when a non-empty SKU differs from the
    # one currently stored.
    new_sku = changes.get("sku")
    if new_sku and new_sku != target.sku:
        clash = db.query(ItemModel).filter(ItemModel.sku == new_sku).first()
        if clash:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Item with SKU {new_sku} already exists",
            )

    try:
        for attr, new_value in changes.items():
            setattr(target, attr, new_value)
        db.add(target)
        db.commit()
        db.refresh(target)
    except IntegrityError:
        # Undo the failed transaction before reporting the error.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Could not update item. Check for duplicate SKU or invalid data.",
        )
    return target
@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_item(*, db: Session = Depends(get_db), item_id: int) -> Any:
    """
    Delete an inventory item.

    Args:
        db: Database session injected by ``get_db``.
        item_id: Primary key of the item to delete.

    Returns:
        ``None``; the endpoint responds with 204 No Content.

    Raises:
        HTTPException: 404 when the item does not exist; 409 when the
            database rejects the delete with an integrity error.
    """
    db_item = db.query(ItemModel).filter(ItemModel.id == item_id).first()
    if not db_item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item with ID {item_id} not found",
        )

    try:
        db.delete(db_item)
        db.commit()
    except IntegrityError as exc:
        # Mirror create/update: roll back so the session stays usable and
        # report a client-visible error instead of an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Could not delete item. It may still be referenced by other records.",
        ) from exc
    return None