Implement Small Business Inventory Management System

This commit includes:
- Project structure setup with FastAPI and SQLite
- Database models and schemas for inventory management
- CRUD operations for all entities
- API endpoints for product, category, supplier, and inventory management
- User authentication with JWT tokens
- Initial database migration
- Comprehensive README with setup instructions
This commit is contained in:
Automated Action 2025-06-17 19:02:35 +00:00
parent d6b91cb22e
commit a17fe518a9
48 changed files with 2369 additions and 2 deletions

162
README.md
View File

@ -1,3 +1,161 @@
# FastAPI Application
# Small Business Inventory Management System
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive inventory management system designed for small businesses built with FastAPI and SQLite.
## Features
- **Product Management**: Create, update, and track products with SKU, barcode, price, cost, and more
- **Category Organization**: Organize products into categories for easier management
- **Supplier Management**: Track product suppliers with contact information
- **Inventory Tracking**: Monitor stock levels across your business
- **Transaction History**: Keep a complete history of inventory movements (purchases, sales, returns, etc.)
- **Low Stock Alerts**: Easily identify products that need to be reordered
- **User Authentication**: Secure API with JWT-based authentication and role-based permissions
## Installation
### Prerequisites
- Python 3.8 or higher
- pip (Python package manager)
### Setup
1. Clone the repository:
```bash
git clone <repository-url>
cd small-business-inventory-management-system
```
2. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Set up environment variables:
```bash
# For development
export SECRET_KEY="your-super-secret-key-here" # On Windows: set SECRET_KEY=your-super-secret-key-here
```
5. Run database migrations:
```bash
alembic upgrade head
```
6. Create a superuser:
```bash
python -m app.initial_data # Script not included in this version, create manually via API
```
## Running the Application
Start the server with:
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
The API will be available at http://localhost:8000/
API documentation is available at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## API Endpoints
### Authentication
- `POST /api/v1/login/access-token` - Get access token (login)
- `POST /api/v1/login/test-token` - Test token validity
### Users
- `GET /api/v1/users/` - List users (admin only)
- `POST /api/v1/users/` - Create user (admin only)
- `GET /api/v1/users/me` - Get current user
- `PUT /api/v1/users/me` - Update current user
- `GET /api/v1/users/{user_id}` - Get user by ID
- `PUT /api/v1/users/{user_id}` - Update user by ID (admin only)
### Categories
- `GET /api/v1/categories/` - List categories
- `POST /api/v1/categories/` - Create category
- `GET /api/v1/categories/{id}` - Get category by ID
- `PUT /api/v1/categories/{id}` - Update category
- `DELETE /api/v1/categories/{id}` - Delete category
### Suppliers
- `GET /api/v1/suppliers/` - List suppliers
- `POST /api/v1/suppliers/` - Create supplier
- `GET /api/v1/suppliers/{id}` - Get supplier by ID
- `PUT /api/v1/suppliers/{id}` - Update supplier
- `DELETE /api/v1/suppliers/{id}` - Delete supplier
### Products
- `GET /api/v1/products/` - List products (with optional filtering)
- `POST /api/v1/products/` - Create product
- `GET /api/v1/products/{id}` - Get product by ID
- `GET /api/v1/products/sku/{sku}` - Get product by SKU
- `GET /api/v1/products/barcode/{barcode}` - Get product by barcode
- `PUT /api/v1/products/{id}` - Update product
- `DELETE /api/v1/products/{id}` - Delete product
### Inventory
- `GET /api/v1/inventory/` - List inventory items
- `GET /api/v1/inventory/product/{product_id}` - Get inventory by product ID
- `GET /api/v1/inventory/low-stock` - Get products with low stock
- `GET /api/v1/inventory/summary` - Get inventory summary
- `PUT /api/v1/inventory/adjust/{product_id}` - Adjust inventory for a product
### Inventory Transactions
- `POST /api/v1/inventory/transactions/` - Create inventory transaction
- `GET /api/v1/inventory/transactions/` - List transactions (with optional filtering)
- `GET /api/v1/inventory/transactions/{transaction_id}` - Get transaction by ID
## Environment Variables
The application uses the following environment variables:
- `SECRET_KEY` - Secret key for JWT token generation and validation (default: "your-secret-key-here-replace-in-production")
- `ACCESS_TOKEN_EXPIRE_MINUTES` - JWT token expiration time in minutes (default: 30)
## Database Structure
The application uses SQLite as the database, stored in `/app/storage/db/db.sqlite`. The database schema includes:
- **User**: Authentication and system users
- **Category**: Product categories
- **Supplier**: Product suppliers
- **Product**: Product information
- **Inventory**: Current stock levels
- **InventoryTransaction**: History of inventory movements
## Contributing
1. Fork the repository
2. Create a feature branch: `git checkout -b feature-name`
3. Commit your changes: `git commit -am 'Add some feature'`
4. Push to the branch: `git push origin feature-name`
5. Submit a pull request
## License
This project is licensed under the MIT License - see the LICENSE file for details.

106
alembic.ini Normal file
View File

@ -0,0 +1,106 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
# app package initialization

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# api package initialization

60
app/api/deps.py Normal file
View File

@ -0,0 +1,60 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.session import SessionLocal
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

1
app/api/v1/__init__.py Normal file
View File

@ -0,0 +1 @@
# api v1 package initialization

12
app/api/v1/api.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import APIRouter
from app.api.v1.endpoints import login, users, products, categories, suppliers, inventory
api_router = APIRouter()
api_router.include_router(login.router, tags=["Authentication"])
api_router.include_router(users.router, prefix="/users", tags=["Users"])
api_router.include_router(products.router, prefix="/products", tags=["Products"])
api_router.include_router(categories.router, prefix="/categories", tags=["Categories"])
api_router.include_router(suppliers.router, prefix="/suppliers", tags=["Suppliers"])
api_router.include_router(inventory.router, prefix="/inventory", tags=["Inventory"])

View File

@ -0,0 +1 @@
# API endpoints initialization

View File

@ -0,0 +1,112 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Category])
def read_categories(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve categories.
"""
categories = crud.category.get_multi(db, skip=skip, limit=limit)
return categories
@router.post("/", response_model=schemas.Category)
def create_category(
*,
db: Session = Depends(deps.get_db),
category_in: schemas.CategoryCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new category.
"""
category = crud.category.get_by_name(db, name=category_in.name)
if category:
raise HTTPException(
status_code=400,
detail="The category with this name already exists in the system.",
)
category = crud.category.create(db, obj_in=category_in)
return category
@router.get("/{id}", response_model=schemas.Category)
def read_category(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get category by ID.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return category
@router.put("/{id}", response_model=schemas.Category)
def update_category(
*,
db: Session = Depends(deps.get_db),
id: int,
category_in: schemas.CategoryUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a category.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# Check if the updated name conflicts with an existing category
if category_in.name and category_in.name != category.name:
existing_category = crud.category.get_by_name(db, name=category_in.name)
if existing_category:
raise HTTPException(
status_code=400,
detail="The category with this name already exists in the system.",
)
category = crud.category.update(db, db_obj=category, obj_in=category_in)
return category
@router.delete("/{id}", status_code=204, response_model=None)
def delete_category(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a category.
"""
category = crud.category.get(db, id=id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# Check if the category has products
if category.products:
raise HTTPException(
status_code=400,
detail="Cannot delete category with associated products. Reassign or delete the products first.",
)
crud.category.remove(db, id=id)
return None

View File

@ -0,0 +1,200 @@
from typing import Any, List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Inventory])
def read_inventories(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
location: Optional[str] = None,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve inventory items with optional filtering by location.
"""
if location:
inventories = crud.inventory.get_by_location(db, location=location, skip=skip, limit=limit)
else:
inventories = crud.inventory.get_multi(db, skip=skip, limit=limit)
return inventories
@router.get("/product/{product_id}", response_model=schemas.Inventory)
def read_inventory_by_product(
*,
db: Session = Depends(deps.get_db),
product_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get inventory by product ID.
"""
inventory = crud.inventory.get_by_product_id(db, product_id=product_id)
if not inventory:
raise HTTPException(status_code=404, detail="Inventory not found for this product")
return inventory
@router.get("/low-stock", response_model=schemas.InventorySummaryList)
def read_low_stock(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get products with inventory below minimum stock level.
"""
low_stock_items = crud.inventory.get_low_stock(db, skip=skip, limit=limit)
return {"inventories": low_stock_items}
@router.get("/summary", response_model=schemas.InventorySummaryList)
def read_inventory_summary(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get inventory summary for all products.
"""
summary = crud.inventory.get_inventory_summary(db, skip=skip, limit=limit)
return {"inventories": summary}
@router.put("/adjust/{product_id}", response_model=schemas.Inventory)
def adjust_inventory(
*,
db: Session = Depends(deps.get_db),
product_id: int,
quantity_change: int = Body(..., description="Amount to add or subtract from inventory"),
location: Optional[str] = Body(None, description="Location of the inventory"),
notes: Optional[str] = Body(None, description="Notes about the adjustment"),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Adjust inventory quantity for a product.
Positive values increase stock, negative values decrease stock.
"""
# Verify the product exists
product = crud.product.get(db, id=product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# Update inventory and create transaction record
result = crud.inventory.update_stock(
db=db,
product_id=product_id,
quantity_change=quantity_change,
user_id=current_user.id,
transaction_type="adjustment",
notes=notes
)
# Update location if provided
if location and location != result["inventory"].location:
inventory_update = schemas.InventoryUpdate(
product_id=product_id,
quantity=result["inventory"].quantity,
location=location
)
inventory = crud.inventory.update(db, db_obj=result["inventory"], obj_in=inventory_update)
return inventory
return result["inventory"]
@router.post("/transactions/", response_model=schemas.InventoryTransaction)
def create_inventory_transaction(
*,
db: Session = Depends(deps.get_db),
transaction_in: schemas.InventoryTransactionCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create a new inventory transaction and update inventory accordingly.
"""
# Verify the product exists
product = crud.product.get(db, id=transaction_in.product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# Validate transaction type
valid_types = ["purchase", "sale", "adjustment", "return", "transfer"]
if transaction_in.transaction_type not in valid_types:
raise HTTPException(
status_code=400,
detail=f"Invalid transaction type. Must be one of: {', '.join(valid_types)}"
)
# Update inventory and create transaction record
result = crud.inventory.update_stock(
db=db,
product_id=transaction_in.product_id,
quantity_change=transaction_in.quantity,
user_id=current_user.id,
transaction_type=transaction_in.transaction_type,
reference=transaction_in.reference,
unit_price=transaction_in.unit_price,
notes=transaction_in.notes
)
return result["transaction"]
@router.get("/transactions/", response_model=List[schemas.InventoryTransaction])
def read_inventory_transactions(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
product_id: Optional[int] = None,
transaction_type: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve inventory transactions with optional filtering.
"""
if product_id:
transactions = crud.inventory_transaction.get_by_product_id(
db, product_id=product_id, skip=skip, limit=limit
)
elif transaction_type:
transactions = crud.inventory_transaction.get_by_type(
db, transaction_type=transaction_type, skip=skip, limit=limit
)
elif start_date and end_date:
transactions = crud.inventory_transaction.get_by_date_range(
db, start_date=start_date, end_date=end_date, skip=skip, limit=limit
)
else:
transactions = crud.inventory_transaction.get_multi(db, skip=skip, limit=limit)
return transactions
@router.get("/transactions/{transaction_id}", response_model=schemas.InventoryTransaction)
def read_inventory_transaction(
*,
db: Session = Depends(deps.get_db),
transaction_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get inventory transaction by ID.
"""
transaction = crud.inventory_transaction.get(db, id=transaction_id)
if not transaction:
raise HTTPException(status_code=404, detail="Transaction not found")
return transaction

View File

@ -0,0 +1,44 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/login/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user

View File

@ -0,0 +1,221 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Product])
def read_products(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
name: Optional[str] = None,
category_id: Optional[int] = None,
supplier_id: Optional[int] = None,
active_only: bool = False,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve products with optional filtering.
"""
if name:
products = crud.product.get_by_name(db, name=name)
elif category_id:
products = crud.product.get_by_category_id(db, category_id=category_id, skip=skip, limit=limit)
elif supplier_id:
products = crud.product.get_by_supplier_id(db, supplier_id=supplier_id, skip=skip, limit=limit)
elif active_only:
products = crud.product.get_active(db, skip=skip, limit=limit)
else:
products = crud.product.get_multi(db, skip=skip, limit=limit)
return products
@router.post("/", response_model=schemas.Product)
def create_product(
*,
db: Session = Depends(deps.get_db),
product_in: schemas.ProductCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new product.
"""
# Check if SKU exists
product = crud.product.get_by_sku(db, sku=product_in.sku)
if product:
raise HTTPException(
status_code=400,
detail="A product with this SKU already exists in the system.",
)
# Check if barcode exists (if provided)
if product_in.barcode:
product = crud.product.get_by_barcode(db, barcode=product_in.barcode)
if product:
raise HTTPException(
status_code=400,
detail="A product with this barcode already exists in the system.",
)
# Validate category_id if provided
if product_in.category_id:
category = crud.category.get(db, id=product_in.category_id)
if not category:
raise HTTPException(
status_code=404,
detail=f"Category with ID {product_in.category_id} not found",
)
# Validate supplier_id if provided
if product_in.supplier_id:
supplier = crud.supplier.get(db, id=product_in.supplier_id)
if not supplier:
raise HTTPException(
status_code=404,
detail=f"Supplier with ID {product_in.supplier_id} not found",
)
product = crud.product.create(db, obj_in=product_in)
# Create initial inventory record with 0 quantity
inventory_in = schemas.InventoryCreate(
product_id=product.id,
quantity=0,
location=None
)
crud.inventory.create(db, obj_in=inventory_in)
return product
@router.get("/{id}", response_model=schemas.Product)
def read_product(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by ID.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.get("/sku/{sku}", response_model=schemas.Product)
def read_product_by_sku(
*,
db: Session = Depends(deps.get_db),
sku: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by SKU.
"""
product = crud.product.get_by_sku(db, sku=sku)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.get("/barcode/{barcode}", response_model=schemas.Product)
def read_product_by_barcode(
*,
db: Session = Depends(deps.get_db),
barcode: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by barcode.
"""
product = crud.product.get_by_barcode(db, barcode=barcode)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.put("/{id}", response_model=schemas.Product)
def update_product(
*,
db: Session = Depends(deps.get_db),
id: int,
product_in: schemas.ProductUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a product.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# Check if SKU exists and is different from current product
if product_in.sku and product_in.sku != product.sku:
existing_product = crud.product.get_by_sku(db, sku=product_in.sku)
if existing_product:
raise HTTPException(
status_code=400,
detail="A product with this SKU already exists in the system.",
)
# Check if barcode exists and is different from current product
if product_in.barcode and product_in.barcode != product.barcode:
existing_product = crud.product.get_by_barcode(db, barcode=product_in.barcode)
if existing_product:
raise HTTPException(
status_code=400,
detail="A product with this barcode already exists in the system.",
)
# Validate category_id if provided
if product_in.category_id and product_in.category_id != product.category_id:
category = crud.category.get(db, id=product_in.category_id)
if not category:
raise HTTPException(
status_code=404,
detail=f"Category with ID {product_in.category_id} not found",
)
# Validate supplier_id if provided
if product_in.supplier_id and product_in.supplier_id != product.supplier_id:
supplier = crud.supplier.get(db, id=product_in.supplier_id)
if not supplier:
raise HTTPException(
status_code=404,
detail=f"Supplier with ID {product_in.supplier_id} not found",
)
product = crud.product.update(db, db_obj=product, obj_in=product_in)
return product
@router.delete("/{id}", status_code=204, response_model=None)
def delete_product(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a product.
"""
product = crud.product.get(db, id=id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# First, delete associated inventory records
inventory = crud.inventory.get_by_product_id(db, product_id=id)
if inventory:
db.delete(inventory)
db.commit()
crud.product.remove(db, id=id)
return None

View File

@ -0,0 +1,96 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Supplier])
def read_suppliers(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve suppliers.
"""
suppliers = crud.supplier.get_multi(db, skip=skip, limit=limit)
return suppliers
@router.post("/", response_model=schemas.Supplier)
def create_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_in: schemas.SupplierCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new supplier.
"""
supplier = crud.supplier.create(db, obj_in=supplier_in)
return supplier
@router.get("/{id}", response_model=schemas.Supplier)
def read_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get supplier by ID.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
return supplier
@router.put("/{id}", response_model=schemas.Supplier)
def update_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
supplier_in: schemas.SupplierUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a supplier.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
supplier = crud.supplier.update(db, db_obj=supplier, obj_in=supplier_in)
return supplier
@router.delete("/{id}", status_code=204, response_model=None)
def delete_supplier(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a supplier.
"""
supplier = crud.supplier.get(db, id=id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
# Check if the supplier has products
if supplier.products:
raise HTTPException(
status_code=400,
detail="Cannot delete supplier with associated products. Reassign or delete the products first.",
)
crud.supplier.remove(db, id=id)
return None

View File

@ -0,0 +1,120 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

1
app/core/__init__.py Normal file
View File

@ -0,0 +1 @@
# core package initialization

27
app/core/config.py Normal file
View File

@ -0,0 +1,27 @@
import os
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Small Business Inventory Management System"
VERSION: str = "0.1.0"
# Security
SECRET_KEY: str = os.environ.get("SECRET_KEY", "your-secret-key-here-replace-in-production")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database
DB_DIR: Path = Path("/app") / "storage" / "db"
# CORS
BACKEND_CORS_ORIGINS: List[str] = ["*"]
model_config = SettingsConfigDict(case_sensitive=True)
settings = Settings()
# Ensure DB directory exists
settings.DB_DIR.mkdir(parents=True, exist_ok=True)

31
app/core/security.py Normal file
View File

@ -0,0 +1,31 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

5
app/crud/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .crud_user import user as user
from .crud_category import category as category
from .crud_supplier import supplier as supplier
from .crud_product import product as product
from .crud_inventory import inventory as inventory, inventory_transaction as inventory_transaction

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

20
app/crud/crud_category.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.category import Category
from app.schemas.category import CategoryCreate, CategoryUpdate
class CRUDCategory(CRUDBase[Category, CategoryCreate, CategoryUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Category]:
return db.query(Category).filter(Category.name == name).first()
def get_multi_by_ids(
self, db: Session, *, ids: List[int], skip: int = 0, limit: int = 100
) -> List[Category]:
return db.query(Category).filter(Category.id.in_(ids)).offset(skip).limit(limit).all()
category = CRUDCategory(Category)

172
app/crud/crud_inventory.py Normal file
View File

@ -0,0 +1,172 @@
from typing import List, Optional, Dict, Any
from datetime import datetime
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.inventory import Inventory, InventoryTransaction
from app.models.product import Product
from app.schemas.inventory import (
InventoryCreate, InventoryUpdate,
InventoryTransactionCreate, InventoryTransactionUpdate
)
class CRUDInventory(CRUDBase[Inventory, InventoryCreate, InventoryUpdate]):
def get_by_product_id(self, db: Session, *, product_id: int) -> Optional[Inventory]:
return db.query(Inventory).filter(Inventory.product_id == product_id).first()
def get_by_location(
self, db: Session, *, location: str, skip: int = 0, limit: int = 100
) -> List[Inventory]:
return db.query(Inventory).filter(Inventory.location == location).offset(skip).limit(limit).all()
def get_low_stock(self, db: Session, *, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
"""Get products where inventory is below minimum stock level."""
query = (
db.query(
Inventory,
Product.name,
Product.sku,
Product.min_stock_level
)
.join(Product)
.filter(Inventory.quantity <= Product.min_stock_level)
.offset(skip)
.limit(limit)
)
result = []
for inventory, name, sku, min_stock_level in query.all():
result.append({
"product_id": inventory.product_id,
"product_name": name,
"sku": sku,
"current_stock": inventory.quantity,
"min_stock_level": min_stock_level,
"is_low_stock": True
})
return result
def get_inventory_summary(self, db: Session, *, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
"""Get inventory summary for all products."""
query = (
db.query(
Inventory,
Product.name,
Product.sku,
Product.min_stock_level
)
.join(Product)
.offset(skip)
.limit(limit)
)
result = []
for inventory, name, sku, min_stock_level in query.all():
result.append({
"product_id": inventory.product_id,
"product_name": name,
"sku": sku,
"current_stock": inventory.quantity,
"min_stock_level": min_stock_level,
"is_low_stock": inventory.quantity <= min_stock_level
})
return result
def update_stock(
self, db: Session, *, product_id: int, quantity_change: int, user_id: Optional[int] = None,
transaction_type: str = "adjustment", reference: Optional[str] = None,
unit_price: Optional[float] = None, notes: Optional[str] = None
) -> Dict[str, Any]:
"""Update stock level and create a transaction record."""
inventory = self.get_by_product_id(db=db, product_id=product_id)
if not inventory:
# Create new inventory record if it doesn't exist
inventory_in = InventoryCreate(
product_id=product_id,
quantity=quantity_change if quantity_change > 0 else 0, # Don't allow negative initial stock
last_counted_at=datetime.now()
)
inventory = super().create(db=db, obj_in=inventory_in)
else:
# Update existing inventory
new_quantity = inventory.quantity + quantity_change
if new_quantity < 0:
new_quantity = 0 # Don't allow negative stock
inventory_in = InventoryUpdate(
product_id=product_id,
quantity=new_quantity,
last_counted_at=datetime.now()
)
inventory = super().update(db=db, db_obj=inventory, obj_in=inventory_in)
# Create transaction record
transaction = InventoryTransaction(
product_id=product_id,
quantity=quantity_change,
transaction_type=transaction_type,
reference=reference,
unit_price=unit_price,
notes=notes,
transaction_date=datetime.now(),
user_id=user_id
)
db.add(transaction)
db.commit()
db.refresh(transaction)
return {
"inventory": inventory,
"transaction": transaction
}
class CRUDInventoryTransaction(CRUDBase[InventoryTransaction, InventoryTransactionCreate, InventoryTransactionUpdate]):
def get_by_product_id(
self, db: Session, *, product_id: int, skip: int = 0, limit: int = 100
) -> List[InventoryTransaction]:
return (
db.query(InventoryTransaction)
.filter(InventoryTransaction.product_id == product_id)
.order_by(InventoryTransaction.transaction_date.desc())
.offset(skip)
.limit(limit)
.all()
)
def get_by_type(
self, db: Session, *, transaction_type: str, skip: int = 0, limit: int = 100
) -> List[InventoryTransaction]:
return (
db.query(InventoryTransaction)
.filter(InventoryTransaction.transaction_type == transaction_type)
.order_by(InventoryTransaction.transaction_date.desc())
.offset(skip)
.limit(limit)
.all()
)
def get_by_date_range(
self, db: Session, *, start_date: datetime, end_date: datetime,
skip: int = 0, limit: int = 100
) -> List[InventoryTransaction]:
return (
db.query(InventoryTransaction)
.filter(
InventoryTransaction.transaction_date >= start_date,
InventoryTransaction.transaction_date <= end_date
)
.order_by(InventoryTransaction.transaction_date.desc())
.offset(skip)
.limit(limit)
.all()
)
inventory = CRUDInventory(Inventory)
inventory_transaction = CRUDInventoryTransaction(InventoryTransaction)

36
app/crud/crud_product.py Normal file
View File

@ -0,0 +1,36 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.product import Product
from app.schemas.product import ProductCreate, ProductUpdate
class CRUDProduct(CRUDBase[Product, ProductCreate, ProductUpdate]):
def get_by_sku(self, db: Session, *, sku: str) -> Optional[Product]:
return db.query(Product).filter(Product.sku == sku).first()
def get_by_barcode(self, db: Session, *, barcode: str) -> Optional[Product]:
return db.query(Product).filter(Product.barcode == barcode).first()
def get_by_name(self, db: Session, *, name: str) -> List[Product]:
return db.query(Product).filter(Product.name.ilike(f"%{name}%")).all()
def get_by_category_id(
self, db: Session, *, category_id: int, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.category_id == category_id).offset(skip).limit(limit).all()
def get_by_supplier_id(
self, db: Session, *, supplier_id: int, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.supplier_id == supplier_id).offset(skip).limit(limit).all()
def get_active(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[Product]:
return db.query(Product).filter(Product.is_active).offset(skip).limit(limit).all()
product = CRUDProduct(Product)

20
app/crud/crud_supplier.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.supplier import Supplier
from app.schemas.supplier import SupplierCreate, SupplierUpdate
class CRUDSupplier(CRUDBase[Supplier, SupplierCreate, SupplierUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Supplier]:
return db.query(Supplier).filter(Supplier.name == name).first()
def get_multi_by_ids(
self, db: Session, *, ids: List[int], skip: int = 0, limit: int = 100
) -> List[Supplier]:
return db.query(Supplier).filter(Supplier.id.in_(ids)).offset(skip).limit(limit).all()
supplier = CRUDSupplier(Supplier)

56
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,56 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

1
app/db/__init__.py Normal file
View File

@ -0,0 +1 @@
# db package initialization

6
app/db/base.py Normal file
View File

@ -0,0 +1,6 @@
# Import all models here for Alembic to discover
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# Import all models so Alembic can discover them

19
app/db/base_class.py Normal file
View File

@ -0,0 +1,19 @@
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.sql import func
class BaseClass:
"""
Base class for all SQLAlchemy models.
Provides common attributes like id and created_at/updated_at timestamps.
"""
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

26
app/db/session.py Normal file
View File

@ -0,0 +1,26 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Ensure directory exists
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{settings.DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
"""
Dependency function to get a DB session.
"""
db = SessionLocal()
try:
yield db
finally:
db.close()

1
app/models/__init__.py Normal file
View File

@ -0,0 +1 @@
# models package initialization

16
app/models/category.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Column, String, Text
from sqlalchemy.orm import relationship
from app.db.base_class import BaseClass
from app.db.base import Base
class Category(Base, BaseClass):
"""
Category model for product categorization.
"""
name = Column(String(255), unique=True, index=True, nullable=False)
description = Column(Text, nullable=True)
# Relationships
products = relationship("Product", back_populates="category")

41
app/models/inventory.py Normal file
View File

@ -0,0 +1,41 @@
from sqlalchemy import Column, Integer, String, ForeignKey, Float, DateTime, Text
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base_class import BaseClass
from app.db.base import Base
class Inventory(Base, BaseClass):
"""
Inventory model for tracking stock levels of products.
"""
product_id = Column(Integer, ForeignKey("product.id"), nullable=False)
quantity = Column(Integer, nullable=False, default=0)
location = Column(String(255), nullable=True)
last_counted_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
product = relationship("Product", back_populates="inventory_items")
class InventoryTransaction(Base, BaseClass):
"""
Inventory Transaction model for tracking stock movements.
"""
TRANSACTION_TYPES = ["purchase", "sale", "adjustment", "return", "transfer"]
product_id = Column(Integer, ForeignKey("product.id"), nullable=False)
quantity = Column(Integer, nullable=False)
transaction_type = Column(String(50), nullable=False) # purchase, sale, adjustment, return, transfer
reference = Column(String(255), nullable=True) # Order number, invoice number, etc.
unit_price = Column(Float, nullable=True)
notes = Column(Text, nullable=True)
transaction_date = Column(DateTime(timezone=True), server_default=func.now())
# Optional user who performed the transaction
user_id = Column(Integer, ForeignKey("user.id"), nullable=True)
# Relationships
product = relationship("Product")
user = relationship("User")

30
app/models/product.py Normal file
View File

@ -0,0 +1,30 @@
from sqlalchemy import Column, String, Text, Integer, Float, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from app.db.base_class import BaseClass
from app.db.base import Base
class Product(Base, BaseClass):
"""
Product model for inventory items.
"""
name = Column(String(255), index=True, nullable=False)
description = Column(Text, nullable=True)
sku = Column(String(50), unique=True, index=True, nullable=False)
barcode = Column(String(100), unique=True, index=True, nullable=True)
price = Column(Float, nullable=False, default=0.0)
cost = Column(Float, nullable=False, default=0.0)
# Stock management
min_stock_level = Column(Integer, nullable=False, default=0)
is_active = Column(Boolean, default=True)
# Foreign keys
category_id = Column(Integer, ForeignKey("category.id"), nullable=True)
supplier_id = Column(Integer, ForeignKey("supplier.id"), nullable=True)
# Relationships
category = relationship("Category", back_populates="products")
supplier = relationship("Supplier", back_populates="products")
inventory_items = relationship("Inventory", back_populates="product")

21
app/models/supplier.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, String, Text
from sqlalchemy.orm import relationship
from app.db.base_class import BaseClass
from app.db.base import Base
class Supplier(Base, BaseClass):
"""
Supplier model for managing product suppliers.
"""
name = Column(String(255), index=True, nullable=False)
contact_name = Column(String(255), nullable=True)
email = Column(String(255), nullable=True)
phone = Column(String(50), nullable=True)
address = Column(Text, nullable=True)
website = Column(String(255), nullable=True)
notes = Column(Text, nullable=True)
# Relationships
products = relationship("Product", back_populates="supplier")

19
app/models/user.py Normal file
View File

@ -0,0 +1,19 @@
from sqlalchemy import Boolean, Column, String
from app.db.base_class import BaseClass
from app.db.base import Base
class User(Base, BaseClass):
"""
User model for authentication and authorization.
"""
email = Column(String, unique=True, index=True, nullable=False)
full_name = Column(String, index=True)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
# Relationships
# When we create inventory transactions, we can link them to users
# transactions = relationship("InventoryTransaction", back_populates="user")

12
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from .token import Token as Token, TokenPayload as TokenPayload
from .user import User as User, UserCreate as UserCreate, UserInDB as UserInDB, UserUpdate as UserUpdate
from .category import Category as Category, CategoryCreate as CategoryCreate, CategoryUpdate as CategoryUpdate
from .supplier import Supplier as Supplier, SupplierCreate as SupplierCreate, SupplierUpdate as SupplierUpdate
from .product import Product as Product, ProductCreate as ProductCreate, ProductUpdate as ProductUpdate
from .inventory import (
Inventory as Inventory, InventoryCreate as InventoryCreate, InventoryUpdate as InventoryUpdate,
InventoryTransaction as InventoryTransaction,
InventoryTransactionCreate as InventoryTransactionCreate,
InventoryTransactionUpdate as InventoryTransactionUpdate,
InventorySummary as InventorySummary, InventorySummaryList as InventorySummaryList
)

37
app/schemas/category.py Normal file
View File

@ -0,0 +1,37 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class CategoryBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
# Properties to receive on category creation
class CategoryCreate(CategoryBase):
name: str
# Properties to receive on category update
class CategoryUpdate(CategoryBase):
pass
# Properties shared by models in DB
class CategoryInDBBase(CategoryBase):
id: int
name: str
class Config:
from_attributes = True
# Properties to return to client
class Category(CategoryInDBBase):
pass
# Properties properties stored in DB
class CategoryInDB(CategoryInDBBase):
pass

98
app/schemas/inventory.py Normal file
View File

@ -0,0 +1,98 @@
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel
# Shared properties for Inventory
class InventoryBase(BaseModel):
product_id: int
quantity: int = 0
location: Optional[str] = None
last_counted_at: Optional[datetime] = None
# Properties to receive on inventory creation
class InventoryCreate(InventoryBase):
pass
# Properties to receive on inventory update
class InventoryUpdate(InventoryBase):
product_id: Optional[int] = None
# Properties shared by models in DB
class InventoryInDBBase(InventoryBase):
id: int
product_id: int
class Config:
from_attributes = True
# Properties to return to client
class Inventory(InventoryInDBBase):
pass
# Properties properties stored in DB
class InventoryInDB(InventoryInDBBase):
pass
# Shared properties for InventoryTransaction
class InventoryTransactionBase(BaseModel):
product_id: int
quantity: int
transaction_type: str # purchase, sale, adjustment, return, transfer
reference: Optional[str] = None
unit_price: Optional[float] = None
notes: Optional[str] = None
transaction_date: Optional[datetime] = None
user_id: Optional[int] = None
# Properties to receive on inventory transaction creation
class InventoryTransactionCreate(InventoryTransactionBase):
pass
# Properties to receive on inventory transaction update
class InventoryTransactionUpdate(InventoryTransactionBase):
product_id: Optional[int] = None
quantity: Optional[int] = None
transaction_type: Optional[str] = None
# Properties shared by models in DB
class InventoryTransactionInDBBase(InventoryTransactionBase):
id: int
transaction_date: datetime
class Config:
from_attributes = True
# Properties to return to client
class InventoryTransaction(InventoryTransactionInDBBase):
pass
# Properties properties stored in DB
class InventoryTransactionInDB(InventoryTransactionInDBBase):
pass
# Inventory summary for a product
class InventorySummary(BaseModel):
product_id: int
product_name: str
sku: str
current_stock: int
min_stock_level: int
is_low_stock: bool
# List of inventory summaries
class InventorySummaryList(BaseModel):
inventories: List[InventorySummary]

47
app/schemas/product.py Normal file
View File

@ -0,0 +1,47 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class ProductBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
sku: Optional[str] = None
barcode: Optional[str] = None
price: Optional[float] = 0.0
cost: Optional[float] = 0.0
min_stock_level: Optional[int] = 0
is_active: Optional[bool] = True
category_id: Optional[int] = None
supplier_id: Optional[int] = None
# Properties to receive on product creation
class ProductCreate(ProductBase):
name: str
sku: str
# Properties to receive on product update
class ProductUpdate(ProductBase):
pass
# Properties shared by models in DB
class ProductInDBBase(ProductBase):
id: int
name: str
sku: str
class Config:
from_attributes = True
# Properties to return to client
class Product(ProductInDBBase):
pass
# Properties properties stored in DB
class ProductInDB(ProductInDBBase):
pass

42
app/schemas/supplier.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class SupplierBase(BaseModel):
name: Optional[str] = None
contact_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
website: Optional[str] = None
notes: Optional[str] = None
# Properties to receive on supplier creation
class SupplierCreate(SupplierBase):
name: str
# Properties to receive on supplier update
class SupplierUpdate(SupplierBase):
pass
# Properties shared by models in DB
class SupplierInDBBase(SupplierBase):
id: int
name: str
class Config:
from_attributes = True
# Properties to return to client
class Supplier(SupplierInDBBase):
pass
# Properties properties stored in DB
class SupplierInDB(SupplierInDBBase):
pass

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

38
app/schemas/user.py Normal file
View File

@ -0,0 +1,38 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

49
main.py Normal file
View File

@ -0,0 +1,49 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url="/openapi.json",
version=settings.VERSION
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/", tags=["Root"])
async def root():
"""
Root endpoint that returns basic service information.
"""
return JSONResponse({
"name": settings.PROJECT_NAME,
"version": settings.VERSION,
"documentation": "/docs",
"health_check": "/health"
})
@app.get("/health", tags=["Health"])
async def health_check():
"""
Health check endpoint.
"""
return JSONResponse({
"status": "healthy"
})
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/__init__.py Normal file
View File

@ -0,0 +1 @@
# migrations package

92
migrations/env.py Normal file
View File

@ -0,0 +1,92 @@
import sys
from logging.config import fileConfig
from pathlib import Path
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# Add the app directory to the Python path so we can import our models
sys.path.append(str(Path(__file__).resolve().parents[1]))
# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Import all models for Alembic to detect
from app.db.base import Base # noqa: E402
from app.models.user import User # noqa: E402, F401
from app.models.product import Product # noqa: E402, F401
from app.models.category import Category # noqa: E402, F401
from app.models.supplier import Supplier # noqa: E402, F401
from app.models.inventory import Inventory, InventoryTransaction # noqa: E402, F401
# Add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == "sqlite"
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite # Use batch mode for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

26
migrations/script.py.mako Normal file
View File

@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

View File

@ -0,0 +1,156 @@
"""Initial setup
Revision ID: 01_initial_setup
Revises:
Create Date: 2023-11-22
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '01_initial_setup'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create user table
op.create_table(
'user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_full_name'), 'user', ['full_name'], unique=False)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
# Create category table
op.create_table(
'category',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_category_id'), 'category', ['id'], unique=False)
op.create_index(op.f('ix_category_name'), 'category', ['name'], unique=True)
# Create supplier table
op.create_table(
'supplier',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('contact_name', sa.String(length=255), nullable=True),
sa.Column('email', sa.String(length=255), nullable=True),
sa.Column('phone', sa.String(length=50), nullable=True),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('website', sa.String(length=255), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_supplier_id'), 'supplier', ['id'], unique=False)
op.create_index(op.f('ix_supplier_name'), 'supplier', ['name'], unique=False)
# Create product table
op.create_table(
'product',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('sku', sa.String(length=50), nullable=False),
sa.Column('barcode', sa.String(length=100), nullable=True),
sa.Column('price', sa.Float(), nullable=False, default=0.0),
sa.Column('cost', sa.Float(), nullable=False, default=0.0),
sa.Column('min_stock_level', sa.Integer(), nullable=False, default=0),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('category_id', sa.Integer(), nullable=True),
sa.Column('supplier_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.ForeignKeyConstraint(['category_id'], ['category.id'], ),
sa.ForeignKeyConstraint(['supplier_id'], ['supplier.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_product_barcode'), 'product', ['barcode'], unique=True)
op.create_index(op.f('ix_product_id'), 'product', ['id'], unique=False)
op.create_index(op.f('ix_product_name'), 'product', ['name'], unique=False)
op.create_index(op.f('ix_product_sku'), 'product', ['sku'], unique=True)
# Create inventory table
op.create_table(
'inventory',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False, default=0),
sa.Column('location', sa.String(length=255), nullable=True),
sa.Column('last_counted_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.ForeignKeyConstraint(['product_id'], ['product.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_inventory_id'), 'inventory', ['id'], unique=False)
# Create inventory_transaction table
op.create_table(
'inventory_transaction',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False),
sa.Column('transaction_type', sa.String(length=50), nullable=False),
sa.Column('reference', sa.String(length=255), nullable=True),
sa.Column('unit_price', sa.Float(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('transaction_date', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.ForeignKeyConstraint(['product_id'], ['product.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_inventory_transaction_id'), 'inventory_transaction', ['id'], unique=False)
def downgrade() -> None:
# Drop tables in reverse order
op.drop_index(op.f('ix_inventory_transaction_id'), table_name='inventory_transaction')
op.drop_table('inventory_transaction')
op.drop_index(op.f('ix_inventory_id'), table_name='inventory')
op.drop_table('inventory')
op.drop_index(op.f('ix_product_sku'), table_name='product')
op.drop_index(op.f('ix_product_name'), table_name='product')
op.drop_index(op.f('ix_product_id'), table_name='product')
op.drop_index(op.f('ix_product_barcode'), table_name='product')
op.drop_table('product')
op.drop_index(op.f('ix_supplier_name'), table_name='supplier')
op.drop_index(op.f('ix_supplier_id'), table_name='supplier')
op.drop_table('supplier')
op.drop_index(op.f('ix_category_name'), table_name='category')
op.drop_index(op.f('ix_category_id'), table_name='category')
op.drop_table('category')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_full_name'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.104.0
uvicorn>=0.23.2
sqlalchemy>=2.0.23
alembic>=1.12.1
pydantic>=2.4.2
pydantic-settings>=2.0.3
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.6
ruff>=0.1.3
pytest>=7.4.3