Create simple ecommerce store API with FastAPI and SQLite

This commit is contained in:
Automated Action 2025-05-16 04:57:26 +00:00
parent 9ce6ee31a4
commit fa9451f3d4
43 changed files with 1730 additions and 2 deletions

113
README.md
View File

@ -1,3 +1,112 @@
# FastAPI Application # SimpleEcommerceStore API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform. A simple ecommerce store API built with FastAPI and SQLite.
## Features
- User authentication and registration
- Product management (CRUD operations)
- Shopping cart functionality
- Order processing
- Admin operations for product and order management
## Tech Stack
- **Framework**: FastAPI
- **Database**: SQLite with SQLAlchemy ORM
- **Authentication**: JWT tokens
- **Migrations**: Alembic
- **Validation**: Pydantic
- **Linting**: Ruff
## Project Structure
```
├── app/ # Application package
│ ├── api/ # API endpoints
│ │ ├── v1/ # API version 1
│ │ │ ├── endpoints/ # API endpoint modules
│ │ │ └── api.py # API router
│ ├── core/ # Core modules
│ │ ├── config.py # Configuration settings
│ │ └── security.py # Security utilities
│ ├── db/ # Database
│ │ ├── deps.py # Dependency functions
│ │ └── session.py # Database session
│ ├── models/ # SQLAlchemy models
│ ├── schemas/ # Pydantic schemas
│ ├── services/ # Business logic
│ └── main.py # FastAPI app
├── migrations/ # Alembic migrations
├── alembic.ini # Alembic configuration
├── main.py # Application entry point
├── pyproject.toml # Project configuration
└── requirements.txt # Dependencies
```
## API Endpoints
### Health Check
- `GET /health`: Application health status
### Authentication
- `POST /api/v1/auth/login`: Login and get access token
- `POST /api/v1/auth/register`: Register a new user
### Users
- `GET /api/v1/users/me`: Get current user information
- `PATCH /api/v1/users/me`: Update current user
- `GET /api/v1/users/{user_id}`: Get user by ID (admin only)
- `GET /api/v1/users/`: List all users (admin only)
### Products
- `GET /api/v1/products/`: List all products
- `GET /api/v1/products/{id}`: Get product by ID
- `POST /api/v1/products/`: Create a new product (admin only)
- `PUT /api/v1/products/{id}`: Update a product (admin only)
- `DELETE /api/v1/products/{id}`: Delete a product (admin only)
### Cart
- `GET /api/v1/cart/`: Get user's cart summary
- `POST /api/v1/cart/items`: Add item to cart
- `PUT /api/v1/cart/items/{item_id}`: Update cart item quantity
- `DELETE /api/v1/cart/items/{item_id}`: Remove item from cart
- `DELETE /api/v1/cart/`: Clear cart
### Orders
- `GET /api/v1/orders/`: List user's orders
- `POST /api/v1/orders/`: Create order from cart
- `GET /api/v1/orders/{id}`: Get order by ID
- `PUT /api/v1/orders/{id}/status`: Update order status (admin only)
## Setup and Running
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Run migrations:
```bash
alembic upgrade head
```
3. Run the development server:
```bash
python main.py
```
The API will be available at `http://localhost:8000`.
API documentation is available at:
- Swagger UI: `http://localhost:8000/docs`
- ReDoc: `http://localhost:8000/redoc`
## Database
The application uses SQLite as its database, stored at `/app/storage/db/db.sqlite`. SQLAlchemy is used as the ORM layer to interact with the database.
## Authentication
The API uses JWT tokens for authentication. To access protected endpoints:
1. Login with valid credentials at `/api/v1/auth/login`
2. Use the returned access token in the Authorization header: `Bearer {token}`

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package.

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package.

0
app/api/deps.py Normal file
View File

1
app/api/v1/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package.

11
app/api/v1/api.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from app.api.v1.endpoints import auth, cart, health, orders, products, users
api_router = APIRouter()
api_router.include_router(health.router, prefix="/health", tags=["health"])
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(products.router, prefix="/products", tags=["products"])
api_router.include_router(cart.router, prefix="/cart", tags=["cart"])
api_router.include_router(orders.router, prefix="/orders", tags=["orders"])

View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package.

View File

@ -0,0 +1,66 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import schemas
from app.api.deps import get_db
from app.core import security
from app.core.config import settings
from app.services import user_service
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = user_service.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
elif not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Inactive user",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User)
def register_user(
*,
db: Session = Depends(get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Create new user
"""
user = user_service.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system",
)
username_exists = user_service.get_by_username(db, username=user_in.username)
if username_exists:
raise HTTPException(
status_code=400,
detail="The username is already taken",
)
user = user_service.create(db, obj_in=user_in)
return user

View File

@ -0,0 +1,132 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import schemas
from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.services import cart_service, product_service
router = APIRouter()
@router.get("/", response_model=schemas.CartSummary)
def read_cart(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve user's cart.
"""
cart_items = cart_service.get_cart_items(db, user_id=current_user.id)
total = sum(item.unit_price * item.quantity for item in cart_items)
return {"items": cart_items, "total": total}
@router.post("/items", response_model=schemas.CartItem)
def add_cart_item(
*,
db: Session = Depends(get_db),
item_in: schemas.CartItemCreate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Add item to cart.
"""
# Check if product exists and is in stock
product = product_service.get(db, id=item_in.product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found",
)
if not product.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Product is not available",
)
if product.stock < item_in.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock. Available: {product.stock}",
)
# Check if item already in cart
existing_item = cart_service.get_cart_item(
db, user_id=current_user.id, product_id=item_in.product_id
)
if existing_item:
# Update quantity if already in cart
return cart_service.update_item_quantity(
db,
db_obj=existing_item,
quantity=existing_item.quantity + item_in.quantity
)
# Add new item to cart
return cart_service.add_item_to_cart(
db,
obj_in=item_in,
user_id=current_user.id,
unit_price=product.price
)
@router.put("/items/{item_id}", response_model=schemas.CartItem)
def update_cart_item(
*,
db: Session = Depends(get_db),
item_id: int,
item_in: schemas.CartItemUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update cart item quantity.
"""
cart_item = cart_service.get(db, id=item_id)
if not cart_item or cart_item.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Cart item not found",
)
# Check if product has enough stock
product = product_service.get(db, id=cart_item.product_id)
if product.stock < item_in.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock. Available: {product.stock}",
)
cart_item = cart_service.update_item_quantity(
db, db_obj=cart_item, quantity=item_in.quantity
)
return cart_item
@router.delete("/items/{item_id}", response_model=schemas.CartItem)
def remove_cart_item(
*,
db: Session = Depends(get_db),
item_id: int,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Remove item from cart.
"""
cart_item = cart_service.get(db, id=item_id)
if not cart_item or cart_item.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Cart item not found",
)
cart_item = cart_service.remove(db, id=item_id)
return cart_item
@router.delete("/", response_model=List[schemas.CartItem])
def clear_cart(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Clear user's cart.
"""
return cart_service.clear_cart(db, user_id=current_user.id)

View File

@ -0,0 +1,25 @@
from typing import Dict
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.api.deps import get_db
router = APIRouter()
@router.get("/", response_model=Dict[str, str])
def health_check(db: Session = Depends(get_db)) -> Dict[str, str]:
"""
Check service health.
"""
# Try to execute a simple query to verify database connection
try:
db.execute("SELECT 1")
db_status = "healthy"
except Exception:
db_status = "unhealthy"
return {
"status": "healthy",
"database": db_status
}

View File

@ -0,0 +1,124 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import schemas
from app.api.deps import get_current_active_superuser, get_current_active_user, get_db
from app.models.user import User
from app.services import cart_service, order_service, product_service
router = APIRouter()
@router.get("/", response_model=List[schemas.Order])
def read_orders(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve user's orders.
"""
if current_user.is_superuser:
orders = order_service.get_multi(db, skip=skip, limit=limit)
else:
orders = order_service.get_user_orders(
db, user_id=current_user.id, skip=skip, limit=limit
)
return orders
@router.post("/", response_model=schemas.Order)
def create_order(
*,
db: Session = Depends(get_db),
order_in: schemas.OrderCreate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Create new order from user's cart.
"""
# Get user's cart
cart_items = cart_service.get_cart_items(db, user_id=current_user.id)
if not cart_items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cart is empty",
)
# Check if all products have enough stock
for item in cart_items:
product = product_service.get(db, id=item.product_id)
if not product or not product.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Product {item.product_id} is not available",
)
if product.stock < item.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock for product {product.name}. Available: {product.stock}",
)
# Create order
order = order_service.create_from_cart(
db,
user_id=current_user.id,
cart_items=cart_items,
shipping_address=order_in.shipping_address
)
# Update product stock
for item in order.items:
product = product_service.get(db, id=item.product_id)
product_service.update_stock(
db, db_obj=product, new_stock=product.stock - item.quantity
)
# Clear user's cart
cart_service.clear_cart(db, user_id=current_user.id)
return order
@router.get("/{id}", response_model=schemas.Order)
def read_order(
*,
db: Session = Depends(get_db),
id: int,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get order by ID.
"""
order = order_service.get(db, id=id)
if not order:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Order not found",
)
if order.user_id != current_user.id and not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
return order
@router.put("/{id}/status", response_model=schemas.Order)
def update_order_status(
*,
db: Session = Depends(get_db),
id: int,
order_in: schemas.OrderUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update order status (admin only).
"""
order = order_service.get(db, id=id)
if not order:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Order not found",
)
order = order_service.update(db, db_obj=order, obj_in=order_in)
return order

View File

@ -0,0 +1,91 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import schemas
from app.api.deps import get_current_active_superuser, get_db
from app.services import product_service
router = APIRouter()
@router.get("/", response_model=List[schemas.Product])
def read_products(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve products.
"""
products = product_service.get_multi(db, skip=skip, limit=limit)
return products
@router.post("/", response_model=schemas.Product)
def create_product(
*,
db: Session = Depends(get_db),
product_in: schemas.ProductCreate,
current_user: schemas.User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new product.
"""
product = product_service.create(db, obj_in=product_in)
return product
@router.get("/{id}", response_model=schemas.Product)
def read_product(
*,
db: Session = Depends(get_db),
id: int,
) -> Any:
"""
Get product by ID.
"""
product = product_service.get(db, id=id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found",
)
return product
@router.put("/{id}", response_model=schemas.Product)
def update_product(
*,
db: Session = Depends(get_db),
id: int,
product_in: schemas.ProductUpdate,
current_user: schemas.User = Depends(get_current_active_superuser),
) -> Any:
"""
Update a product.
"""
product = product_service.get(db, id=id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found",
)
product = product_service.update(db, db_obj=product, obj_in=product_in)
return product
@router.delete("/{id}", response_model=schemas.Product)
def delete_product(
*,
db: Session = Depends(get_db),
id: int,
current_user: schemas.User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete a product.
"""
product = product_service.get(db, id=id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found",
)
product = product_service.remove(db, id=id)
return product

View File

@ -0,0 +1,76 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app import schemas
from app.api.deps import get_current_active_superuser, get_current_active_user, get_db
from app.services import user_service
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: schemas.User = Depends(get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = user_service.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.patch("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(get_db),
full_name: str = Body(None),
email: str = Body(None),
password: str = Body(None),
current_user: schemas.User = Depends(get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
if password is not None:
user_in.password = password
user = user_service.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: schemas.User = Depends(get_current_active_user),
db: Session = Depends(get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = user_service.get(db, id=user_id)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges"
)
return user

27
app/core/config.py Normal file
View File

@ -0,0 +1,27 @@
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "SimpleEcommerceStore"
# Database
DB_DIR = Path("/app/storage/db")
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# JWT
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS
BACKEND_CORS_ORIGINS: List[str] = ["*"]
class Config:
case_sensitive = True
settings = Settings()

28
app/core/security.py Normal file
View File

@ -0,0 +1,28 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

1
app/db/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package.

62
app/db/deps.py Normal file
View File

@ -0,0 +1,62 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import SessionLocal
from app.models.user import User
from app.schemas.token import TokenPayload
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_db() -> Generator:
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(User).filter(User.id == token_data.sub).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
def get_current_active_superuser(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges"
)
return current_user

20
app/db/session.py Normal file
View File

@ -0,0 +1,20 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Create directory if it does not exist
DB_DIR = Path("/app/storage/db")
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

32
app/main.py Normal file
View File

@ -0,0 +1,32 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/health")
def health_check():
"""
Basic health check endpoint
"""
return {"status": "healthy"}

4
app/models/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from app.models.cart import CartItem
from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
from app.models.user import User

21
app/models/cart.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, DateTime, Float, ForeignKey, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.session import Base
class CartItem(Base):
__tablename__ = "cart_items"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity = Column(Integer, nullable=False, default=1)
unit_price = Column(Float, nullable=False) # Store price at the time of adding to cart
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
user = relationship("User", backref="cart_items")
product = relationship("Product")

44
app/models/order.py Normal file
View File

@ -0,0 +1,44 @@
import enum
from sqlalchemy import Column, DateTime, Enum, Float, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.session import Base
class OrderStatus(str, enum.Enum):
PENDING = "pending"
PAID = "paid"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
total_amount = Column(Float, nullable=False)
shipping_address = Column(String, nullable=False)
status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
user = relationship("User", backref="orders")
items = relationship("OrderItem", back_populates="order")
class OrderItem(Base):
__tablename__ = "order_items"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity = Column(Integer, nullable=False)
unit_price = Column(Float, nullable=False) # Price at the time of order
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
order = relationship("Order", back_populates="items")
product = relationship("Product")

18
app/models/product.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String, Text
from sqlalchemy.sql import func
from app.db.session import Base
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(Text, nullable=True)
price = Column(Float, nullable=False, index=True)
stock = Column(Integer, nullable=False, default=0)
image_url = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

18
app/models/user.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.sql import func
from app.db.session import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, index=True)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

5
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from app.schemas.cart import CartItem, CartItemCreate, CartItemUpdate, CartSummary
from app.schemas.order import Order, OrderCreate, OrderItem, OrderUpdate
from app.schemas.product import Product, ProductCreate, ProductUpdate
from app.schemas.token import Token, TokenPayload
from app.schemas.user import User, UserCreate, UserInDB, UserUpdate

38
app/schemas/cart.py Normal file
View File

@ -0,0 +1,38 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
# Shared properties
class CartItemBase(BaseModel):
product_id: int
quantity: int = Field(..., gt=0)
# Properties to receive on cart item creation
class CartItemCreate(CartItemBase):
pass
# Properties to receive on cart item update
class CartItemUpdate(BaseModel):
quantity: int = Field(..., gt=0)
# Properties shared by models stored in DB
class CartItemInDBBase(CartItemBase):
id: int
user_id: int
unit_price: float
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Properties to return to client
class CartItem(CartItemInDBBase):
pass
# Properties to return for cart summary
class CartSummary(BaseModel):
items: List[CartItem]
total: float

53
app/schemas/order.py Normal file
View File

@ -0,0 +1,53 @@
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from app.models.order import OrderStatus
# Order Item Schemas
class OrderItemBase(BaseModel):
product_id: int
quantity: int = Field(..., gt=0)
unit_price: float = Field(..., gt=0)
class OrderItemCreate(OrderItemBase):
pass
class OrderItemInDBBase(OrderItemBase):
id: int
order_id: int
created_at: datetime
class Config:
from_attributes = True
class OrderItem(OrderItemInDBBase):
pass
# Order Schemas
class OrderBase(BaseModel):
shipping_address: str
status: Optional[OrderStatus] = OrderStatus.PENDING
class OrderCreate(OrderBase):
pass
class OrderUpdate(BaseModel):
status: Optional[OrderStatus] = None
shipping_address: Optional[str] = None
class OrderInDBBase(OrderBase):
id: int
user_id: int
total_amount: float
created_at: datetime
updated_at: Optional[datetime] = None
status: OrderStatus
class Config:
from_attributes = True
class Order(OrderInDBBase):
items: List[OrderItem]

44
app/schemas/product.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
# Shared properties
class ProductBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
stock: Optional[int] = None
image_url: Optional[str] = None
is_active: Optional[bool] = True
# Properties to receive on product creation
class ProductCreate(ProductBase):
name: str
price: float = Field(..., gt=0)
stock: int = Field(..., ge=0)
# Properties to receive on product update
class ProductUpdate(ProductBase):
pass
# Properties shared by models stored in DB
class ProductInDBBase(ProductBase):
id: int
name: str
price: float
stock: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Properties to return to client
class Product(ProductInDBBase):
pass
# Properties stored in DB
class ProductInDB(ProductInDBBase):
pass

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

47
app/schemas/user.py Normal file
View File

@ -0,0 +1,47 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, model_validator
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
username: str
password: str
@model_validator(mode="after")
def validate_username(self):
username = self.username
if len(username) < 3:
raise ValueError("Username must be at least 3 characters long")
return self
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
# Properties shared by models stored in DB
class UserInDBBase(UserBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

4
app/services/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from app.services.cart_service import cart_service
from app.services.order_service import order_service
from app.services.product_service import product_service
from app.services.user_service import user_service

65
app/services/base.py Normal file
View File

@ -0,0 +1,65 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.session import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

View File

@ -0,0 +1,53 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.cart import CartItem
from app.schemas.cart import CartItemCreate, CartItemUpdate
from app.services.base import CRUDBase
class CartService(CRUDBase[CartItem, CartItemCreate, CartItemUpdate]):
def get_cart_items(self, db: Session, *, user_id: int) -> List[CartItem]:
return db.query(CartItem).filter(CartItem.user_id == user_id).all()
def get_cart_item(
self, db: Session, *, user_id: int, product_id: int
) -> Optional[CartItem]:
return (
db.query(CartItem)
.filter(CartItem.user_id == user_id, CartItem.product_id == product_id)
.first()
)
def add_item_to_cart(
self, db: Session, *, obj_in: CartItemCreate, user_id: int, unit_price: float
) -> CartItem:
db_obj = CartItem(
user_id=user_id,
product_id=obj_in.product_id,
quantity=obj_in.quantity,
unit_price=unit_price
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_item_quantity(
self, db: Session, *, db_obj: CartItem, quantity: int
) -> CartItem:
db_obj.quantity = quantity
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def clear_cart(self, db: Session, *, user_id: int) -> List[CartItem]:
cart_items = self.get_cart_items(db, user_id=user_id)
for item in cart_items:
db.delete(item)
db.commit()
return cart_items
cart_service = CartService(CartItem)

View File

@ -0,0 +1,52 @@
from typing import List
from sqlalchemy.orm import Session
from app.models.cart import CartItem
from app.models.order import Order, OrderItem
from app.schemas.order import OrderCreate, OrderUpdate
from app.services.base import CRUDBase
class OrderService(CRUDBase[Order, OrderCreate, OrderUpdate]):
def get_user_orders(
self, db: Session, *, user_id: int, skip: int = 0, limit: int = 100
) -> List[Order]:
return (
db.query(Order)
.filter(Order.user_id == user_id)
.offset(skip)
.limit(limit)
.all()
)
def create_from_cart(
self, db: Session, *, user_id: int, cart_items: List[CartItem], shipping_address: str
) -> Order:
# Calculate total amount
total_amount = sum(item.unit_price * item.quantity for item in cart_items)
# Create order
db_obj = Order(
user_id=user_id,
total_amount=total_amount,
shipping_address=shipping_address
)
db.add(db_obj)
db.flush() # Flush to get the order ID
# Create order items
for cart_item in cart_items:
order_item = OrderItem(
order_id=db_obj.id,
product_id=cart_item.product_id,
quantity=cart_item.quantity,
unit_price=cart_item.unit_price
)
db.add(order_item)
db.commit()
db.refresh(db_obj)
return db_obj
order_service = OrderService(Order)

View File

@ -0,0 +1,36 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.product import Product
from app.schemas.product import ProductCreate, ProductUpdate
from app.services.base import CRUDBase
class ProductService(CRUDBase[Product, ProductCreate, ProductUpdate]):
def get_active(self, db: Session, *, id: int) -> Optional[Product]:
return (
db.query(Product)
.filter(Product.id == id, Product.is_active)
.first()
)
def get_multi_active(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[Product]:
return (
db.query(Product)
.filter(Product.is_active)
.offset(skip)
.limit(limit)
.all()
)
def update_stock(self, db: Session, *, db_obj: Product, new_stock: int) -> Product:
db_obj.stock = new_stock
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
product_service = ProductService(Product)

View File

@ -0,0 +1,59 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.services.base import CRUDBase
class UserService(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_active=obj_in.is_active,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user_service = UserService(User)

10
main.py Normal file
View File

@ -0,0 +1,10 @@
from pathlib import Path
import uvicorn
# Ensure storage directory exists
storage_dir = Path("/app/storage/db")
storage_dir.mkdir(parents=True, exist_ok=True)
if __name__ == "__main__":
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)

81
migrations/env.py Normal file
View File

@ -0,0 +1,81 @@
from logging.config import fileConfig
from pathlib import Path
from alembic import context
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from app.db.session import Base
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
# Ensure SQLite directory exists
DB_DIR = Path("/app/storage/db")
DB_DIR.mkdir(parents=True, exist_ok=True)
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,122 @@
"""Initial migration
Revision ID: 01_initial_migration
Revises: None
Create Date: 2023-12-22 10:00:00.000000
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '01_initial_migration'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create users table
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True, default=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_index(op.f('ix_users_full_name'), 'users', ['full_name'], unique=False)
# Create products table
op.create_table(
'products',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('price', sa.Float(), nullable=False),
sa.Column('stock', sa.Integer(), nullable=False, default=0),
sa.Column('image_url', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_products_id'), 'products', ['id'], unique=False)
op.create_index(op.f('ix_products_name'), 'products', ['name'], unique=False)
op.create_index(op.f('ix_products_price'), 'products', ['price'], unique=False)
# Create cart_items table
op.create_table(
'cart_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False, default=1),
sa.Column('unit_price', sa.Float(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['product_id'], ['products.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_cart_items_id'), 'cart_items', ['id'], unique=False)
# Create orders table
op.create_table(
'orders',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('total_amount', sa.Float(), nullable=False),
sa.Column('shipping_address', sa.String(), nullable=False),
sa.Column('status', sa.Enum('pending', 'paid', 'shipped', 'delivered', 'cancelled', name='orderstatus'), nullable=True, default='pending'),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_orders_id'), 'orders', ['id'], unique=False)
# Create order_items table
op.create_table(
'order_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('order_id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False),
sa.Column('unit_price', sa.Float(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.ForeignKeyConstraint(['order_id'], ['orders.id'], ),
sa.ForeignKeyConstraint(['product_id'], ['products.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_order_items_id'), 'order_items', ['id'], unique=False)
def downgrade():
op.drop_index(op.f('ix_order_items_id'), table_name='order_items')
op.drop_table('order_items')
op.drop_index(op.f('ix_orders_id'), table_name='orders')
op.drop_table('orders')
op.drop_index(op.f('ix_cart_items_id'), table_name='cart_items')
op.drop_table('cart_items')
op.drop_index(op.f('ix_products_price'), table_name='products')
op.drop_index(op.f('ix_products_name'), table_name='products')
op.drop_index(op.f('ix_products_id'), table_name='products')
op.drop_table('products')
op.drop_index(op.f('ix_users_full_name'), table_name='users')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

13
pyproject.toml Normal file
View File

@ -0,0 +1,13 @@
[tool.ruff]
line-length = 100
target-version = "py39"
[tool.ruff.lint]
select = ["E", "F", "I", "W"]
ignore = ["E203", "E501", "E402"] # E203 for black compatibility, E501 for line length, E402 for imports not at top
[tool.ruff.lint.isort]
known-third-party = ["fastapi", "pydantic", "sqlalchemy", "alembic", "jose", "passlib"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"] # Ignore unused imports in __init__.py files

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
fastapi==0.108.0
uvicorn==0.25.0
sqlalchemy==2.0.25
pydantic==2.5.3
pydantic-settings==2.1.0
python-jose==3.3.0
passlib==1.7.4
bcrypt==4.1.1
python-multipart==0.0.6
alembic==1.13.1
ruff==0.1.11
pytest==7.4.4
httpx==0.26.0