Automated Action 667ac548e1 Build complete FastAPI inventory management application
- Created FastAPI application structure with SQLAlchemy and Alembic
- Implemented full CRUD operations for inventory items
- Added search, filtering, and pagination capabilities
- Configured CORS, API documentation, and health endpoints
- Set up SQLite database with proper migrations
- Added comprehensive API documentation and README
2025-07-21 19:29:24 +00:00

106 lines
3.5 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.inventory import InventoryItem
from app.schemas.inventory import (
InventoryItem as InventoryItemSchema,
InventoryItemCreate,
InventoryItemUpdate,
)
router = APIRouter()
@router.get("/", response_model=List[InventoryItemSchema])
def get_inventory_items(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
category: Optional[str] = Query(None),
search: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
query = db.query(InventoryItem)
if category:
query = query.filter(InventoryItem.category == category)
if search:
query = query.filter(
(InventoryItem.name.contains(search)) |
(InventoryItem.description.contains(search)) |
(InventoryItem.sku.contains(search))
)
items = query.offset(skip).limit(limit).all()
return items
@router.get("/{item_id}", response_model=InventoryItemSchema)
def get_inventory_item(item_id: int, db: Session = Depends(get_db)):
item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not item:
raise HTTPException(status_code=404, detail="Inventory item not found")
return item
@router.post("/", response_model=InventoryItemSchema)
def create_inventory_item(item: InventoryItemCreate, db: Session = Depends(get_db)):
existing_item = db.query(InventoryItem).filter(InventoryItem.sku == item.sku).first()
if existing_item:
raise HTTPException(status_code=400, detail="SKU already exists")
db_item = InventoryItem(**item.model_dump())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
@router.put("/{item_id}", response_model=InventoryItemSchema)
def update_inventory_item(
item_id: int, item_update: InventoryItemUpdate, db: Session = Depends(get_db)
):
db_item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not db_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
if item_update.sku and item_update.sku != db_item.sku:
existing_item = db.query(InventoryItem).filter(InventoryItem.sku == item_update.sku).first()
if existing_item:
raise HTTPException(status_code=400, detail="SKU already exists")
update_data = item_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_item, field, value)
db.commit()
db.refresh(db_item)
return db_item
@router.delete("/{item_id}")
def delete_inventory_item(item_id: int, db: Session = Depends(get_db)):
db_item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not db_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
db.delete(db_item)
db.commit()
return {"message": "Inventory item deleted successfully"}
@router.patch("/{item_id}/quantity")
def update_item_quantity(
item_id: int,
quantity: int = Query(..., ge=0),
db: Session = Depends(get_db)
):
db_item = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if not db_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
db_item.quantity = quantity
db.commit()
db.refresh(db_item)
return {"message": f"Quantity updated to {quantity}", "item": db_item}