Build complete SaaS invoicing application with FastAPI

Features:
- JWT authentication with user registration and login
- Customer management with full CRUD operations
- Invoice management with automatic calculations
- Multi-tenant data isolation by user
- SQLite database with Alembic migrations
- RESTful API with comprehensive documentation
- Tax calculations and invoice status tracking
- Code formatted with Ruff linting
This commit is contained in:
Automated Action 2025-06-20 09:52:34 +00:00
parent 0008f92434
commit bad5cc0eba
29 changed files with 1461 additions and 2 deletions

192
README.md
View File

@ -1,3 +1,191 @@
# FastAPI Application
# SaaS Invoicing Application
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive SaaS invoicing solution built with FastAPI and SQLite for managing customers, invoices, and billing operations.
## Features
- **User Authentication**: JWT-based authentication with secure registration and login
- **Customer Management**: Create, read, update, and delete customer records
- **Invoice Management**: Full invoice lifecycle management with automatic calculations
- **Invoice Items**: Line-item management with quantity, unit price, and total calculations
- **Multiple Invoice Statuses**: Draft, Sent, Paid, Overdue, and Cancelled
- **Tax Calculations**: Configurable tax rates with automatic tax amount calculations
- **RESTful API**: Comprehensive REST API with FastAPI documentation
## Technology Stack
- **Backend**: FastAPI (Python)
- **Database**: SQLite with SQLAlchemy ORM
- **Authentication**: JWT tokens with passlib for password hashing
- **API Documentation**: Automatic OpenAPI/Swagger documentation
- **Database Migrations**: Alembic for database schema management
- **Code Quality**: Ruff for linting and code formatting
## Environment Variables
The following environment variables should be set:
- `SECRET_KEY` - Secret key for JWT token signing (required for production)
## Project Structure
```
├── main.py # FastAPI application entry point
├── requirements.txt # Python dependencies
├── alembic.ini # Alembic configuration
├── alembic/ # Database migrations
├── app/
│ ├── api/ # API route handlers
│ │ ├── auth.py # Authentication endpoints
│ │ ├── users.py # User management endpoints
│ │ ├── customers.py # Customer management endpoints
│ │ ├── invoices.py # Invoice management endpoints
│ │ └── routes.py # Main API router
│ ├── core/ # Core functionality
│ │ ├── config.py # Application configuration
│ │ ├── security.py # Authentication and security
│ │ └── deps.py # Dependency injection
│ ├── db/ # Database configuration
│ │ ├── base.py # SQLAlchemy base class
│ │ └── session.py # Database session management
│ ├── models/ # SQLAlchemy models
│ │ ├── user.py # User model
│ │ ├── customer.py # Customer model
│ │ └── invoice.py # Invoice and InvoiceItem models
│ ├── schemas/ # Pydantic schemas
│ │ ├── user.py # User schemas
│ │ ├── customer.py # Customer schemas
│ │ └── invoice.py # Invoice schemas
│ └── services/ # Business logic
│ ├── user_service.py # User business logic
│ ├── customer_service.py # Customer business logic
│ └── invoice_service.py # Invoice business logic
```
## Installation & Setup
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Set environment variables:
```bash
export SECRET_KEY="your-secret-key-here"
```
3. Run database migrations:
```bash
alembic upgrade head
```
4. Start the application:
```bash
uvicorn main:app --reload
```
The application will be available at `http://localhost:8000`
## API Documentation
- **Interactive API Documentation**: http://localhost:8000/docs
- **Alternative Documentation**: http://localhost:8000/redoc
- **OpenAPI Schema**: http://localhost:8000/openapi.json
- **Health Check**: http://localhost:8000/health
## API Endpoints
### Authentication
- `POST /auth/register` - Register a new user
- `POST /auth/login` - Login user
- `POST /auth/token` - OAuth2 compatible token endpoint
### Users
- `GET /users/me` - Get current user information
- `PUT /users/me` - Update current user information
### Customers
- `GET /customers/` - List all customers (paginated)
- `POST /customers/` - Create a new customer
- `GET /customers/{id}` - Get customer by ID
- `PUT /customers/{id}` - Update customer
- `DELETE /customers/{id}` - Delete customer
### Invoices
- `GET /invoices/` - List all invoices (paginated)
- `POST /invoices/` - Create a new invoice
- `GET /invoices/{id}` - Get invoice by ID
- `PUT /invoices/{id}` - Update invoice
- `DELETE /invoices/{id}` - Delete invoice
- `PATCH /invoices/{id}/status` - Update invoice status
## Data Models
### User
- ID, email, password, full name, company name
- Active status and timestamps
### Customer
- ID, name, email, phone, address details
- Associated with user (multi-tenant)
### Invoice
- ID, invoice number, customer, dates, status
- Subtotal, tax rate, tax amount, total
- Notes and timestamps
### Invoice Items
- ID, invoice, description, quantity, unit price, total price
## Database Storage
The application uses SQLite database stored at `/app/storage/db/db.sqlite` with automatic directory creation.
## Security Features
- JWT token-based authentication
- Password hashing with bcrypt
- User isolation (multi-tenant data access)
- CORS enabled for cross-origin requests
## Development
### Code Quality
Run Ruff for linting and formatting:
```bash
ruff check .
ruff format .
```
### Database Operations
Create new migration:
```bash
alembic revision --autogenerate -m "Description"
```
Apply migrations:
```bash
alembic upgrade head
```
## Health Check
The application provides a health check endpoint at `/health` that returns:
```json
{
"status": "healthy",
"service": "invoicing-api"
}
```
## Production Deployment
1. Set a strong `SECRET_KEY` environment variable
2. Configure proper database backups for `/app/storage/db/`
3. Use a production WSGI server like Gunicorn
4. Set up proper logging and monitoring
5. Configure reverse proxy (nginx/Apache) if needed
## License
This project is available for use in SaaS applications and business purposes.

41
alembic.ini Normal file
View File

@ -0,0 +1,41 @@
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

62
alembic/env.py Normal file
View File

@ -0,0 +1,62 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import sys
import os
# Add the parent directory to the path to import app modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.db.base import Base
from app.core.config import settings
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Set the SQLAlchemy URL from our settings
config.set_main_option("sqlalchemy.url", settings.SQLALCHEMY_DATABASE_URL)
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,148 @@
"""Initial tables
Revision ID: 001
Revises:
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table(
"users",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
sa.Column("hashed_password", sa.String(), nullable=False),
sa.Column("full_name", sa.String(), nullable=False),
sa.Column("company_name", sa.String(), nullable=True),
sa.Column("is_active", sa.Boolean(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_users_id"), "users", ["id"], unique=False)
op.create_index(op.f("ix_users_email"), "users", ["email"], unique=True)
# Create customers table
op.create_table(
"customers",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
sa.Column("phone", sa.String(), nullable=True),
sa.Column("address", sa.Text(), nullable=True),
sa.Column("city", sa.String(), nullable=True),
sa.Column("state", sa.String(), nullable=True),
sa.Column("zip_code", sa.String(), nullable=True),
sa.Column("country", sa.String(), nullable=True),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_customers_id"), "customers", ["id"], unique=False)
# Create invoices table
op.create_table(
"invoices",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("invoice_number", sa.String(), nullable=False),
sa.Column("customer_id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column(
"issue_date",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.Column("due_date", sa.DateTime(timezone=True), nullable=False),
sa.Column(
"status",
sa.Enum(
"DRAFT", "SENT", "PAID", "OVERDUE", "CANCELLED", name="invoicestatus"
),
nullable=True,
),
sa.Column("subtotal", sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column("tax_rate", sa.Numeric(precision=5, scale=2), nullable=True),
sa.Column("tax_amount", sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column("total", sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column("notes", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["customer_id"],
["customers.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("invoice_number"),
)
op.create_index(op.f("ix_invoices_id"), "invoices", ["id"], unique=False)
# Create invoice_items table
op.create_table(
"invoice_items",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("invoice_id", sa.Integer(), nullable=False),
sa.Column("description", sa.String(), nullable=False),
sa.Column("quantity", sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column("unit_price", sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column("total_price", sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("CURRENT_TIMESTAMP"),
nullable=True,
),
sa.ForeignKeyConstraint(
["invoice_id"],
["invoices.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_invoice_items_id"), "invoice_items", ["id"], unique=False)
def downgrade() -> None:
op.drop_index(op.f("ix_invoice_items_id"), table_name="invoice_items")
op.drop_table("invoice_items")
op.drop_index(op.f("ix_invoices_id"), table_name="invoices")
op.drop_table("invoices")
op.drop_index(op.f("ix_customers_id"), table_name="customers")
op.drop_table("customers")
op.drop_index(op.f("ix_users_email"), table_name="users")
op.drop_index(op.f("ix_users_id"), table_name="users")
op.drop_table("users")

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

67
app/api/auth.py Normal file
View File

@ -0,0 +1,67 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from app.db.session import get_db
from app.services.user_service import UserService
from app.schemas.user import UserCreate, User, Token, UserLogin
from app.core.security import create_access_token
from app.core.config import settings
router = APIRouter()
@router.post("/register", response_model=User, status_code=status.HTTP_201_CREATED)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
user_service = UserService(db)
existing_user = user_service.get_user_by_email(user_data.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered"
)
user = user_service.create_user(user_data)
return user
@router.post("/login", response_model=Token)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
user_service = UserService(db)
user = user_service.authenticate_user(user_data.email, user_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/token", response_model=Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
):
user_service = UserService(db)
user = user_service.authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

83
app/api/customers.py Normal file
View File

@ -0,0 +1,83 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List
from app.db.session import get_db
from app.services.customer_service import CustomerService
from app.schemas.customer import Customer, CustomerCreate, CustomerUpdate
from app.core.deps import get_current_user
from app.models.user import User
router = APIRouter()
@router.get("/", response_model=List[Customer])
def get_customers(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
customer_service = CustomerService(db)
return customer_service.get_customers(current_user.id, skip, limit)
@router.post("/", response_model=Customer, status_code=status.HTTP_201_CREATED)
def create_customer(
customer_data: CustomerCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
customer_service = CustomerService(db)
return customer_service.create_customer(customer_data, current_user.id)
@router.get("/{customer_id}", response_model=Customer)
def get_customer(
customer_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
customer_service = CustomerService(db)
customer = customer_service.get_customer(customer_id, current_user.id)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Customer not found"
)
return customer
@router.put("/{customer_id}", response_model=Customer)
def update_customer(
customer_id: int,
customer_data: CustomerUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
customer_service = CustomerService(db)
customer = customer_service.update_customer(
customer_id, customer_data, current_user.id
)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Customer not found"
)
return customer
@router.delete("/{customer_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_customer(
customer_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
customer_service = CustomerService(db)
success = customer_service.delete_customer(customer_id, current_user.id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Customer not found"
)

100
app/api/invoices.py Normal file
View File

@ -0,0 +1,100 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List
from app.db.session import get_db
from app.services.invoice_service import InvoiceService
from app.schemas.invoice import Invoice, InvoiceCreate, InvoiceUpdate
from app.core.deps import get_current_user
from app.models.user import User
from app.models.invoice import InvoiceStatus
router = APIRouter()
@router.get("/", response_model=List[Invoice])
def get_invoices(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
return invoice_service.get_invoices(current_user.id, skip, limit)
@router.post("/", response_model=Invoice, status_code=status.HTTP_201_CREATED)
def create_invoice(
invoice_data: InvoiceCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
return invoice_service.create_invoice(invoice_data, current_user.id)
@router.get("/{invoice_id}", response_model=Invoice)
def get_invoice(
invoice_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
invoice = invoice_service.get_invoice(invoice_id, current_user.id)
if not invoice:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Invoice not found"
)
return invoice
@router.put("/{invoice_id}", response_model=Invoice)
def update_invoice(
invoice_id: int,
invoice_data: InvoiceUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
invoice = invoice_service.update_invoice(invoice_id, invoice_data, current_user.id)
if not invoice:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Invoice not found"
)
return invoice
@router.delete("/{invoice_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_invoice(
invoice_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
success = invoice_service.delete_invoice(invoice_id, current_user.id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Invoice not found"
)
@router.patch("/{invoice_id}/status", response_model=Invoice)
def update_invoice_status(
invoice_id: int,
status: InvoiceStatus,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
invoice_service = InvoiceService(db)
invoice = invoice_service.update_invoice_status(invoice_id, status, current_user.id)
if not invoice:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Invoice not found"
)
return invoice

9
app/api/routes.py Normal file
View File

@ -0,0 +1,9 @@
from fastapi import APIRouter
from app.api import auth, users, customers, invoices
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["authentication"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(customers.router, prefix="/customers", tags=["customers"])
api_router.include_router(invoices.router, prefix="/invoices", tags=["invoices"])

31
app/api/users.py Normal file
View File

@ -0,0 +1,31 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.services.user_service import UserService
from app.schemas.user import User, UserUpdate
from app.core.deps import get_current_user
from app.models.user import User as UserModel
router = APIRouter()
@router.get("/me", response_model=User)
def get_current_user_info(current_user: UserModel = Depends(get_current_user)):
return current_user
@router.put("/me", response_model=User)
def update_current_user(
user_data: UserUpdate,
current_user: UserModel = Depends(get_current_user),
db: Session = Depends(get_db),
):
user_service = UserService(db)
updated_user = user_service.update_user(current_user.id, user_data)
if not updated_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return updated_user

22
app/core/config.py Normal file
View File

@ -0,0 +1,22 @@
import os
from pathlib import Path
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
DB_DIR: Path = Path("/app/storage/db")
def __init__(self):
super().__init__()
self.DB_DIR.mkdir(parents=True, exist_ok=True)
@property
def SQLALCHEMY_DATABASE_URL(self) -> str:
return f"sqlite:///{self.DB_DIR}/db.sqlite"
settings = Settings()

46
app/core/deps.py Normal file
View File

@ -0,0 +1,46 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.user import User
from app.core.security import verify_token
security = HTTPBearer()
def get_current_user(
db: Session = Depends(get_db),
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> User:
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
email: str = payload.get("sub")
if email is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.email == email).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return user

40
app/core/security.py Normal file
View File

@ -0,0 +1,40 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
return payload
except JWTError:
return None

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

17
app/db/session.py Normal file
View File

@ -0,0 +1,17 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

12
app/models/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from sqlalchemy.orm import relationship
from .user import User
from .customer import Customer as Customer
from .invoice import (
Invoice as Invoice,
InvoiceItem as InvoiceItem,
InvoiceStatus as InvoiceStatus,
)
# Update relationships after all models are imported
User.customers = relationship("Customer", back_populates="owner")
User.invoices = relationship("Invoice", back_populates="user")

24
app/models/customer.py Normal file
View File

@ -0,0 +1,24 @@
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Customer(Base):
__tablename__ = "customers"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
email = Column(String, nullable=False)
phone = Column(String, nullable=True)
address = Column(Text, nullable=True)
city = Column(String, nullable=True)
state = Column(String, nullable=True)
zip_code = Column(String, nullable=True)
country = Column(String, nullable=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
owner = relationship("User", back_populates="customers")
invoices = relationship("Invoice", back_populates="customer")

61
app/models/invoice.py Normal file
View File

@ -0,0 +1,61 @@
from sqlalchemy import (
Column,
Integer,
String,
DateTime,
ForeignKey,
Numeric,
Text,
Enum,
)
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
import enum
class InvoiceStatus(enum.Enum):
DRAFT = "draft"
SENT = "sent"
PAID = "paid"
OVERDUE = "overdue"
CANCELLED = "cancelled"
class Invoice(Base):
__tablename__ = "invoices"
id = Column(Integer, primary_key=True, index=True)
invoice_number = Column(String, unique=True, nullable=False)
customer_id = Column(Integer, ForeignKey("customers.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
issue_date = Column(DateTime(timezone=True), server_default=func.now())
due_date = Column(DateTime(timezone=True), nullable=False)
status = Column(Enum(InvoiceStatus), default=InvoiceStatus.DRAFT)
subtotal = Column(Numeric(10, 2), default=0)
tax_rate = Column(Numeric(5, 2), default=0)
tax_amount = Column(Numeric(10, 2), default=0)
total = Column(Numeric(10, 2), default=0)
notes = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
customer = relationship("Customer", back_populates="invoices")
user = relationship("User", back_populates="invoices")
items = relationship(
"InvoiceItem", back_populates="invoice", cascade="all, delete-orphan"
)
class InvoiceItem(Base):
__tablename__ = "invoice_items"
id = Column(Integer, primary_key=True, index=True)
invoice_id = Column(Integer, ForeignKey("invoices.id"), nullable=False)
description = Column(String, nullable=False)
quantity = Column(Numeric(10, 2), nullable=False)
unit_price = Column(Numeric(10, 2), nullable=False)
total_price = Column(Numeric(10, 2), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
invoice = relationship("Invoice", back_populates="items")

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, nullable=False)
company_name = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

43
app/schemas/customer.py Normal file
View File

@ -0,0 +1,43 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
class CustomerBase(BaseModel):
name: str
email: EmailStr
phone: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
zip_code: Optional[str] = None
country: Optional[str] = None
class CustomerCreate(CustomerBase):
pass
class CustomerUpdate(BaseModel):
name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
zip_code: Optional[str] = None
country: Optional[str] = None
class CustomerInDB(CustomerBase):
id: int
user_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Customer(CustomerInDB):
pass

79
app/schemas/invoice.py Normal file
View File

@ -0,0 +1,79 @@
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
from decimal import Decimal
from app.models.invoice import InvoiceStatus
class InvoiceItemBase(BaseModel):
description: str
quantity: Decimal
unit_price: Decimal
class InvoiceItemCreate(InvoiceItemBase):
pass
class InvoiceItemUpdate(BaseModel):
description: Optional[str] = None
quantity: Optional[Decimal] = None
unit_price: Optional[Decimal] = None
class InvoiceItemInDB(InvoiceItemBase):
id: int
invoice_id: int
total_price: Decimal
created_at: datetime
class Config:
from_attributes = True
class InvoiceItem(InvoiceItemInDB):
pass
class InvoiceBase(BaseModel):
customer_id: int
due_date: datetime
tax_rate: Optional[Decimal] = Decimal("0")
notes: Optional[str] = None
class InvoiceCreate(InvoiceBase):
items: List[InvoiceItemCreate]
class InvoiceUpdate(BaseModel):
customer_id: Optional[int] = None
due_date: Optional[datetime] = None
status: Optional[InvoiceStatus] = None
tax_rate: Optional[Decimal] = None
notes: Optional[str] = None
items: Optional[List[InvoiceItemCreate]] = None
class InvoiceInDB(InvoiceBase):
id: int
invoice_number: str
user_id: int
issue_date: datetime
status: InvoiceStatus
subtotal: Decimal
tax_amount: Decimal
total: Decimal
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Invoice(InvoiceInDB):
items: List[InvoiceItem] = []
class InvoiceWithCustomer(Invoice):
customer: Optional[dict] = None

47
app/schemas/user.py Normal file
View File

@ -0,0 +1,47 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
email: EmailStr
full_name: str
company_name: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
full_name: Optional[str] = None
company_name: Optional[str] = None
is_active: Optional[bool] = None
class UserInDB(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class User(UserInDB):
pass
class UserLogin(BaseModel):
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None

View File

@ -0,0 +1,58 @@
from sqlalchemy.orm import Session
from app.models.customer import Customer
from app.schemas.customer import CustomerCreate, CustomerUpdate
from typing import List, Optional
class CustomerService:
def __init__(self, db: Session):
self.db = db
def get_customers(
self, user_id: int, skip: int = 0, limit: int = 100
) -> List[Customer]:
return (
self.db.query(Customer)
.filter(Customer.user_id == user_id)
.offset(skip)
.limit(limit)
.all()
)
def get_customer(self, customer_id: int, user_id: int) -> Optional[Customer]:
return (
self.db.query(Customer)
.filter(Customer.id == customer_id, Customer.user_id == user_id)
.first()
)
def create_customer(self, customer_data: CustomerCreate, user_id: int) -> Customer:
customer = Customer(**customer_data.dict(), user_id=user_id)
self.db.add(customer)
self.db.commit()
self.db.refresh(customer)
return customer
def update_customer(
self, customer_id: int, customer_data: CustomerUpdate, user_id: int
) -> Optional[Customer]:
customer = self.get_customer(customer_id, user_id)
if not customer:
return None
update_data = customer_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(customer, field, value)
self.db.commit()
self.db.refresh(customer)
return customer
def delete_customer(self, customer_id: int, user_id: int) -> bool:
customer = self.get_customer(customer_id, user_id)
if not customer:
return False
self.db.delete(customer)
self.db.commit()
return True

View File

@ -0,0 +1,142 @@
from sqlalchemy.orm import Session
from app.models.invoice import Invoice, InvoiceItem, InvoiceStatus
from app.schemas.invoice import InvoiceCreate, InvoiceUpdate
from typing import List, Optional
from decimal import Decimal
import uuid
class InvoiceService:
def __init__(self, db: Session):
self.db = db
def get_invoices(
self, user_id: int, skip: int = 0, limit: int = 100
) -> List[Invoice]:
return (
self.db.query(Invoice)
.filter(Invoice.user_id == user_id)
.offset(skip)
.limit(limit)
.all()
)
def get_invoice(self, invoice_id: int, user_id: int) -> Optional[Invoice]:
return (
self.db.query(Invoice)
.filter(Invoice.id == invoice_id, Invoice.user_id == user_id)
.first()
)
def create_invoice(self, invoice_data: InvoiceCreate, user_id: int) -> Invoice:
# Generate unique invoice number
invoice_number = f"INV-{uuid.uuid4().hex[:8].upper()}"
# Calculate totals
subtotal = sum(item.quantity * item.unit_price for item in invoice_data.items)
tax_amount = (
subtotal * (invoice_data.tax_rate / 100)
if invoice_data.tax_rate
else Decimal("0")
)
total = subtotal + tax_amount
# Create invoice
invoice = Invoice(
invoice_number=invoice_number,
customer_id=invoice_data.customer_id,
user_id=user_id,
due_date=invoice_data.due_date,
tax_rate=invoice_data.tax_rate or Decimal("0"),
subtotal=subtotal,
tax_amount=tax_amount,
total=total,
notes=invoice_data.notes,
)
self.db.add(invoice)
self.db.flush() # Get the invoice ID
# Create invoice items
for item_data in invoice_data.items:
item_total = item_data.quantity * item_data.unit_price
item = InvoiceItem(
invoice_id=invoice.id,
description=item_data.description,
quantity=item_data.quantity,
unit_price=item_data.unit_price,
total_price=item_total,
)
self.db.add(item)
self.db.commit()
self.db.refresh(invoice)
return invoice
def update_invoice(
self, invoice_id: int, invoice_data: InvoiceUpdate, user_id: int
) -> Optional[Invoice]:
invoice = self.get_invoice(invoice_id, user_id)
if not invoice:
return None
# Update basic fields
update_data = invoice_data.dict(exclude_unset=True, exclude={"items"})
for field, value in update_data.items():
setattr(invoice, field, value)
# Update items if provided
if invoice_data.items is not None:
# Delete existing items
self.db.query(InvoiceItem).filter(
InvoiceItem.invoice_id == invoice_id
).delete()
# Add new items
subtotal = Decimal("0")
for item_data in invoice_data.items:
item_total = item_data.quantity * item_data.unit_price
subtotal += item_total
item = InvoiceItem(
invoice_id=invoice.id,
description=item_data.description,
quantity=item_data.quantity,
unit_price=item_data.unit_price,
total_price=item_total,
)
self.db.add(item)
# Recalculate totals
tax_amount = (
subtotal * (invoice.tax_rate / 100)
if invoice.tax_rate
else Decimal("0")
)
invoice.subtotal = subtotal
invoice.tax_amount = tax_amount
invoice.total = subtotal + tax_amount
self.db.commit()
self.db.refresh(invoice)
return invoice
def delete_invoice(self, invoice_id: int, user_id: int) -> bool:
invoice = self.get_invoice(invoice_id, user_id)
if not invoice:
return False
self.db.delete(invoice)
self.db.commit()
return True
def update_invoice_status(
self, invoice_id: int, status: InvoiceStatus, user_id: int
) -> Optional[Invoice]:
invoice = self.get_invoice(invoice_id, user_id)
if not invoice:
return None
invoice.status = status
self.db.commit()
self.db.refresh(invoice)
return invoice

View File

@ -0,0 +1,48 @@
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.core.security import get_password_hash, verify_password
from typing import Optional
class UserService:
def __init__(self, db: Session):
self.db = db
def get_user_by_email(self, email: str) -> Optional[User]:
return self.db.query(User).filter(User.email == email).first()
def get_user_by_id(self, user_id: int) -> Optional[User]:
return self.db.query(User).filter(User.id == user_id).first()
def create_user(self, user_data: UserCreate) -> User:
hashed_password = get_password_hash(user_data.password)
user = User(
email=user_data.email,
hashed_password=hashed_password,
full_name=user_data.full_name,
company_name=user_data.company_name,
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def authenticate_user(self, email: str, password: str) -> Optional[User]:
user = self.get_user_by_email(email)
if not user or not verify_password(password, user.hashed_password):
return None
return user
def update_user(self, user_id: int, user_data: UserUpdate) -> Optional[User]:
user = self.get_user_by_id(user_id)
if not user:
return None
update_data = user_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
self.db.commit()
self.db.refresh(user)
return user

36
main.py Normal file
View File

@ -0,0 +1,36 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import api_router
app = FastAPI(
title="SaaS Invoicing Application",
description="A comprehensive invoicing solution for SaaS businesses",
version="1.0.0",
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router)
@app.get("/")
async def root():
return {
"title": "SaaS Invoicing Application",
"documentation": "/docs",
"health": "/health",
}
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "invoicing-api"}

12
requirements.txt Normal file
View File

@ -0,0 +1,12 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
alembic==1.12.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
pydantic==2.5.0
pydantic-settings==2.1.0
python-dotenv==1.0.0
email-validator==2.1.0
ruff==0.1.6