Implement simple cart system with FastAPI and SQLite

This commit is contained in:
Automated Action 2025-05-18 18:02:59 +00:00
parent 76fd675889
commit 13673fe36d
28 changed files with 1392 additions and 2 deletions

107
README.md
View File

@ -1,3 +1,106 @@
# FastAPI Application
# Simple Cart System API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A simple cart system API built with FastAPI and SQLite for managing products and shopping carts.
## Features
- User registration and authentication with JWT tokens
- Product management (CRUD operations)
- Shopping cart functionality (add, update, remove, clear)
- SQLite database with SQLAlchemy ORM
- Alembic migrations
## Tech Stack
- **FastAPI:** High-performance web framework for building APIs
- **SQLAlchemy:** SQL toolkit and ORM
- **SQLite:** Lightweight, file-based database
- **Alembic:** Database migration tool
- **Pydantic:** Data validation and settings management
- **Python-Jose:** JWT token handling
- **Uvicorn:** ASGI server for running the application
## Project Structure
```
.
├── alembic.ini # Alembic configuration file
├── app # Main application package
│ ├── api # API routes
│ │ └── routes # API route modules
│ ├── core # Core functionality
│ │ ├── config.py # Configuration settings
│ │ └── security.py # Authentication and security utilities
│ ├── db # Database related modules
│ │ ├── models # SQLAlchemy models
│ │ └── session.py # Database session management
│ └── schemas # Pydantic models for request/response validation
├── main.py # Application entry point
├── migrations # Alembic migrations
│ └── versions # Migration script versions
└── requirements.txt # Project dependencies
```
## Getting Started
### Prerequisites
- Python 3.8+
- pip
### Installation
1. Clone the repository
2. Install dependencies
```bash
pip install -r requirements.txt
```
3. Run database migrations
```bash
alembic upgrade head
```
4. Start the application
```bash
uvicorn main:app --reload
```
5. Access the API documentation
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## API Endpoints
### Authentication
- `POST /api/v1/login/access-token` - Get access token
### Users
- `POST /api/v1/users/` - Create a new user
- `GET /api/v1/users/` - Get all users
- `GET /api/v1/users/{user_id}` - Get a specific user
- `PATCH /api/v1/users/{user_id}` - Update a user
- `DELETE /api/v1/users/{user_id}` - Delete a user
### Products
- `POST /api/v1/products/` - Create a new product (admin only)
- `GET /api/v1/products/` - Get all products
- `GET /api/v1/products/{product_id}` - Get a specific product
- `PATCH /api/v1/products/{product_id}` - Update a product (admin only)
- `DELETE /api/v1/products/{product_id}` - Delete a product (admin only)
### Cart
- `POST /api/v1/cart/items/` - Add an item to cart
- `GET /api/v1/cart/` - Get current user's cart
- `GET /api/v1/cart/summary` - Get cart summary
- `PATCH /api/v1/cart/items/{item_id}` - Update cart item quantity
- `DELETE /api/v1/cart/items/{item_id}` - Remove an item from cart
- `DELETE /api/v1/cart/` - Clear cart
### Health Check
- `GET /health` - Check application health status
## License
MIT

102
alembic.ini Normal file
View File

@ -0,0 +1,102 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

View File

@ -0,0 +1,9 @@
from fastapi import APIRouter
from app.api.routes import users, products, cart, login
api_router = APIRouter()
api_router.include_router(login.router, tags=["authentication"])
api_router.include_router(users.router, tags=["users"])
api_router.include_router(products.router, tags=["products"])
api_router.include_router(cart.router, tags=["cart"])

222
app/api/routes/cart.py Normal file
View File

@ -0,0 +1,222 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.db.models.cart import Cart, CartItem
from app.db.models.product import Product
from app.db.models.user import User
from app.schemas.cart import (
CartResponse, CartItemResponse,
AddToCartRequest, CartItemUpdate,
CartSummary
)
from app.core.security import get_current_active_user
router = APIRouter()
@router.post("/cart/items/", response_model=CartItemResponse, status_code=status.HTTP_201_CREATED)
def add_to_cart(
item_in: AddToCartRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Add an item to the user's cart.
"""
# Check if product exists and has enough stock
product = db.query(Product).filter(Product.id == item_in.product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found"
)
if product.stock < item_in.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not enough stock available"
)
# Get user's active cart or create a new one
cart = db.query(Cart).filter(Cart.user_id == current_user.id).first()
if not cart:
cart = Cart(user_id=current_user.id)
db.add(cart)
db.commit()
db.refresh(cart)
# Check if the product is already in the cart
cart_item = db.query(CartItem).filter(
CartItem.cart_id == cart.id,
CartItem.product_id == item_in.product_id
).first()
if cart_item:
# Update existing cart item quantity
new_quantity = cart_item.quantity + item_in.quantity
# Check if enough stock
if product.stock < new_quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not enough stock available"
)
cart_item.quantity = new_quantity
else:
# Add new cart item
cart_item = CartItem(
cart_id=cart.id,
product_id=item_in.product_id,
user_id=current_user.id,
quantity=item_in.quantity,
unit_price=product.price
)
db.add(cart_item)
db.commit()
db.refresh(cart_item)
return cart_item
@router.get("/cart/", response_model=CartResponse)
def get_cart(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Get the current user's cart.
"""
cart = db.query(Cart).filter(Cart.user_id == current_user.id).first()
if not cart:
cart = Cart(user_id=current_user.id)
db.add(cart)
db.commit()
db.refresh(cart)
return cart
@router.get("/cart/summary", response_model=CartSummary)
def get_cart_summary(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Get a summary of the current user's cart.
"""
cart = db.query(Cart).filter(Cart.user_id == current_user.id).first()
if not cart:
cart = Cart(user_id=current_user.id)
db.add(cart)
db.commit()
db.refresh(cart)
items_count = db.query(CartItem).filter(CartItem.cart_id == cart.id).count()
return {
"id": cart.id,
"total": cart.total,
"items_count": items_count
}
@router.patch("/cart/items/{item_id}", response_model=CartItemResponse)
def update_cart_item(
item_id: int,
item_in: CartItemUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Update quantity of an item in the cart.
"""
cart_item = db.query(CartItem).filter(
CartItem.id == item_id,
CartItem.user_id == current_user.id
).first()
if not cart_item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Cart item not found"
)
if item_in.quantity:
if item_in.quantity < 1:
# Remove item if quantity is zero or negative
db.delete(cart_item)
db.commit()
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="Item removed from cart"
)
# Check if product has enough stock
product = db.query(Product).filter(Product.id == cart_item.product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found"
)
if product.stock < item_in.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not enough stock available"
)
cart_item.quantity = item_in.quantity
db.add(cart_item)
db.commit()
db.refresh(cart_item)
return cart_item
@router.delete("/cart/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def remove_from_cart(
item_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Remove an item from the cart.
"""
cart_item = db.query(CartItem).filter(
CartItem.id == item_id,
CartItem.user_id == current_user.id
).first()
if not cart_item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Cart item not found"
)
db.delete(cart_item)
db.commit()
return None
@router.delete("/cart/", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def clear_cart(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Clear the user's cart (remove all items).
"""
cart = db.query(Cart).filter(Cart.user_id == current_user.id).first()
if not cart:
return None
# Delete all cart items
db.query(CartItem).filter(CartItem.cart_id == cart.id).delete()
db.commit()
return None

11
app/api/routes/health.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
router = APIRouter()
@router.get("/health", tags=["health"])
async def health_check():
"""
Health check endpoint.
"""
return {"status": "ok"}

51
app/api/routes/login.py Normal file
View File

@ -0,0 +1,51 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import create_access_token, verify_password
from app.db.session import get_db
from app.db.models.user import User
from app.schemas.token import Token
router = APIRouter()
@router.post("/login/access-token", response_model=Token)
def login_access_token(
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests.
"""
# Try to authenticate with username/password
user = db.query(User).filter(User.email == form_data.username).first()
if not user:
# If email doesn't match, try username
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
# Create access token
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}

117
app/api/routes/products.py Normal file
View File

@ -0,0 +1,117 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.db.models.product import Product
from app.db.models.user import User
from app.schemas.product import ProductCreate, ProductResponse, ProductUpdate
from app.core.security import get_current_active_superuser
router = APIRouter()
@router.post("/products/", response_model=ProductResponse, status_code=status.HTTP_201_CREATED)
def create_product(
product_in: ProductCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_superuser)
):
"""
Create a new product (superuser only).
"""
product = Product(
name=product_in.name,
description=product_in.description,
price=product_in.price,
stock=product_in.stock,
image_url=product_in.image_url
)
db.add(product)
db.commit()
db.refresh(product)
return product
@router.get("/products/", response_model=List[ProductResponse])
def get_products(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""
Get all products.
"""
products = db.query(Product).offset(skip).limit(limit).all()
return products
@router.get("/products/{product_id}", response_model=ProductResponse)
def get_product(
product_id: int,
db: Session = Depends(get_db)
):
"""
Get a specific product by ID.
"""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found"
)
return product
@router.patch("/products/{product_id}", response_model=ProductResponse)
def update_product(
product_id: int,
product_in: ProductUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_superuser)
):
"""
Update a product (superuser only).
"""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found"
)
# Update product fields if provided
update_data = product_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(product, field, value)
db.add(product)
db.commit()
db.refresh(product)
return product
@router.delete("/products/{product_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_product(
product_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_superuser)
):
"""
Delete a product (superuser only).
"""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Product not found"
)
db.delete(product)
db.commit()
return None

135
app/api/routes/users.py Normal file
View File

@ -0,0 +1,135 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.db.models.user import User
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.core.security import get_password_hash
router = APIRouter()
@router.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
user_in: UserCreate,
db: Session = Depends(get_db)
):
"""
Create a new user.
"""
# Check if user with given email exists
user = db.query(User).filter(User.email == user_in.email).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Check if username is taken
user = db.query(User).filter(User.username == user_in.username).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken"
)
# Create new user
hashed_password = get_password_hash(user_in.password)
user = User(
email=user_in.email,
username=user_in.username,
hashed_password=hashed_password,
is_active=user_in.is_active,
is_superuser=user_in.is_superuser
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.get("/users/", response_model=List[UserResponse])
def get_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""
Get all users.
"""
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.get("/users/{user_id}", response_model=UserResponse)
def get_user(
user_id: int,
db: Session = Depends(get_db)
):
"""
Get a specific user by ID.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.patch("/users/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
user_in: UserUpdate,
db: Session = Depends(get_db)
):
"""
Update a user.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Update user fields if provided
update_data = user_in.dict(exclude_unset=True)
# Hash the password if it's being updated
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(user, field, value)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_user(
user_id: int,
db: Session = Depends(get_db)
):
"""
Delete a user.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db.delete(user)
db.commit()
return None

30
app/core/config.py Normal file
View File

@ -0,0 +1,30 @@
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Simple Cart System"
PROJECT_DESCRIPTION: str = "A simple cart system API using FastAPI and SQLite"
PROJECT_VERSION: str = "0.1.0"
CORS_ORIGINS: List[str] = ["*"]
# Database settings
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# Security settings
SECRET_KEY: str = "supersecretkey" # You should change this in production
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS configuration
BACKEND_CORS_ORIGINS: List[str] = ["*"]
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

114
app/core/security.py Normal file
View File

@ -0,0 +1,114 @@
from datetime import datetime, timedelta
from typing import Any, Optional
from jose import jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.db.models.user import User
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify if the password matches the hash.
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Generate a password hash.
"""
return pwd_context.hash(password)
def create_access_token(subject: Any, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a new JWT access token.
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def get_current_user(
db: Session = Depends(get_db),
token: str = Depends(oauth2_scheme)
) -> User:
"""
Decode JWT token and get current user.
"""
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Check if the current user is active.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
def get_current_active_superuser(
current_user: User = Depends(get_current_user),
) -> User:
"""
Check if the current user is a superuser.
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

20
app/db/models/base.py Normal file
View File

@ -0,0 +1,20 @@
from datetime import datetime
from typing import Any
from sqlalchemy import Column, DateTime
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: Any
__name__: str
# Generate tablename automatically based on class name
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
# Add created_at and updated_at columns to all models
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

35
app/db/models/cart.py Normal file
View File

@ -0,0 +1,35 @@
from sqlalchemy import Column, Integer, ForeignKey, Float
from sqlalchemy.orm import relationship
from app.db.models.base import Base
class Cart(Base):
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
# Relationships
user = relationship("User", back_populates="carts")
items = relationship("CartItem", back_populates="cart", cascade="all, delete-orphan")
@property
def total(self) -> float:
return sum(item.subtotal for item in self.items)
class CartItem(Base):
id = Column(Integer, primary_key=True, index=True)
cart_id = Column(Integer, ForeignKey("cart.id"), nullable=False)
product_id = Column(Integer, ForeignKey("product.id"), nullable=False)
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
quantity = Column(Integer, nullable=False, default=1)
unit_price = Column(Float, nullable=False)
# Relationships
cart = relationship("Cart", back_populates="items")
product = relationship("Product")
user = relationship("User", back_populates="cart_items")
@property
def subtotal(self) -> float:
return self.quantity * self.unit_price

12
app/db/models/product.py Normal file
View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String, Float, Text
from app.db.models.base import Base
class Product(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True, nullable=False)
description = Column(Text, nullable=True)
price = Column(Float, nullable=False)
stock = Column(Integer, nullable=False, default=0)
image_url = Column(String, nullable=True)

17
app/db/models/user.py Normal file
View File

@ -0,0 +1,17 @@
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import relationship
from app.db.models.base import Base
class User(Base):
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
# Relationships
carts = relationship("Cart", back_populates="user", cascade="all, delete-orphan")
cart_items = relationship("CartItem", back_populates="user", cascade="all, delete-orphan")

25
app/db/session.py Normal file
View File

@ -0,0 +1,25 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Create the SQLAlchemy engine with SQLite
engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} # Needed for SQLite
)
# Create SessionLocal class for creating database sessions
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class for declarative models
Base = declarative_base()
# Dependency to get a database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

3
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,3 @@
# Import schemas for easier access
# This file is intentionally left empty
# Import from the specific schema files directly when needed

59
app/schemas/cart.py Normal file
View File

@ -0,0 +1,59 @@
from typing import List, Optional
from pydantic import BaseModel, Field
# Cart Item schemas
class CartItemBase(BaseModel):
product_id: int
quantity: int = Field(..., gt=0)
class CartItemCreate(CartItemBase):
pass
class CartItemUpdate(BaseModel):
quantity: Optional[int] = Field(None, gt=0)
class CartItemResponse(CartItemBase):
id: int
unit_price: float
subtotal: float
class Config:
from_attributes = True
# Cart schemas
class CartBase(BaseModel):
user_id: int
class CartCreate(CartBase):
pass
class CartResponse(CartBase):
id: int
items: List[CartItemResponse] = []
total: float
class Config:
from_attributes = True
# Schema for adding items to cart
class AddToCartRequest(CartItemBase):
pass
# Schema for cart summary
class CartSummary(BaseModel):
id: int
total: float
items_count: int
class Config:
from_attributes = True

34
app/schemas/product.py Normal file
View File

@ -0,0 +1,34 @@
from typing import Optional
from pydantic import BaseModel, Field
# Base Product Schema
class ProductBase(BaseModel):
name: str
description: Optional[str] = None
price: float = Field(..., gt=0)
stock: int = Field(..., ge=0)
image_url: Optional[str] = None
# Schema for creating a new product
class ProductCreate(ProductBase):
pass
# Schema for updating an existing product
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = Field(None, gt=0)
stock: Optional[int] = Field(None, ge=0)
image_url: Optional[str] = None
# Schema for returning product data
class ProductResponse(ProductBase):
id: int
class Config:
from_attributes = True

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

33
app/schemas/user.py Normal file
View File

@ -0,0 +1,33 @@
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
# Base User Schema
class UserBase(BaseModel):
email: EmailStr
username: str
is_active: Optional[bool] = True
is_superuser: Optional[bool] = False
# Schema for creating a new user
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
# Schema for updating an existing user
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = Field(None, min_length=8)
is_active: Optional[bool] = None
is_superuser: Optional[bool] = None
# Schema for returning user data
class UserResponse(UserBase):
id: int
class Config:
from_attributes = True

32
main.py Normal file
View File

@ -0,0 +1,32 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import products, cart, health
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version=settings.PROJECT_VERSION,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(health.router, tags=["health"])
app.include_router(products.router, prefix=settings.API_V1_STR, tags=["products"])
app.include_router(cart.router, prefix=settings.API_V1_STR, tags=["cart"])
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

0
migrations/__init__.py Normal file
View File

84
migrations/env.py Normal file
View File

@ -0,0 +1,84 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# Import database models
from app.db.models.base import Base
from app.db.models import user, product, cart # noqa
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
render_as_batch=True, # Added for SQLite
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == "sqlite"
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Added for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,95 @@
"""Initial database migration
Revision ID: 1_initial_migration
Revises:
Create Date: 2023-05-18 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1_initial_migration'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create user table
op.create_table(
'user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
# Create product table
op.create_table(
'product',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('price', sa.Float(), nullable=False),
sa.Column('stock', sa.Integer(), nullable=False),
sa.Column('image_url', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_product_id'), 'product', ['id'], unique=False)
op.create_index(op.f('ix_product_name'), 'product', ['name'], unique=False)
# Create cart table
op.create_table(
'cart',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_cart_id'), 'cart', ['id'], unique=False)
# Create cart_item table
op.create_table(
'cartitem',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('cart_id', sa.Integer(), nullable=False),
sa.Column('product_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('quantity', sa.Integer(), nullable=False),
sa.Column('unit_price', sa.Float(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['cart_id'], ['cart.id'], ),
sa.ForeignKeyConstraint(['product_id'], ['product.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_cartitem_id'), 'cartitem', ['id'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_cartitem_id'), table_name='cartitem')
op.drop_table('cartitem')
op.drop_index(op.f('ix_cart_id'), table_name='cart')
op.drop_table('cart')
op.drop_index(op.f('ix_product_name'), table_name='product')
op.drop_index(op.f('ix_product_id'), table_name='product')
op.drop_table('product')
op.drop_index(op.f('ix_user_username'), table_name='user')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.103.1
uvicorn>=0.23.2
sqlalchemy>=2.0.21
alembic>=1.12.0
pydantic>=2.4.2
pydantic-settings>=2.0.3
python-dotenv>=1.0.0
python-jose>=3.3.0
python-multipart>=0.0.6
email-validator>=2.0.0
ruff>=0.0.292