
- Set up FastAPI application with SQLite database
- Implemented CRUD operations for inventory items, categories, and suppliers
- Added Alembic migrations for database schema management
- Configured CORS middleware for cross-origin requests
- Added health check and API documentation endpoints
- Structured project with proper separation of concerns
- Added comprehensive README with API documentation
191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from app.db.session import SessionLocal
|
|
from app.models.inventory import InventoryItem, Category, Supplier
|
|
from app.schemas.inventory import (
|
|
InventoryItem as InventoryItemSchema,
|
|
InventoryItemCreate,
|
|
InventoryItemUpdate,
|
|
Category as CategorySchema,
|
|
CategoryCreate,
|
|
CategoryUpdate,
|
|
Supplier as SupplierSchema,
|
|
SupplierCreate,
|
|
SupplierUpdate,
|
|
)
|
|
|
|
# Router on which the item, category, and supplier endpoints of this module
# are registered via the ``@router.<method>`` decorators.
router = APIRouter()
|
|
|
|
|
|
def get_db():
    """Yield a database session and close it when the consumer is done.

    A fresh ``SessionLocal()`` is created on every call. The ``finally``
    clause closes the session whether the consumer finishes normally or an
    exception propagates through the generator.

    Yields:
        The newly created session.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
@router.get("/items", response_model=List[InventoryItemSchema])
def get_inventory_items(
    skip: int = 0,
    limit: int = 100,
    category_id: Optional[int] = None,
    low_stock: bool = False,
    db: Session = Depends(get_db),
):
    """List inventory items, optionally filtered, one page at a time.

    Args:
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows to return.
        category_id: When provided, only items whose ``category_id`` equals
            this value are returned.
        low_stock: When true, only items whose ``quantity`` is less than or
            equal to their ``min_quantity`` are returned.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``InventoryItem`` rows of the requested page, ordered by ``id``.
    """
    query = db.query(InventoryItem)

    # Compare against None rather than relying on truthiness, so an explicit
    # ``category_id=0`` is applied as a filter instead of being ignored.
    if category_id is not None:
        query = query.filter(InventoryItem.category_id == category_id)

    if low_stock:
        query = query.filter(InventoryItem.quantity <= InventoryItem.min_quantity)

    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by id keeps
    # pagination stable.
    return query.order_by(InventoryItem.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
@router.get("/items/{item_id}", response_model=InventoryItemSchema)
def get_inventory_item(item_id: int, db: Session = Depends(get_db)):
    """Return the inventory item whose ``id`` equals ``item_id``.

    Raises:
        HTTPException: 404 with detail "Item not found" when the lookup
            yields no row.
    """
    lookup = db.query(InventoryItem).filter(InventoryItem.id == item_id)
    found = lookup.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Item not found")
|
|
|
|
|
|
@router.post("/items", response_model=InventoryItemSchema)
def create_inventory_item(item: InventoryItemCreate, db: Session = Depends(get_db)):
    """Create an inventory item from the request payload and return it.

    The payload's fields are passed as keyword arguments to ``InventoryItem``;
    the new row is added, committed, and refreshed before being returned.
    """
    payload = item.dict()
    new_item = InventoryItem(**payload)
    db.add(new_item)
    db.commit()
    # Re-read the row after the commit so the returned object reflects what
    # is stored in the database.
    db.refresh(new_item)
    return new_item
|
|
|
|
|
|
@router.put("/items/{item_id}", response_model=InventoryItemSchema)
def update_inventory_item(
    item_id: int, item_update: InventoryItemUpdate, db: Session = Depends(get_db)
):
    """Apply a partial update to the inventory item identified by ``item_id``.

    Only the fields explicitly set on ``item_update`` are written
    (``exclude_unset=True``); everything else on the row is left untouched.

    Raises:
        HTTPException: 404 with detail "Item not found" when no such item
            exists.
    """
    target = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Item not found")

    changes = item_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete("/items/{item_id}")
def delete_inventory_item(item_id: int, db: Session = Depends(get_db)):
    """Delete the inventory item identified by ``item_id``.

    Returns:
        A dict with a ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 with detail "Item not found" when no such item
            exists.
    """
    lookup = db.query(InventoryItem).filter(InventoryItem.id == item_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Item not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"message": "Item deleted successfully"}
    return confirmation
|
|
|
|
|
|
@router.get("/categories", response_model=List[CategorySchema])
def get_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List categories one page at a time.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``Category`` rows of the requested page, ordered by ``id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so order by id to keep pages stable across requests.
    return db.query(Category).order_by(Category.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
@router.get("/categories/{category_id}", response_model=CategorySchema)
def get_category(category_id: int, db: Session = Depends(get_db)):
    """Return the category whose ``id`` equals ``category_id``.

    Raises:
        HTTPException: 404 with detail "Category not found" when the lookup
            yields no row.
    """
    lookup = db.query(Category).filter(Category.id == category_id)
    found = lookup.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Category not found")
|
|
|
|
|
|
@router.post("/categories", response_model=CategorySchema)
def create_category(category: CategoryCreate, db: Session = Depends(get_db)):
    """Create a category from the request payload and return it.

    The payload's fields are passed as keyword arguments to ``Category``;
    the new row is added, committed, and refreshed before being returned.
    """
    payload = category.dict()
    new_category = Category(**payload)
    db.add(new_category)
    db.commit()
    # Re-read the row after the commit so the returned object reflects what
    # is stored in the database.
    db.refresh(new_category)
    return new_category
|
|
|
|
|
|
@router.put("/categories/{category_id}", response_model=CategorySchema)
def update_category(
    category_id: int, category_update: CategoryUpdate, db: Session = Depends(get_db)
):
    """Apply a partial update to the category identified by ``category_id``.

    Only the fields explicitly set on ``category_update`` are written
    (``exclude_unset=True``); everything else on the row is left untouched.

    Raises:
        HTTPException: 404 with detail "Category not found" when no such
            category exists.
    """
    target = db.query(Category).filter(Category.id == category_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Category not found")

    changes = category_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete("/categories/{category_id}")
def delete_category(category_id: int, db: Session = Depends(get_db)):
    """Delete the category identified by ``category_id``.

    Returns:
        A dict with a ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 with detail "Category not found" when no such
            category exists.
    """
    lookup = db.query(Category).filter(Category.id == category_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Category not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"message": "Category deleted successfully"}
    return confirmation
|
|
|
|
|
|
@router.get("/suppliers", response_model=List[SupplierSchema])
def get_suppliers(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List suppliers one page at a time.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``Supplier`` rows of the requested page, ordered by ``id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so order by id to keep pages stable across requests.
    return db.query(Supplier).order_by(Supplier.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
@router.get("/suppliers/{supplier_id}", response_model=SupplierSchema)
def get_supplier(supplier_id: int, db: Session = Depends(get_db)):
    """Return the supplier whose ``id`` equals ``supplier_id``.

    Raises:
        HTTPException: 404 with detail "Supplier not found" when the lookup
            yields no row.
    """
    lookup = db.query(Supplier).filter(Supplier.id == supplier_id)
    found = lookup.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Supplier not found")
|
|
|
|
|
|
@router.post("/suppliers", response_model=SupplierSchema)
def create_supplier(supplier: SupplierCreate, db: Session = Depends(get_db)):
    """Create a supplier from the request payload and return it.

    The payload's fields are passed as keyword arguments to ``Supplier``;
    the new row is added, committed, and refreshed before being returned.
    """
    payload = supplier.dict()
    new_supplier = Supplier(**payload)
    db.add(new_supplier)
    db.commit()
    # Re-read the row after the commit so the returned object reflects what
    # is stored in the database.
    db.refresh(new_supplier)
    return new_supplier
|
|
|
|
|
|
@router.put("/suppliers/{supplier_id}", response_model=SupplierSchema)
def update_supplier(
    supplier_id: int, supplier_update: SupplierUpdate, db: Session = Depends(get_db)
):
    """Apply a partial update to the supplier identified by ``supplier_id``.

    Only the fields explicitly set on ``supplier_update`` are written
    (``exclude_unset=True``); everything else on the row is left untouched.

    Raises:
        HTTPException: 404 with detail "Supplier not found" when no such
            supplier exists.
    """
    target = db.query(Supplier).filter(Supplier.id == supplier_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Supplier not found")

    changes = supplier_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete("/suppliers/{supplier_id}")
def delete_supplier(supplier_id: int, db: Session = Depends(get_db)):
    """Delete the supplier identified by ``supplier_id``.

    Returns:
        A dict with a ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 with detail "Supplier not found" when no such
            supplier exists.
    """
    lookup = db.query(Supplier).filter(Supplier.id == supplier_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Supplier not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"message": "Supplier deleted successfully"}
    return confirmation
|