Create Small Business Inventory Management System API

- Set up project structure with FastAPI and SQLite
- Create database models for inventory management
- Set up SQLAlchemy and Alembic for database migrations
- Create initial database migrations
- Implement CRUD operations for products, categories, suppliers
- Implement stock movement tracking and inventory management
- Add authentication and user management
- Add API endpoints for all entities
- Add health check endpoint
- Update README with project information and usage instructions
This commit is contained in:
Automated Action 2025-05-23 11:56:20 +00:00
parent f5b4151fe5
commit d340d37ac9
42 changed files with 1973 additions and 2 deletions

131
README.md
View File

@ -1,3 +1,130 @@
# FastAPI Application
# Small Business Inventory Management System
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A RESTful API for small business inventory management built with FastAPI and SQLite.
## Features
- **User Authentication**: Secure login and role-based access control
- **Product Management**: Add, update, and remove products with detailed information
- **Category and Supplier Management**: Organize products by categories and suppliers
- **Inventory Tracking**: Monitor stock levels and receive alerts for low stock
- **Stock Movement Tracking**: Record purchases, sales, returns, and inventory adjustments
- **Search Functionality**: Search for products, categories, and suppliers
## Technical Stack
- **Backend**: FastAPI (Python)
- **Database**: SQLite
- **ORM**: SQLAlchemy
- **Migrations**: Alembic
- **Authentication**: JWT (JSON Web Tokens)
## API Endpoints
### Authentication
- `POST /api/v1/login/access-token` - Get access token
- `POST /api/v1/login/test-token` - Test token validity
### Users
- `GET /api/v1/users/` - List all users (admin only)
- `POST /api/v1/users/` - Create a new user (admin only)
- `GET /api/v1/users/me` - Get current user information
- `PUT /api/v1/users/me` - Update current user information
- `GET /api/v1/users/{user_id}` - Get specific user information
- `PUT /api/v1/users/{user_id}` - Update specific user (admin only)
### Products
- `GET /api/v1/products/` - List all products
- `POST /api/v1/products/` - Create a new product
- `GET /api/v1/products/{product_id}` - Get specific product
- `PUT /api/v1/products/{product_id}` - Update specific product
- `DELETE /api/v1/products/{product_id}` - Delete specific product (admin only)
- `GET /api/v1/products/by-category/{category_id}` - Get products by category
- `GET /api/v1/products/by-supplier/{supplier_id}` - Get products by supplier
- `GET /api/v1/products/search/` - Search products
- `GET /api/v1/products/low-stock/` - Get products with low stock
### Categories
- `GET /api/v1/categories/` - List all categories
- `POST /api/v1/categories/` - Create a new category
- `GET /api/v1/categories/{category_id}` - Get specific category
- `PUT /api/v1/categories/{category_id}` - Update specific category
- `DELETE /api/v1/categories/{category_id}` - Delete specific category (admin only)
- `GET /api/v1/categories/search/` - Search categories
### Suppliers
- `GET /api/v1/suppliers/` - List all suppliers
- `POST /api/v1/suppliers/` - Create a new supplier
- `GET /api/v1/suppliers/{supplier_id}` - Get specific supplier
- `PUT /api/v1/suppliers/{supplier_id}` - Update specific supplier
- `DELETE /api/v1/suppliers/{supplier_id}` - Delete specific supplier (admin only)
- `GET /api/v1/suppliers/search/` - Search suppliers
### Stock Movements
- `GET /api/v1/stock-movements/` - List all stock movements
- `POST /api/v1/stock-movements/` - Create a new stock movement
- `GET /api/v1/stock-movements/{stock_movement_id}` - Get specific stock movement
- `GET /api/v1/stock-movements/by-product/{product_id}` - Get stock movements by product
- `GET /api/v1/stock-movements/by-type/{movement_type}` - Get stock movements by type
### Health Check
- `GET /health` - Check API health status
## Installation and Setup
1. Clone the repository:
```bash
git clone <repository-url>
cd smallbusinessinventorymanagementsystem
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run the application:
```bash
uvicorn main:app --reload
```
4. Access the API documentation:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## Database Migrations
Initialize and apply database migrations:
```bash
alembic upgrade head
```
## Creating a Superuser
To create an admin user, use the following API endpoint:
```
POST /api/v1/users/
```
With the payload:
```json
{
"email": "admin@example.com",
"username": "admin",
"password": "your-secure-password",
"is_superuser": true
}
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.

106
alembic.ini Normal file
View File

@ -0,0 +1,106 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
# app package initialization

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# API package initialization

View File

@ -0,0 +1 @@
# API v1 package initialization

11
app/api/api_v1/api.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import login, users, products, categories, suppliers, stock_movements
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(products.router, prefix="/products", tags=["products"])
api_router.include_router(categories.router, prefix="/categories", tags=["categories"])
api_router.include_router(suppliers.router, prefix="/suppliers", tags=["suppliers"])
api_router.include_router(stock_movements.router, prefix="/stock-movements", tags=["stock-movements"])

View File

@ -0,0 +1,120 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Category])
def read_categories(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve categories.
"""
categories = crud.category.get_multi(db, skip=skip, limit=limit)
return categories
@router.post("/", response_model=schemas.Category)
def create_category(
*,
db: Session = Depends(deps.get_db),
category_in: schemas.CategoryCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new category.
"""
category = crud.category.get_by_name(db, name=category_in.name)
if category:
raise HTTPException(
status_code=400,
detail="A category with this name already exists in the system.",
)
category = crud.category.create(db, obj_in=category_in)
return category
@router.get("/{category_id}", response_model=schemas.Category)
def read_category(
*,
db: Session = Depends(deps.get_db),
category_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get category by ID.
"""
category = crud.category.get(db, id=category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return category
@router.put("/{category_id}", response_model=schemas.Category)
def update_category(
*,
db: Session = Depends(deps.get_db),
category_id: str,
category_in: schemas.CategoryUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a category.
"""
category = crud.category.get(db, id=category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# If name is being updated, check for duplicates
if category_in.name and category_in.name != category.name:
existing_category = crud.category.get_by_name(db, name=category_in.name)
if existing_category:
raise HTTPException(
status_code=400,
detail="A category with this name already exists in the system.",
)
category = crud.category.update(db, db_obj=category, obj_in=category_in)
return category
@router.delete("/{category_id}", response_model=schemas.Category)
def delete_category(
*,
db: Session = Depends(deps.get_db),
category_id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a category.
"""
category = crud.category.get(db, id=category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
category = crud.category.remove(db, id=category_id)
return category
@router.get("/search/", response_model=List[schemas.Category])
def search_categories(
*,
db: Session = Depends(deps.get_db),
query: str = Query(..., min_length=1),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Search categories by name or description.
"""
categories = crud.category.search(db, query=query, skip=skip, limit=limit)
return categories

View File

@ -0,0 +1,44 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, username=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/login/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user

View File

@ -0,0 +1,185 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Product])
def read_products(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve products.
"""
products = crud.product.get_multi(db, skip=skip, limit=limit)
return products
@router.post("/", response_model=schemas.Product)
def create_product(
*,
db: Session = Depends(deps.get_db),
product_in: schemas.ProductCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new product.
"""
product = crud.product.get_by_sku(db, sku=product_in.sku)
if product:
raise HTTPException(
status_code=400,
detail="The product with this SKU already exists in the system.",
)
if product_in.barcode:
product = crud.product.get_by_barcode(db, barcode=product_in.barcode)
if product:
raise HTTPException(
status_code=400,
detail="The product with this barcode already exists in the system.",
)
product = crud.product.create(db, obj_in=product_in)
return product
@router.get("/{product_id}", response_model=schemas.Product)
def read_product(
*,
db: Session = Depends(deps.get_db),
product_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get product by ID.
"""
product = crud.product.get(db, id=product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@router.put("/{product_id}", response_model=schemas.Product)
def update_product(
*,
db: Session = Depends(deps.get_db),
product_id: str,
product_in: schemas.ProductUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a product.
"""
product = crud.product.get(db, id=product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# If SKU is being updated, check for duplicates
if product_in.sku and product_in.sku != product.sku:
existing_product = crud.product.get_by_sku(db, sku=product_in.sku)
if existing_product:
raise HTTPException(
status_code=400,
detail="The product with this SKU already exists in the system.",
)
# If barcode is being updated, check for duplicates
if product_in.barcode and product_in.barcode != product.barcode:
existing_product = crud.product.get_by_barcode(db, barcode=product_in.barcode)
if existing_product:
raise HTTPException(
status_code=400,
detail="The product with this barcode already exists in the system.",
)
product = crud.product.update(db, db_obj=product, obj_in=product_in)
return product
@router.delete("/{product_id}", response_model=schemas.Product)
def delete_product(
*,
db: Session = Depends(deps.get_db),
product_id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a product.
"""
product = crud.product.get(db, id=product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
product = crud.product.remove(db, id=product_id)
return product
@router.get("/by-category/{category_id}", response_model=List[schemas.Product])
def read_products_by_category(
*,
db: Session = Depends(deps.get_db),
category_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get products by category.
"""
products = crud.product.get_by_category(db, category_id=category_id, skip=skip, limit=limit)
return products
@router.get("/by-supplier/{supplier_id}", response_model=List[schemas.Product])
def read_products_by_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get products by supplier.
"""
products = crud.product.get_by_supplier(db, supplier_id=supplier_id, skip=skip, limit=limit)
return products
@router.get("/search/", response_model=List[schemas.Product])
def search_products(
*,
db: Session = Depends(deps.get_db),
query: str = Query(..., min_length=1),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Search products by name, SKU, or barcode.
"""
products = crud.product.search(db, query=query, skip=skip, limit=limit)
return products
@router.get("/low-stock/", response_model=List[schemas.Product])
def get_low_stock_products(
*,
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get products with stock levels at or below reorder level.
"""
products = crud.product.get_low_stock(db, skip=skip, limit=limit)
return products

View File

@ -0,0 +1,106 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.models.stock_movement import MovementType
router = APIRouter()
@router.get("/", response_model=List[schemas.StockMovement])
def read_stock_movements(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve stock movements.
"""
stock_movements = crud.stock_movement.get_multi(db, skip=skip, limit=limit)
return stock_movements
@router.post("/", response_model=schemas.StockMovement)
def create_stock_movement(
*,
db: Session = Depends(deps.get_db),
stock_movement_in: schemas.StockMovementCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new stock movement and update product stock levels.
"""
# Check if product exists
product = crud.product.get(db, id=stock_movement_in.product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# For outgoing stock, make sure we have enough
if stock_movement_in.movement_type in [MovementType.SALE, MovementType.ADJUSTMENT] and stock_movement_in.quantity > 0:
if product.current_stock < stock_movement_in.quantity:
raise HTTPException(
status_code=400,
detail=f"Not enough stock available. Current stock: {product.current_stock}",
)
# Create stock movement and update product stock level
stock_movement = crud.stock_movement.create_with_product_update(
db, obj_in=stock_movement_in, created_by=current_user.id
)
return stock_movement
@router.get("/{stock_movement_id}", response_model=schemas.StockMovement)
def read_stock_movement(
*,
db: Session = Depends(deps.get_db),
stock_movement_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get stock movement by ID.
"""
stock_movement = crud.stock_movement.get(db, id=stock_movement_id)
if not stock_movement:
raise HTTPException(status_code=404, detail="Stock movement not found")
return stock_movement
@router.get("/by-product/{product_id}", response_model=List[schemas.StockMovement])
def read_stock_movements_by_product(
*,
db: Session = Depends(deps.get_db),
product_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get stock movements by product.
"""
stock_movements = crud.stock_movement.get_by_product(
db, product_id=product_id, skip=skip, limit=limit
)
return stock_movements
@router.get("/by-type/{movement_type}", response_model=List[schemas.StockMovement])
def read_stock_movements_by_type(
*,
db: Session = Depends(deps.get_db),
movement_type: MovementType,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get stock movements by movement type.
"""
stock_movements = crud.stock_movement.get_by_movement_type(
db, movement_type=movement_type, skip=skip, limit=limit
)
return stock_movements

View File

@ -0,0 +1,138 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Supplier])
def read_suppliers(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve suppliers.
"""
suppliers = crud.supplier.get_multi(db, skip=skip, limit=limit)
return suppliers
@router.post("/", response_model=schemas.Supplier)
def create_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_in: schemas.SupplierCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new supplier.
"""
supplier = crud.supplier.get_by_name(db, name=supplier_in.name)
if supplier:
raise HTTPException(
status_code=400,
detail="A supplier with this name already exists in the system.",
)
if supplier_in.email:
supplier = crud.supplier.get_by_email(db, email=supplier_in.email)
if supplier:
raise HTTPException(
status_code=400,
detail="A supplier with this email already exists in the system.",
)
supplier = crud.supplier.create(db, obj_in=supplier_in)
return supplier
@router.get("/{supplier_id}", response_model=schemas.Supplier)
def read_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get supplier by ID.
"""
supplier = crud.supplier.get(db, id=supplier_id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
return supplier
@router.put("/{supplier_id}", response_model=schemas.Supplier)
def update_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_id: str,
supplier_in: schemas.SupplierUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a supplier.
"""
supplier = crud.supplier.get(db, id=supplier_id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
# If name is being updated, check for duplicates
if supplier_in.name and supplier_in.name != supplier.name:
existing_supplier = crud.supplier.get_by_name(db, name=supplier_in.name)
if existing_supplier:
raise HTTPException(
status_code=400,
detail="A supplier with this name already exists in the system.",
)
# If email is being updated, check for duplicates
if supplier_in.email and supplier_in.email != supplier.email:
existing_supplier = crud.supplier.get_by_email(db, email=supplier_in.email)
if existing_supplier:
raise HTTPException(
status_code=400,
detail="A supplier with this email already exists in the system.",
)
supplier = crud.supplier.update(db, db_obj=supplier, obj_in=supplier_in)
return supplier
@router.delete("/{supplier_id}", response_model=schemas.Supplier)
def delete_supplier(
*,
db: Session = Depends(deps.get_db),
supplier_id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a supplier.
"""
supplier = crud.supplier.get(db, id=supplier_id)
if not supplier:
raise HTTPException(status_code=404, detail="Supplier not found")
supplier = crud.supplier.remove(db, id=supplier_id)
return supplier
@router.get("/search/", response_model=List[schemas.Supplier])
def search_suppliers(
*,
db: Session = Depends(deps.get_db),
query: str = Query(..., min_length=1),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Search suppliers by name, contact name, or email.
"""
suppliers = crud.supplier.search(db, query=query, skip=skip, limit=limit)
return suppliers

View File

@ -0,0 +1,125 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: str,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

49
app/api/deps.py Normal file
View File

@ -0,0 +1,49 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

24
app/core/config.py Normal file
View File

@ -0,0 +1,24 @@
from pydantic_settings import BaseSettings
from pathlib import Path
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Small Business Inventory Management System"
# Security
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database
DB_DIR: Path = Path("/app") / "storage" / "db"
class Config:
case_sensitive = True
settings = Settings()
# Create database directory if it doesn't exist
settings.DB_DIR.mkdir(parents=True, exist_ok=True)

31
app/core/security.py Normal file
View File

@ -0,0 +1,31 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

8
app/crud/__init__.py Normal file
View File

@ -0,0 +1,8 @@
# Re-export CRUD instances for easy access
from app.crud.crud_user import user as user
from app.crud.crud_product import product as product
from app.crud.crud_category import category as category
from app.crud.crud_supplier import supplier as supplier
from app.crud.crud_stock_movement import stock_movement as stock_movement
__all__ = ["user", "product", "category", "supplier", "stock_movement"]

68
app/crud/base.py Normal file
View File

@ -0,0 +1,68 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
import uuid
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
# Generate a UUID string for new object
obj_in_data["id"] = str(uuid.uuid4())
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: str) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

20
app/crud/crud_category.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.category import Category
from app.schemas.category import CategoryCreate, CategoryUpdate
class CRUDCategory(CRUDBase[Category, CategoryCreate, CategoryUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Category]:
return db.query(Category).filter(Category.name == name).first()
def search(self, db: Session, *, query: str, skip: int = 0, limit: int = 100) -> List[Category]:
return db.query(Category).filter(
Category.name.ilike(f"%{query}%") |
Category.description.ilike(f"%{query}%")
).offset(skip).limit(limit).all()
category = CRUDCategory(Category)

42
app/crud/crud_product.py Normal file
View File

@ -0,0 +1,42 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.product import Product
from app.schemas.product import ProductCreate, ProductUpdate
class CRUDProduct(CRUDBase[Product, ProductCreate, ProductUpdate]):
def get_by_sku(self, db: Session, *, sku: str) -> Optional[Product]:
return db.query(Product).filter(Product.sku == sku).first()
def get_by_barcode(self, db: Session, *, barcode: str) -> Optional[Product]:
return db.query(Product).filter(Product.barcode == barcode).first()
def get_by_category(self, db: Session, *, category_id: str, skip: int = 0, limit: int = 100) -> List[Product]:
return db.query(Product).filter(Product.category_id == category_id).offset(skip).limit(limit).all()
def get_by_supplier(self, db: Session, *, supplier_id: str, skip: int = 0, limit: int = 100) -> List[Product]:
return db.query(Product).filter(Product.supplier_id == supplier_id).offset(skip).limit(limit).all()
def get_low_stock(self, db: Session, *, skip: int = 0, limit: int = 100) -> List[Product]:
return db.query(Product).filter(Product.current_stock <= Product.reorder_level).offset(skip).limit(limit).all()
def search(self, db: Session, *, query: str, skip: int = 0, limit: int = 100) -> List[Product]:
return db.query(Product).filter(
Product.name.ilike(f"%{query}%") |
Product.sku.ilike(f"%{query}%") |
Product.barcode.ilike(f"%{query}%")
).offset(skip).limit(limit).all()
def update_stock(self, db: Session, *, product_id: str, quantity: int) -> Product:
product = self.get(db, id=product_id)
if product:
product.current_stock += quantity
db.add(product)
db.commit()
db.refresh(product)
return product
product = CRUDProduct(Product)

View File

@ -0,0 +1,54 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.stock_movement import StockMovement, MovementType
from app.schemas.stock_movement import StockMovementCreate
from app.crud.crud_product import product as product_crud
class CRUDStockMovement(CRUDBase[StockMovement, StockMovementCreate, StockMovementCreate]):
def create_with_product_update(
self, db: Session, *, obj_in: StockMovementCreate, created_by: Optional[str] = None
) -> StockMovement:
# Create the stock movement
obj_in_data = obj_in.dict()
obj_in_data["created_by"] = created_by
db_obj = super().create(db, obj_in=obj_in)
# Update product stock level
quantity = obj_in.quantity
if obj_in.movement_type in [MovementType.SALE, MovementType.ADJUSTMENT]:
if obj_in.quantity > 0:
quantity = -quantity # Negate for outgoing stock
product_crud.update_stock(db, product_id=obj_in.product_id, quantity=quantity)
return db_obj
def get_by_product(
self, db: Session, *, product_id: str, skip: int = 0, limit: int = 100
) -> List[StockMovement]:
return (
db.query(StockMovement)
.filter(StockMovement.product_id == product_id)
.order_by(StockMovement.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
def get_by_movement_type(
self, db: Session, *, movement_type: MovementType, skip: int = 0, limit: int = 100
) -> List[StockMovement]:
return (
db.query(StockMovement)
.filter(StockMovement.movement_type == movement_type)
.order_by(StockMovement.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
stock_movement = CRUDStockMovement(StockMovement)

24
app/crud/crud_supplier.py Normal file
View File

@ -0,0 +1,24 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.supplier import Supplier
from app.schemas.supplier import SupplierCreate, SupplierUpdate
class CRUDSupplier(CRUDBase[Supplier, SupplierCreate, SupplierUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Supplier]:
return db.query(Supplier).filter(Supplier.name == name).first()
def get_by_email(self, db: Session, *, email: str) -> Optional[Supplier]:
return db.query(Supplier).filter(Supplier.email == email).first()
def search(self, db: Session, *, query: str, skip: int = 0, limit: int = 100) -> List[Supplier]:
return db.query(Supplier).filter(
Supplier.name.ilike(f"%{query}%") |
Supplier.contact_name.ilike(f"%{query}%") |
Supplier.email.ilike(f"%{query}%")
).offset(skip).limit(limit).all()
supplier = CRUDSupplier(Supplier)

59
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,59 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, username: str, password: str) -> Optional[User]:
user = self.get_by_username(db, username=username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

8
app/db/base.py Normal file
View File

@ -0,0 +1,8 @@
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base_class import Base # noqa
from app.models.user import User # noqa
from app.models.product import Product # noqa
from app.models.category import Category # noqa
from app.models.supplier import Supplier # noqa
from app.models.stock_movement import StockMovement # noqa

13
app/db/base_class.py Normal file
View File

@ -0,0 +1,13 @@
from typing import Any
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
id: Any
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

24
app/db/session.py Normal file
View File

@ -0,0 +1,24 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Ensure the database directory exists
DB_DIR = settings.DB_DIR
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

12
app/models/category.py Normal file
View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.sql import func
from app.db.base_class import Base
class Category(Base):
id = Column(String, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

30
app/models/product.py Normal file
View File

@ -0,0 +1,30 @@
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Product(Base):
id = Column(String, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(Text, nullable=True)
sku = Column(String, unique=True, index=True, nullable=False)
barcode = Column(String, unique=True, nullable=True)
category_id = Column(String, ForeignKey("category.id"), nullable=True)
category = relationship("Category", backref="products")
supplier_id = Column(String, ForeignKey("supplier.id"), nullable=True)
supplier = relationship("Supplier", backref="products")
cost_price = Column(Float, nullable=False, default=0.0)
selling_price = Column(Float, nullable=False, default=0.0)
current_stock = Column(Integer, nullable=False, default=0)
reorder_level = Column(Integer, nullable=False, default=0)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

View File

@ -0,0 +1,35 @@
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Enum
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
import enum
from app.db.base_class import Base
class MovementType(str, enum.Enum):
PURCHASE = "purchase"
SALE = "sale"
ADJUSTMENT = "adjustment"
RETURN = "return"
INITIAL = "initial"
class StockMovement(Base):
id = Column(String, primary_key=True, index=True)
product_id = Column(String, ForeignKey("product.id"), nullable=False)
product = relationship("Product", backref="stock_movements")
quantity = Column(Integer, nullable=False)
unit_price = Column(Float, nullable=True)
movement_type = Column(Enum(MovementType), nullable=False)
reference = Column(String, nullable=True) # Reference to purchase order, invoice, etc.
notes = Column(Text, nullable=True)
created_by = Column(String, ForeignKey("user.id"), nullable=True)
user = relationship("User", backref="stock_movements")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

16
app/models/supplier.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.sql import func
from app.db.base_class import Base
class Supplier(Base):
id = Column(String, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
contact_name = Column(String, nullable=True)
email = Column(String, nullable=True)
phone = Column(String, nullable=True)
address = Column(Text, nullable=True)
notes = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Boolean, Column, String, DateTime
from sqlalchemy.sql import func
from app.db.base_class import Base
class User(Base):
id = Column(String, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

14
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,14 @@
# Re-export schemas for easy access
from app.schemas.user import User as User, UserCreate as UserCreate, UserUpdate as UserUpdate, Token as Token, TokenPayload as TokenPayload
from app.schemas.product import Product as Product, ProductCreate as ProductCreate, ProductUpdate as ProductUpdate
from app.schemas.category import Category as Category, CategoryCreate as CategoryCreate, CategoryUpdate as CategoryUpdate
from app.schemas.supplier import Supplier as Supplier, SupplierCreate as SupplierCreate, SupplierUpdate as SupplierUpdate
from app.schemas.stock_movement import StockMovement as StockMovement, StockMovementCreate as StockMovementCreate
__all__ = [
"User", "UserCreate", "UserUpdate", "Token", "TokenPayload",
"Product", "ProductCreate", "ProductUpdate",
"Category", "CategoryCreate", "CategoryUpdate",
"Supplier", "SupplierCreate", "SupplierUpdate",
"StockMovement", "StockMovementCreate"
]

33
app/schemas/category.py Normal file
View File

@ -0,0 +1,33 @@
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
# Shared properties
class CategoryBase(BaseModel):
name: str
description: Optional[str] = None
# Properties to receive via API on creation
class CategoryCreate(CategoryBase):
pass
# Properties to receive via API on update
class CategoryUpdate(CategoryBase):
name: Optional[str] = None
class CategoryInDBBase(CategoryBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Additional properties to return via API
class Category(CategoryInDBBase):
pass

45
app/schemas/product.py Normal file
View File

@ -0,0 +1,45 @@
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
# Shared properties
class ProductBase(BaseModel):
name: str
description: Optional[str] = None
sku: str
barcode: Optional[str] = None
category_id: Optional[str] = None
supplier_id: Optional[str] = None
cost_price: float = Field(ge=0)
selling_price: float = Field(ge=0)
reorder_level: int = Field(ge=0, default=0)
is_active: bool = True
# Properties to receive via API on creation
class ProductCreate(ProductBase):
pass
# Properties to receive via API on update
class ProductUpdate(ProductBase):
name: Optional[str] = None
sku: Optional[str] = None
cost_price: Optional[float] = None
selling_price: Optional[float] = None
class ProductInDBBase(ProductBase):
id: str
current_stock: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Additional properties to return via API
class Product(ProductInDBBase):
pass

View File

@ -0,0 +1,34 @@
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from app.models.stock_movement import MovementType
# Shared properties
class StockMovementBase(BaseModel):
product_id: str
quantity: int
unit_price: Optional[float] = None
movement_type: MovementType
reference: Optional[str] = None
notes: Optional[str] = None
# Properties to receive via API on creation
class StockMovementCreate(StockMovementBase):
pass
class StockMovementInDBBase(StockMovementBase):
id: str
created_by: Optional[str] = None
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Additional properties to return via API
class StockMovement(StockMovementInDBBase):
pass

37
app/schemas/supplier.py Normal file
View File

@ -0,0 +1,37 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
# Shared properties
class SupplierBase(BaseModel):
name: str
contact_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
notes: Optional[str] = None
# Properties to receive via API on creation
class SupplierCreate(SupplierBase):
pass
# Properties to receive via API on update
class SupplierUpdate(SupplierBase):
name: Optional[str] = None
class SupplierInDBBase(SupplierBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Additional properties to return via API
class Supplier(SupplierInDBBase):
pass

53
app/schemas/user.py Normal file
View File

@ -0,0 +1,53 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
username: str
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: str = None
exp: int = None

24
main.py Normal file
View File

@ -0,0 +1,24 @@
import uvicorn
from fastapi import FastAPI
from app.api.api_v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
description="Small Business Inventory Management System API",
version="0.1.0"
)
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/health", tags=["health"])
async def health_check():
"""
Health check endpoint to verify the application is running
"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration for Alembic.

80
migrations/env.py Normal file
View File

@ -0,0 +1,80 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.db.base import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Key configuration for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,117 @@
"""Initial schema
Revision ID: 2023_11_24_initial
Revises:
Create Date: 2023-11-24
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func
import enum
# revision identifiers, used by Alembic.
revision = '2023_11_24_initial'
down_revision = None
branch_labels = None
depends_on = None
class MovementType(str, enum.Enum):
PURCHASE = "purchase"
SALE = "sale"
ADJUSTMENT = "adjustment"
RETURN = "return"
INITIAL = "initial"
def upgrade() -> None:
# Create user table
op.create_table(
'user',
sa.Column('id', sa.String(), primary_key=True, index=True),
sa.Column('email', sa.String(), unique=True, index=True, nullable=False),
sa.Column('username', sa.String(), unique=True, index=True, nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), default=True),
sa.Column('is_superuser', sa.Boolean(), default=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
# Create category table
op.create_table(
'category',
sa.Column('id', sa.String(), primary_key=True, index=True),
sa.Column('name', sa.String(), index=True, nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
# Create supplier table
op.create_table(
'supplier',
sa.Column('id', sa.String(), primary_key=True, index=True),
sa.Column('name', sa.String(), index=True, nullable=False),
sa.Column('contact_name', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
# Create product table
op.create_table(
'product',
sa.Column('id', sa.String(), primary_key=True, index=True),
sa.Column('name', sa.String(), index=True, nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('sku', sa.String(), unique=True, index=True, nullable=False),
sa.Column('barcode', sa.String(), unique=True, nullable=True),
sa.Column('category_id', sa.String(), sa.ForeignKey("category.id"), nullable=True),
sa.Column('supplier_id', sa.String(), sa.ForeignKey("supplier.id"), nullable=True),
sa.Column('cost_price', sa.Float(), nullable=False, default=0.0),
sa.Column('selling_price', sa.Float(), nullable=False, default=0.0),
sa.Column('current_stock', sa.Integer(), nullable=False, default=0),
sa.Column('reorder_level', sa.Integer(), nullable=False, default=0),
sa.Column('is_active', sa.Boolean(), default=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
# Create stock_movement table
op.create_table(
'stockmovement',
sa.Column('id', sa.String(), primary_key=True, index=True),
sa.Column('product_id', sa.String(), sa.ForeignKey("product.id"), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False),
sa.Column('unit_price', sa.Float(), nullable=True),
sa.Column('movement_type', sa.Enum(MovementType), nullable=False),
sa.Column('reference', sa.String(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('created_by', sa.String(), sa.ForeignKey("user.id"), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
# Create indexes
op.create_index(op.f('ix_product_name'), 'product', ['name'], unique=False)
op.create_index(op.f('ix_product_sku'), 'product', ['sku'], unique=True)
op.create_index(op.f('ix_category_name'), 'category', ['name'], unique=False)
op.create_index(op.f('ix_supplier_name'), 'supplier', ['name'], unique=False)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
def downgrade() -> None:
# Drop tables in reverse order of creation
op.drop_table('stockmovement')
op.drop_table('product')
op.drop_table('supplier')
op.drop_table('category')
op.drop_table('user')

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi==0.104.1
uvicorn==0.23.2
sqlalchemy==2.0.23
alembic==1.12.1
pydantic==2.4.2
pydantic-settings==2.0.3
python-jose==3.3.0
passlib==1.7.4
python-multipart==0.0.6
bcrypt==4.0.1
ruff==0.1.5