Update code via agent code generation

This commit is contained in:
Automated Action 2025-06-05 23:45:53 +00:00
parent e92d783de4
commit cfbbf9051f
40 changed files with 1757 additions and 0 deletions

12
app/api/api_v1/api.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import auth, users, products, categories, suppliers, inventory, orders
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
api_router.include_router(users.router, prefix="/users", tags=["Users"])
api_router.include_router(products.router, prefix="/products", tags=["Products"])
api_router.include_router(categories.router, prefix="/categories", tags=["Categories"])
api_router.include_router(suppliers.router, prefix="/suppliers", tags=["Suppliers"])
api_router.include_router(inventory.router, prefix="/inventory", tags=["Inventory"])
api_router.include_router(orders.router, prefix="/orders", tags=["Orders"])

View File

@ -0,0 +1,55 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User)
def register_new_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user

View File

@ -0,0 +1,94 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Category])
def read_categories(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve categories.
"""
categories = crud.category.get_multi(db, skip=skip, limit=limit)
return categories
@router.post("/", response_model=schemas.Category)
def create_category(
*,
db: Session = Depends(deps.get_db),
category_in: schemas.CategoryCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new category.
"""
category = crud.category.get_by_name(db, name=category_in.name)
if category:
raise HTTPException(
status_code=400,
detail="The category with this name already exists in the system.",
)
category = crud.category.create(db, obj_in=category_in)
return category
@router.put("/{id}", response_model=schemas.Category)
def update_category(
*,
db: Session = Depends(deps.get_db),
id: int,
category_in: schemas.CategoryUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a category.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
category = crud.category.update(db, db_obj=category, obj_in=category_in)
return category
@router.get("/{id}", response_model=schemas.Category)
def read_category(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get category by ID.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return category
@router.delete("/{id}", response_model=schemas.Category)
def delete_category(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a category.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
category = crud.category.remove(db, id=id)
return category

View File

@ -0,0 +1,155 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.InventoryItem])
def read_inventory_items(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve inventory items.
"""
inventory_items = crud.inventory.get_multi(db, skip=skip, limit=limit)
return inventory_items
@router.get("/low-stock", response_model=List[schemas.InventoryItem])
def read_low_stock_items(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve low stock inventory items.
"""
inventory_items = crud.inventory.get_low_stock(db, skip=skip, limit=limit)
return inventory_items
@router.post("/", response_model=schemas.InventoryItem)
def create_inventory_item(
*,
db: Session = Depends(deps.get_db),
item_in: schemas.InventoryItemCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new inventory item.
"""
product = crud.product.get(db, id=item_in.product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
existing_item = crud.inventory.get_by_product(db, product_id=item_in.product_id)
if existing_item:
raise HTTPException(
status_code=400,
detail="An inventory item for this product already exists.",
)
inventory_item = crud.inventory.create(db, obj_in=item_in)
return inventory_item
@router.put("/{id}", response_model=schemas.InventoryItem)
def update_inventory_item(
*,
db: Session = Depends(deps.get_db),
id: int,
item_in: schemas.InventoryItemUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an inventory item.
"""
inventory_item = crud.inventory.get(db, id=id)
if not inventory_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
inventory_item = crud.inventory.update(db, db_obj=inventory_item, obj_in=item_in)
return inventory_item
@router.put("/{id}/adjust-quantity", response_model=schemas.InventoryItem)
def adjust_inventory_quantity(
*,
db: Session = Depends(deps.get_db),
id: int,
quantity_change: int = Body(..., embed=True),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Adjust inventory quantity. Use positive number to increase, negative to decrease.
"""
inventory_item = crud.inventory.get(db, id=id)
if not inventory_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
if inventory_item.quantity + quantity_change < 0:
raise HTTPException(
status_code=400,
detail="Cannot reduce quantity below zero."
)
inventory_item = crud.inventory.update_quantity(db, item_id=id, quantity_change=quantity_change)
return inventory_item
@router.get("/{id}", response_model=schemas.InventoryItem)
def read_inventory_item(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get inventory item by ID.
"""
inventory_item = crud.inventory.get(db, id=id)
if not inventory_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
return inventory_item
@router.get("/product/{product_id}", response_model=schemas.InventoryItem)
def read_inventory_by_product(
*,
db: Session = Depends(deps.get_db),
product_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get inventory item by product ID.
"""
inventory_item = crud.inventory.get_by_product(db, product_id=product_id)
if not inventory_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
return inventory_item
@router.delete("/{id}", response_model=schemas.InventoryItem)
def delete_inventory_item(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete an inventory item.
"""
inventory_item = crud.inventory.get(db, id=id)
if not inventory_item:
raise HTTPException(status_code=404, detail="Inventory item not found")
inventory_item = crud.inventory.remove(db, id=id)
return inventory_item

View File

@ -0,0 +1,134 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.models.order import OrderStatus
router = APIRouter()
@router.get("/", response_model=List[schemas.Order])
def read_orders(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
status: OrderStatus = None,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve orders.
"""
if crud.user.is_superuser(current_user):
if status:
orders = crud.order.get_by_status(db, status=status, skip=skip, limit=limit)
else:
orders = crud.order.get_multi(db, skip=skip, limit=limit)
else:
orders = crud.order.get_by_user(db, user_id=current_user.id, skip=skip, limit=limit)
if status:
orders = [order for order in orders if order.status == status]
return orders
@router.post("/", response_model=schemas.Order)
def create_order(
*,
db: Session = Depends(deps.get_db),
order_in: schemas.OrderCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new order.
"""
# Verify all products exist
for item in order_in.order_items:
product = crud.product.get(db, id=item.product_id)
if not product:
raise HTTPException(
status_code=404,
detail=f"Product with id {item.product_id} not found"
)
# Check if there's enough inventory
inventory = crud.inventory.get_by_product(db, product_id=item.product_id)
if inventory and inventory.quantity < item.quantity:
raise HTTPException(
status_code=400,
detail=f"Not enough stock for product {product.name}. Available: {inventory.quantity}"
)
# Create order
order = crud.order.create(db, obj_in=order_in, user_id=current_user.id)
# Update inventory
for item in order_in.order_items:
inventory = crud.inventory.get_by_product(db, product_id=item.product_id)
if inventory:
crud.inventory.update_quantity(db, item_id=inventory.id, quantity_change=-item.quantity)
return order
@router.put("/{id}/status", response_model=schemas.Order)
def update_order_status(
*,
db: Session = Depends(deps.get_db),
id: int,
status: OrderStatus = Body(..., embed=True),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update order status.
"""
order = crud.order.get(db, id=id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
# Only superusers can update orders they don't own
if order.user_id != current_user.id and not crud.user.is_superuser(current_user):
raise HTTPException(status_code=403, detail="Not enough permissions")
order_in = schemas.OrderUpdate(status=status)
order = crud.order.update(db, db_obj=order, obj_in=order_in)
return order
@router.get("/{id}", response_model=schemas.Order)
def read_order(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get order by ID.
"""
order = crud.order.get(db, id=id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
# Only superusers can view orders they don't own
if order.user_id != current_user.id and not crud.user.is_superuser(current_user):
raise HTTPException(status_code=403, detail="Not enough permissions")
return order
@router.delete("/{id}", response_model=schemas.Order)
def delete_order(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete an order.
"""
order = crud.order.get(db, id=id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
order = crud.order.remove(db, id=id)
return order

View File

@ -0,0 +1,136 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Product])
def read_products(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
category_id: Optional[int] = None,
supplier_id: Optional[int] = None,
active_only: bool = False,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve products.
"""
if category_id:
products = crud.product.get_by_category(db, category_id=category_id, skip=skip, limit=limit)
elif supplier_id:
products = crud.product.get_by_supplier(db, supplier_id=supplier_id, skip=skip, limit=limit)
elif active_only:
products = crud.product.get_active(db, skip=skip, limit=limit)
else:
products = crud.product.get_multi(db, skip=skip, limit=limit)
return products
@router.post("/", response_model=schemas.Product)
def create_product(
*,
db: Session = Depends(deps.get_db),
product_in: schemas.ProductCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new product.
"""
product = crud.product.get_by_sku(db, sku=product_in.sku)
if product:
raise HTTPException(
status_code=400,
detail="The product with this SKU already exists in the system.",
)
product = crud.product.create(db, obj_in=product_in)
return product
@router.put("/{id}", response_model=schemas.Product)
def update_product(
*,
db: Session = Depends(deps.get_db),
id: int,
product_in: schemas.ProductUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a product.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
product = crud.product.update(db, db_obj=product, obj_in=product_in)
return product
@router.get("/{id}", response_model=schemas.Product)
def read_product(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by ID.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.get("/sku/{sku}", response_model=schemas.Product)
def read_product_by_sku(
*,
db: Session = Depends(deps.get_db),
sku: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by SKU.
"""
product = crud.product.get_by_sku(db, sku=sku)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.get("/barcode/{barcode}", response_model=schemas.Product)
def read_product_by_barcode(
*,
db: Session = Depends(deps.get_db),
barcode: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by barcode.
"""
product = crud.product.get_by_barcode(db, barcode=barcode)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.delete("/{id}", response_model=schemas.Product)
def delete_product(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a product.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
product = crud.product.remove(db, id=id)
return product

View File

@ -0,0 +1,94 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Supplier])
def read_suppliers(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve suppliers.
"""
suppliers = crud.supplier.get_multi(db, skip=skip, limit=limit)
return suppliers
@router.post("/", response_model=schemas.Supplier)
def create_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_in: schemas.SupplierCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new supplier.
"""
supplier = crud.supplier.get_by_name(db, name=supplier_in.name)
if supplier:
raise HTTPException(
status_code=400,
detail="The supplier with this name already exists in the system.",
)
supplier = crud.supplier.create(db, obj_in=supplier_in)
return supplier
@router.put("/{id}", response_model=schemas.Supplier)
def update_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
supplier_in: schemas.SupplierUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a supplier.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
supplier = crud.supplier.update(db, db_obj=supplier, obj_in=supplier_in)
return supplier
@router.get("/{id}", response_model=schemas.Supplier)
def read_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get supplier by ID.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
return supplier
@router.delete("/{id}", response_model=schemas.Supplier)
def delete_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a supplier.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
supplier = crud.supplier.remove(db, id=id)
return supplier

View File

@ -0,0 +1,100 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
full_name: str = Body(None),
email: EmailStr = Body(None),
password: str = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
if password is not None:
user_in.password = password
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

52
app/api/deps.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_superuser:
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

43
app/core/config.py Normal file
View File

@ -0,0 +1,43 @@
import os
from pathlib import Path
from typing import List, Union
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, validator
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-for-development-only")
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
# Server settings
SERVER_NAME: str = os.getenv("SERVER_NAME", "Small Business Inventory")
SERVER_HOST: AnyHttpUrl = os.getenv("SERVER_HOST", "http://localhost")
PROJECT_NAME: str = "Small Business Inventory Management System"
# Database settings
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# CORS settings
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@validator("BACKEND_CORS_ORIGINS", pre=True)
def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
# Superuser settings
FIRST_SUPERUSER: EmailStr = os.getenv("FIRST_SUPERUSER", "admin@example.com")
FIRST_SUPERUSER_PASSWORD: str = os.getenv("FIRST_SUPERUSER_PASSWORD", "admin")
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

33
app/core/security.py Normal file
View File

@ -0,0 +1,33 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

6
app/crud/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from .crud_user import user
from .crud_product import product
from .crud_category import category
from .crud_supplier import supplier
from .crud_inventory import inventory
from .crud_order import order

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

20
app/crud/crud_category.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.category import Category
from app.schemas.category import CategoryCreate, CategoryUpdate
class CRUDCategory(CRUDBase[Category, CategoryCreate, CategoryUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Category:
return db.query(Category).filter(Category.name == name).first()
def get_multi_by_ids(
self, db: Session, *, ids: List[int], skip: int = 0, limit: int = 100
) -> List[Category]:
return db.query(Category).filter(Category.id.in_(ids)).offset(skip).limit(limit).all()
category = CRUDCategory(Category)

View File

@ -0,0 +1,41 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.inventory import InventoryItem
from app.schemas.inventory import InventoryItemCreate, InventoryItemUpdate
class CRUDInventory(CRUDBase[InventoryItem, InventoryItemCreate, InventoryItemUpdate]):
def get_by_product(
self, db: Session, *, product_id: int
) -> Optional[InventoryItem]:
return db.query(InventoryItem).filter(InventoryItem.product_id == product_id).first()
def get_low_stock(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[InventoryItem]:
return (
db.query(InventoryItem)
.filter(InventoryItem.quantity <= InventoryItem.min_stock_level)
.offset(skip)
.limit(limit)
.all()
)
def update_quantity(
self, db: Session, *, item_id: int, quantity_change: int
) -> InventoryItem:
db_obj = db.query(InventoryItem).filter(InventoryItem.id == item_id).first()
if db_obj:
new_quantity = db_obj.quantity + quantity_change
# Ensure quantity never goes below 0
db_obj.quantity = max(0, new_quantity)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
inventory = CRUDInventory(InventoryItem)

47
app/crud/crud_order.py Normal file
View File

@ -0,0 +1,47 @@
from typing import List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.order import Order, OrderItem
from app.schemas.order import OrderCreate, OrderUpdate
class CRUDOrder(CRUDBase[Order, OrderCreate, OrderUpdate]):
def create(self, db: Session, *, obj_in: OrderCreate, user_id: int) -> Order:
# Calculate total amount from order items
total_amount = sum(item.unit_price * item.quantity for item in obj_in.order_items)
# Create order
order_data = obj_in.dict(exclude={"order_items"})
db_obj = Order(**order_data, user_id=user_id, total_amount=total_amount)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Create order items
for item in obj_in.order_items:
db_item = OrderItem(
order_id=db_obj.id,
product_id=item.product_id,
quantity=item.quantity,
unit_price=item.unit_price
)
db.add(db_item)
db.commit()
db.refresh(db_obj)
return db_obj
def get_by_user(
self, db: Session, *, user_id: int, skip: int = 0, limit: int = 100
) -> List[Order]:
return db.query(Order).filter(Order.user_id == user_id).offset(skip).limit(limit).all()
def get_by_status(
self, db: Session, *, status: str, skip: int = 0, limit: int = 100
) -> List[Order]:
return db.query(Order).filter(Order.status == status).offset(skip).limit(limit).all()
order = CRUDOrder(Order)

33
app/crud/crud_product.py Normal file
View File

@ -0,0 +1,33 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.product import Product
from app.schemas.product import ProductCreate, ProductUpdate
class CRUDProduct(CRUDBase[Product, ProductCreate, ProductUpdate]):
def get_by_sku(self, db: Session, *, sku: str) -> Optional[Product]:
return db.query(Product).filter(Product.sku == sku).first()
def get_by_barcode(self, db: Session, *, barcode: str) -> Optional[Product]:
return db.query(Product).filter(Product.barcode == barcode).first()
def get_by_category(
self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.category_id == category_id).offset(skip).limit(limit).all()
def get_by_supplier(
self, db: Session, *, supplier_id: int, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.supplier_id == supplier_id).offset(skip).limit(limit).all()
def get_active(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.is_active == True).offset(skip).limit(limit).all()
product = CRUDProduct(Product)

20
app/crud/crud_supplier.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.supplier import Supplier
from app.schemas.supplier import SupplierCreate, SupplierUpdate
class CRUDSupplier(CRUDBase[Supplier, SupplierCreate, SupplierUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Supplier:
return db.query(Supplier).filter(Supplier.name == name).first()
def get_multi_by_ids(
self, db: Session, *, ids: List[int], skip: int = 0, limit: int = 100
) -> List[Supplier]:
return db.query(Supplier).filter(Supplier.id.in_(ids)).offset(skip).limit(limit).all()
supplier = CRUDSupplier(Supplier)

56
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,56 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

9
app/db/base.py Normal file
View File

@ -0,0 +1,9 @@
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base_class import Base # noqa
from app.models.user import User # noqa
from app.models.product import Product # noqa
from app.models.category import Category # noqa
from app.models.supplier import Supplier # noqa
from app.models.inventory import InventoryItem # noqa
from app.models.order import Order, OrderItem # noqa

14
app/db/base_class.py Normal file
View File

@ -0,0 +1,14 @@
from typing import Any
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: Any
__name__: str
# Generate tablename automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

18
app/db/session.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

13
app/models/category.py Normal file
View File

@ -0,0 +1,13 @@
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Category(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(String, nullable=True)
# Relationships
products = relationship("Product", back_populates="category")

20
app/models/inventory.py Normal file
View File

@ -0,0 +1,20 @@
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.base_class import Base
class InventoryItem(Base):
id = Column(Integer, primary_key=True, index=True)
quantity = Column(Integer, default=0, nullable=False)
location = Column(String, nullable=True)
last_updated = Column(DateTime, default=func.now(), onupdate=func.now())
min_stock_level = Column(Integer, default=0)
max_stock_level = Column(Integer, default=0)
# Foreign keys
product_id = Column(Integer, ForeignKey("product.id"), nullable=False)
# Relationships
product = relationship("Product", back_populates="inventory_items")

43
app/models/order.py Normal file
View File

@ -0,0 +1,43 @@
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime, Enum
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import enum
from app.db.base_class import Base
class OrderStatus(str, enum.Enum):
pending = "pending"
processing = "processing"
shipped = "shipped"
delivered = "delivered"
cancelled = "cancelled"
class Order(Base):
id = Column(Integer, primary_key=True, index=True)
order_date = Column(DateTime, default=func.now())
status = Column(Enum(OrderStatus), default=OrderStatus.pending)
total_amount = Column(Float, default=0.0)
notes = Column(String, nullable=True)
# Foreign keys
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
# Relationships
user = relationship("User", back_populates="orders")
order_items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
class OrderItem(Base):
id = Column(Integer, primary_key=True, index=True)
quantity = Column(Integer, nullable=False)
unit_price = Column(Float, nullable=False)
# Foreign keys
order_id = Column(Integer, ForeignKey("order.id"), nullable=False)
product_id = Column(Integer, ForeignKey("product.id"), nullable=False)
# Relationships
order = relationship("Order", back_populates="order_items")
product = relationship("Product", back_populates="order_items")

25
app/models/product.py Normal file
View File

@ -0,0 +1,25 @@
from sqlalchemy import Column, Integer, String, Float, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Product(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(String, nullable=True)
sku = Column(String, index=True, unique=True, nullable=False)
barcode = Column(String, index=True, unique=True, nullable=True)
price = Column(Float, nullable=False)
cost = Column(Float, nullable=False)
is_active = Column(Boolean, default=True)
# Foreign keys
category_id = Column(Integer, ForeignKey("category.id"), nullable=True)
supplier_id = Column(Integer, ForeignKey("supplier.id"), nullable=True)
# Relationships
category = relationship("Category", back_populates="products")
supplier = relationship("Supplier", back_populates="products")
inventory_items = relationship("InventoryItem", back_populates="product")
order_items = relationship("OrderItem", back_populates="product")

16
app/models/supplier.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Supplier(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
contact_name = Column(String, nullable=True)
email = Column(String, nullable=True)
phone = Column(String, nullable=True)
address = Column(String, nullable=True)
# Relationships
products = relationship("Product", back_populates="supplier")

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class User(Base):
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean(), default=True)
is_superuser = Column(Boolean(), default=False)
# Relationships
orders = relationship("Order", back_populates="user")

7
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,7 @@
from .token import Token, TokenPayload
from .user import User, UserCreate, UserInDB, UserUpdate
from .category import Category, CategoryCreate, CategoryInDB, CategoryUpdate
from .supplier import Supplier, SupplierCreate, SupplierInDB, SupplierUpdate
from .product import Product, ProductCreate, ProductInDB, ProductUpdate
from .inventory import InventoryItem, InventoryItemCreate, InventoryItemInDB, InventoryItemUpdate
from .order import Order, OrderCreate, OrderInDB, OrderUpdate, OrderItem, OrderItemCreate, OrderItemInDB, OrderItemUpdate

36
app/schemas/category.py Normal file
View File

@ -0,0 +1,36 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class CategoryBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
# Properties to receive via API on creation
class CategoryCreate(CategoryBase):
name: str
# Properties to receive via API on update
class CategoryUpdate(CategoryBase):
pass
class CategoryInDBBase(CategoryBase):
id: int
class Config:
orm_mode = True
# Additional properties to return via API
class Category(CategoryInDBBase):
pass
# Additional properties stored in DB
class CategoryInDB(CategoryInDBBase):
pass

42
app/schemas/inventory.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, Field
# Shared properties
class InventoryItemBase(BaseModel):
product_id: Optional[int] = None
quantity: Optional[int] = 0
location: Optional[str] = None
min_stock_level: Optional[int] = 0
max_stock_level: Optional[int] = 0
# Properties to receive via API on creation
class InventoryItemCreate(InventoryItemBase):
product_id: int
quantity: int = Field(..., ge=0)
# Properties to receive via API on update
class InventoryItemUpdate(InventoryItemBase):
pass
class InventoryItemInDBBase(InventoryItemBase):
id: int
last_updated: datetime
class Config:
orm_mode = True
# Additional properties to return via API
class InventoryItem(InventoryItemInDBBase):
pass
# Additional properties stored in DB
class InventoryItemInDB(InventoryItemInDBBase):
pass

71
app/schemas/order.py Normal file
View File

@ -0,0 +1,71 @@
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, Field
from app.models.order import OrderStatus
# OrderItem schemas
class OrderItemBase(BaseModel):
product_id: Optional[int] = None
quantity: Optional[int] = None
unit_price: Optional[float] = None
class OrderItemCreate(OrderItemBase):
product_id: int
quantity: int = Field(..., gt=0)
unit_price: float = Field(..., gt=0)
class OrderItemUpdate(OrderItemBase):
pass
class OrderItemInDBBase(OrderItemBase):
id: int
order_id: int
class Config:
orm_mode = True
class OrderItem(OrderItemInDBBase):
pass
class OrderItemInDB(OrderItemInDBBase):
pass
# Order schemas
class OrderBase(BaseModel):
status: Optional[OrderStatus] = None
notes: Optional[str] = None
class OrderCreate(OrderBase):
order_items: List[OrderItemCreate]
class OrderUpdate(OrderBase):
pass
class OrderInDBBase(OrderBase):
id: int
order_date: datetime
total_amount: float
user_id: int
class Config:
orm_mode = True
class Order(OrderInDBBase):
order_items: List[OrderItem] = []
class OrderInDB(OrderInDBBase):
pass

46
app/schemas/product.py Normal file
View File

@ -0,0 +1,46 @@
from typing import Optional
from pydantic import BaseModel, Field
# Shared properties
class ProductBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
sku: Optional[str] = None
barcode: Optional[str] = None
price: Optional[float] = None
cost: Optional[float] = None
is_active: Optional[bool] = True
category_id: Optional[int] = None
supplier_id: Optional[int] = None
# Properties to receive via API on creation
class ProductCreate(ProductBase):
name: str
sku: str
price: float = Field(..., gt=0)
cost: float = Field(..., ge=0)
# Properties to receive via API on update
class ProductUpdate(ProductBase):
pass
class ProductInDBBase(ProductBase):
id: int
class Config:
orm_mode = True
# Additional properties to return via API
class Product(ProductInDBBase):
pass
# Additional properties stored in DB
class ProductInDB(ProductInDBBase):
pass

39
app/schemas/supplier.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class SupplierBase(BaseModel):
name: Optional[str] = None
contact_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
# Properties to receive via API on creation
class SupplierCreate(SupplierBase):
name: str
# Properties to receive via API on update
class SupplierUpdate(SupplierBase):
pass
class SupplierInDBBase(SupplierBase):
id: int
class Config:
orm_mode = True
# Additional properties to return via API
class Supplier(SupplierInDBBase):
pass
# Additional properties stored in DB
class SupplierInDB(SupplierInDBBase):
pass

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

39
app/schemas/user.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

10
kubernetes/secrets.yaml Normal file
View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: Secret
metadata:
name: inventory-api-secrets
type: Opaque
data:
# These are example base64-encoded values. You should replace them with actual values.
# For example, to generate a value: echo -n "your-secret-key" | base64
secret-key: eW91ci1zZWNyZXQta2V5LWZvci1wcm9kdWN0aW9u
admin-password: YWRtaW4tcGFzc3dvcmQ=

15
kubernetes/service.yaml Normal file
View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: prod-pod-service
labels:
app: inventory-api
spec:
type: ClusterIP
ports:
- port: 80
targetPort: 8000
protocol: TCP
name: http
selector:
app: inventory-api

58
main.py Normal file
View File

@ -0,0 +1,58 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from app.api.api_v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description="Small Business Inventory Management System API",
version="0.1.0",
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/", tags=["Root"])
async def root():
"""
Root endpoint returning basic service information.
"""
return {
"title": settings.PROJECT_NAME,
"documentation": "/docs",
"health": "/health"
}
@app.get("/health", tags=["Health"])
async def health():
"""
Health check endpoint.
"""
return {"status": "healthy"}
# Redirect to docs
@app.get("/docs-redirect")
async def docs_redirect():
return RedirectResponse(url="/docs")
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.68.0,<0.69.0
pydantic>=1.8.0,<2.0.0
uvicorn>=0.15.0,<0.16.0
SQLAlchemy>=1.4.23,<1.5.0
alembic>=1.7.5,<1.8.0
python-jose[cryptography]>=3.3.0,<3.4.0
passlib[bcrypt]>=1.7.4,<1.8.0
python-multipart>=0.0.5,<0.0.6
email-validator>=1.1.3,<1.2.0
python-dotenv>=0.19.1,<0.20.0
ruff>=0.0.262