Create SaaS invoicing application with FastAPI and SQLite

- Set up project structure with modular organization
- Implement database models for users, organizations, clients, invoices
- Create Alembic migration scripts for database setup
- Implement JWT-based authentication and authorization
- Create API endpoints for users, organizations, clients, invoices
- Add PDF generation for invoices using ReportLab
- Add comprehensive documentation in README
This commit is contained in:
Automated Action 2025-06-06 11:21:11 +00:00
parent 388534ec6c
commit c8aed27755
43 changed files with 2214 additions and 2 deletions

204
README.md
View File

@ -1,3 +1,203 @@
# FastAPI Application # SaaS Invoicing Application
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform. A comprehensive SaaS invoicing application built with FastAPI and SQLite, designed for businesses to manage organizations, clients, and invoices efficiently.
## Features
- **Multi-tenant Architecture**: Supports multiple organizations with user management
- **User Authentication**: Secure JWT-based authentication and role-based permissions
- **Organization Management**: Create and manage organizations with detailed information
- **Client Management**: Maintain a database of clients for each organization
- **Invoice Management**: Create, update, and delete invoices with line items
- **PDF Generation**: Generate professional PDF invoices for sharing with clients
- **API Documentation**: Interactive API documentation with Swagger UI and ReDoc
## Prerequisites
- Python 3.8+
- pip (Python package manager)
## Environment Variables
The application uses the following environment variables:
| Variable | Description | Default Value |
|----------|-------------|---------------|
| SECRET_KEY | Secret key for JWT token generation | CHANGE_ME_TO_A_SECURE_RANDOM_STRING |
| ACCESS_TOKEN_EXPIRE_MINUTES | Access token expiration time in minutes | 30 |
| SERVER_HOST | Host URL for the server | http://localhost:8000 |
## Installation
1. Clone the repository:
```bash
git clone https://github.com/yourusername/saasinvoicingapplication.git
cd saasinvoicingapplication
```
2. Create a virtual environment (optional but recommended):
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Create a `.env` file with your environment variables:
```
SECRET_KEY=your-secure-secret-key
ACCESS_TOKEN_EXPIRE_MINUTES=30
SERVER_HOST=http://localhost:8000
```
5. Initialize the database:
```bash
mkdir -p /app/storage/db
python -c "from app.db.base import Base; from app.db.session import engine; Base.metadata.create_all(bind=engine)"
alembic upgrade head
```
## Running the Application
Start the application using uvicorn:
```bash
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
```
The API will be available at:
- API: http://localhost:8000
- Swagger UI Documentation: http://localhost:8000/docs
- ReDoc Documentation: http://localhost:8000/redoc
## API Structure
The API follows a RESTful design and is structured as follows:
### Authentication Endpoints
- `POST /api/v1/auth/token` - Get access token
- `POST /api/v1/auth/test-token` - Test token validity
### User Management
- `GET /api/v1/users/` - List users (admin only)
- `POST /api/v1/users/` - Create user (admin only)
- `GET /api/v1/users/me` - Get current user info
- `PUT /api/v1/users/me` - Update current user
- `GET /api/v1/users/{user_id}` - Get user by ID
- `PUT /api/v1/users/{user_id}` - Update user (admin only)
### Organization Management
- `GET /api/v1/organizations/` - List organizations
- `POST /api/v1/organizations/` - Create organization (admin only)
- `GET /api/v1/organizations/{id}` - Get organization by ID
- `PUT /api/v1/organizations/{id}` - Update organization
- `DELETE /api/v1/organizations/{id}` - Delete organization (admin only)
### Client Management
- `GET /api/v1/clients/` - List clients
- `POST /api/v1/clients/` - Create client
- `GET /api/v1/clients/{id}` - Get client by ID
- `PUT /api/v1/clients/{id}` - Update client
- `DELETE /api/v1/clients/{id}` - Delete client
### Invoice Management
- `GET /api/v1/invoices/` - List invoices
- `POST /api/v1/invoices/` - Create invoice
- `GET /api/v1/invoices/{id}` - Get invoice by ID
- `PUT /api/v1/invoices/{id}` - Update invoice
- `DELETE /api/v1/invoices/{id}` - Delete invoice
- `GET /api/v1/invoices/{id}/pdf` - Generate PDF for invoice
## Data Models
### User
- `id`: Unique identifier
- `email`: Email address (unique)
- `full_name`: Full name
- `hashed_password`: Hashed password
- `is_active`: User status
- `is_superuser`: Admin status
- `organization_id`: Associated organization
### Organization
- `id`: Unique identifier
- `name`: Organization name
- `address`, `city`, `state`, `postal_code`, `country`: Address information
- `phone`, `email`, `website`: Contact information
- `tax_id`: Tax identification number
- `logo_url`: URL to organization logo
### Client
- `id`: Unique identifier
- `name`: Client name
- `contact_name`: Primary contact
- `email`, `phone`: Contact information
- `address`, `city`, `state`, `postal_code`, `country`: Address information
- `tax_id`: Tax identification number
- `notes`: Additional notes
- `organization_id`: Associated organization
- `created_by_id`: User who created the client
### Invoice
- `id`: Unique identifier
- `invoice_number`: Invoice reference number
- `status`: Invoice status (draft, sent, paid, overdue, cancelled)
- `issue_date`, `due_date`: Invoice dates
- `subtotal`, `tax_rate`, `tax_amount`, `discount`, `total`: Financial information
- `notes`, `terms`: Additional information
- `is_recurring`, `recurring_interval`: Recurring invoice details
- `client_id`: Associated client
- `organization_id`: Associated organization
- `created_by_id`: User who created the invoice
### InvoiceItem
- `id`: Unique identifier
- `description`: Item description
- `quantity`, `unit_price`, `amount`: Item details
- `invoice_id`: Associated invoice
## Development
### Running Tests
Run tests using pytest:
```bash
pytest
```
### Database Migrations
Generate a new migration after model changes:
```bash
alembic revision --autogenerate -m "Description of changes"
```
Apply migrations:
```bash
alembic upgrade head
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.

84
alembic.ini Normal file
View File

@ -0,0 +1,84 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# API package initialization

54
app/api/deps.py Normal file
View File

@ -0,0 +1,54 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/token"
)
def get_current_user(
db: Session = Depends(get_db),
token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(models.User).filter(models.User.id == token_data.sub).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_superuser:
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

1
app/api/v1/__init__.py Normal file
View File

@ -0,0 +1 @@
# API v1 package initialization

10
app/api/v1/api.py Normal file
View File

@ -0,0 +1,10 @@
from fastapi import APIRouter
from app.api.v1.endpoints import auth, users, organizations, clients, invoices
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["authentication"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(organizations.router, prefix="/organizations", tags=["organizations"])
api_router.include_router(clients.router, prefix="/clients", tags=["clients"])
api_router.include_router(invoices.router, prefix="/invoices", tags=["invoices"])

View File

@ -0,0 +1 @@
# API endpoints package initialization

View File

@ -0,0 +1,50 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect email or password",
)
elif not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user

View File

@ -0,0 +1,137 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Client])
def read_clients(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve clients.
"""
if crud.user.is_superuser(current_user):
clients = crud.client.get_multi(db, skip=skip, limit=limit)
elif current_user.organization_id:
# Filter clients by the user's organization
clients = (
db.query(models.Client)
.filter(models.Client.organization_id == current_user.organization_id)
.offset(skip)
.limit(limit)
.all()
)
else:
clients = []
return clients
@router.post("/", response_model=schemas.Client)
def create_client(
*,
db: Session = Depends(deps.get_db),
client_in: schemas.ClientCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new client.
"""
# Ensure user has permission to create a client for the specified organization
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != client_in.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Create client with created_by information
client_data = client_in.dict()
client = models.Client(**client_data, created_by_id=current_user.id)
db.add(client)
db.commit()
db.refresh(client)
return client
@router.get("/{id}", response_model=schemas.Client)
def read_client(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get client by ID.
"""
client = crud.client.get(db, id=id)
if not client:
raise HTTPException(status_code=404, detail="Client not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != client.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
return client
@router.put("/{id}", response_model=schemas.Client)
def update_client(
*,
db: Session = Depends(deps.get_db),
id: int,
client_in: schemas.ClientUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a client.
"""
client = crud.client.get(db, id=id)
if not client:
raise HTTPException(status_code=404, detail="Client not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != client.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Prevent changing organization_id if not a superuser
if (
client_in.organization_id is not None
and client_in.organization_id != client.organization_id
and not crud.user.is_superuser(current_user)
):
raise HTTPException(
status_code=400, detail="Cannot change client's organization"
)
client = crud.client.update(db, db_obj=client, obj_in=client_in)
return client
@router.delete("/{id}", response_model=schemas.Client)
def delete_client(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a client.
"""
client = crud.client.get(db, id=id)
if not client:
raise HTTPException(status_code=404, detail="Client not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != client.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
client = crud.client.remove(db, id=id)
return client

View File

@ -0,0 +1,201 @@
from typing import Any, List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.services.pdf_generator import generate_invoice_pdf
router = APIRouter()
@router.get("/", response_model=List[schemas.Invoice])
def read_invoices(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve invoices.
"""
if crud.user.is_superuser(current_user):
invoices = crud.invoice.get_multi(db, skip=skip, limit=limit)
elif current_user.organization_id:
# Filter invoices by the user's organization
invoices = (
db.query(models.Invoice)
.filter(models.Invoice.organization_id == current_user.organization_id)
.offset(skip)
.limit(limit)
.all()
)
else:
invoices = []
return invoices
@router.post("/", response_model=schemas.Invoice)
def create_invoice(
*,
db: Session = Depends(deps.get_db),
invoice_in: schemas.InvoiceCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new invoice.
"""
# Verify organization access
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != invoice_in.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Verify client belongs to the organization
client = crud.client.get(db, id=invoice_in.client_id)
if not client or client.organization_id != invoice_in.organization_id:
raise HTTPException(status_code=400, detail="Invalid client for this organization")
# Create invoice
invoice_data = invoice_in.dict(exclude={"items"})
invoice = models.Invoice(**invoice_data, created_by_id=current_user.id)
db.add(invoice)
db.commit()
db.refresh(invoice)
# Create invoice items
for item_data in invoice_in.items:
item = models.InvoiceItem(**item_data.dict(), invoice_id=invoice.id)
db.add(item)
db.commit()
db.refresh(invoice)
return invoice
@router.get("/{id}", response_model=schemas.Invoice)
def read_invoice(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get invoice by ID.
"""
invoice = crud.invoice.get(db, id=id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != invoice.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
return invoice
@router.put("/{id}", response_model=schemas.Invoice)
def update_invoice(
*,
db: Session = Depends(deps.get_db),
id: int,
invoice_in: schemas.InvoiceUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an invoice.
"""
invoice = crud.invoice.get(db, id=id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != invoice.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Prevent changing organization_id if not a superuser
if (
invoice_in.organization_id is not None
and invoice_in.organization_id != invoice.organization_id
and not crud.user.is_superuser(current_user)
):
raise HTTPException(
status_code=400, detail="Cannot change invoice's organization"
)
# Update basic invoice data
update_data = invoice_in.dict(exclude={"items"}, exclude_unset=True)
for field, value in update_data.items():
setattr(invoice, field, value)
# Update invoice items if provided
if invoice_in.items:
# Delete existing items
db.query(models.InvoiceItem).filter(models.InvoiceItem.invoice_id == id).delete()
# Create new items
for item_data in invoice_in.items:
item = models.InvoiceItem(**item_data.dict(), invoice_id=id)
db.add(item)
db.commit()
db.refresh(invoice)
return invoice
@router.delete("/{id}", response_model=schemas.Invoice)
def delete_invoice(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete an invoice.
"""
invoice = crud.invoice.get(db, id=id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != invoice.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
invoice = crud.invoice.remove(db, id=id)
return invoice
@router.get("/{id}/pdf", response_class=Response)
def generate_pdf(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Generate a PDF for the invoice.
"""
invoice = crud.invoice.get(db, id=id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != invoice.organization_id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Generate PDF
pdf_content = generate_invoice_pdf(db, invoice)
# Return PDF as response
filename = f"invoice_{invoice.invoice_number}_{datetime.now().strftime('%Y%m%d')}.pdf"
return Response(
content=pdf_content,
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)

View File

@ -0,0 +1,103 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Organization])
def read_organizations(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve organizations.
"""
if crud.user.is_superuser(current_user):
organizations = crud.organization.get_multi(db, skip=skip, limit=limit)
elif current_user.organization_id:
organizations = [crud.organization.get(db, id=current_user.organization_id)]
else:
organizations = []
return organizations
@router.post("/", response_model=schemas.Organization)
def create_organization(
*,
db: Session = Depends(deps.get_db),
organization_in: schemas.OrganizationCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new organization.
"""
organization = crud.organization.create(db, obj_in=organization_in)
return organization
@router.get("/{id}", response_model=schemas.Organization)
def read_organization(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get organization by ID.
"""
organization = crud.organization.get(db, id=id)
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
return organization
@router.put("/{id}", response_model=schemas.Organization)
def update_organization(
*,
db: Session = Depends(deps.get_db),
id: int,
organization_in: schemas.OrganizationUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an organization.
"""
organization = crud.organization.get(db, id=id)
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
if not crud.user.is_superuser(current_user) and (
current_user.organization_id != id
):
raise HTTPException(status_code=400, detail="Not enough permissions")
organization = crud.organization.update(
db, db_obj=organization, obj_in=organization_in
)
return organization
@router.delete("/{id}", response_model=schemas.Organization)
def delete_organization(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete an organization.
"""
organization = crud.organization.get(db, id=id)
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
organization = crud.organization.remove(db, id=id)
return organization

View File

@ -0,0 +1,119 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

51
app/core/config.py Normal file
View File

@ -0,0 +1,51 @@
from typing import List, Optional
from pydantic import AnyHttpUrl, EmailStr, validator
from pydantic_settings import BaseSettings
from pathlib import Path
class Settings(BaseSettings):
# Base
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "SaaS Invoicing Application"
PROJECT_DESCRIPTION: str = "A SaaS invoicing application API built with FastAPI"
VERSION: str = "0.1.0"
# Server configuration
SERVER_HOST: str = "http://localhost:8000"
# Authentication
SECRET_KEY: str = "CHANGE_ME_TO_A_SECURE_RANDOM_STRING"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database
DB_DIR: Path = Path("/app") / "storage" / "db"
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# CORS
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@validator("BACKEND_CORS_ORIGINS", pre=True)
def assemble_cors_origins(cls, v: str | List[str]) -> List[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
# Emails
EMAILS_ENABLED: bool = False
EMAILS_FROM_EMAIL: Optional[EmailStr] = None
EMAILS_FROM_NAME: Optional[str] = None
# First superuser
FIRST_SUPERUSER_EMAIL: EmailStr = "admin@example.com"
FIRST_SUPERUSER_PASSWORD: str = "admin"
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

28
app/core/security.py Normal file
View File

@ -0,0 +1,28 @@
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

6
app/crud/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from app.crud.crud_user import user
from app.crud.crud_organization import organization
from app.crud.crud_client import client
from app.crud.crud_invoice import invoice
__all__ = ["user", "organization", "client", "invoice"]

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

10
app/crud/crud_client.py Normal file
View File

@ -0,0 +1,10 @@
from app.crud.base import CRUDBase
from app.models.client import Client
from app.schemas.client import ClientCreate, ClientUpdate
class CRUDClient(CRUDBase[Client, ClientCreate, ClientUpdate]):
pass
client = CRUDClient(Client)

10
app/crud/crud_invoice.py Normal file
View File

@ -0,0 +1,10 @@
from app.crud.base import CRUDBase
from app.models.invoice import Invoice
from app.schemas.invoice import InvoiceCreate, InvoiceUpdate
class CRUDInvoice(CRUDBase[Invoice, InvoiceCreate, InvoiceUpdate]):
pass
invoice = CRUDInvoice(Invoice)

View File

@ -0,0 +1,10 @@
from app.crud.base import CRUDBase
from app.models.organization import Organization
from app.schemas.organization import OrganizationCreate, OrganizationUpdate
class CRUDOrganization(CRUDBase[Organization, OrganizationCreate, OrganizationUpdate]):
pass
organization = CRUDOrganization(Organization)

57
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,57 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active,
organization_id=obj_in.organization_id,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

8
app/db/__init__.py Normal file
View File

@ -0,0 +1,8 @@
from app.db.base import Base # noqa
from app.db.session import engine, SessionLocal, get_db # noqa
# Import all the models here so that Alembic can discover them
from app.models.user import User # noqa
from app.models.organization import Organization # noqa
from app.models.client import Client # noqa
from app.models.invoice import Invoice, InvoiceItem # noqa

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

20
app/db/base_class.py Normal file
View File

@ -0,0 +1,20 @@
from typing import Any
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import as_declarative, declared_attr
import datetime
@as_declarative()
class Base:
id: Any
__name__: str
# Generate tablename automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
# Common columns for all models
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)

21
app/db/session.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Ensure the DB directory exists
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Dependency for database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

6
app/models/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from app.models.user import User
from app.models.organization import Organization
from app.models.client import Client
from app.models.invoice import Invoice, InvoiceItem
__all__ = ["User", "Organization", "Client", "Invoice", "InvoiceItem"]

26
app/models/client.py Normal file
View File

@ -0,0 +1,26 @@
from sqlalchemy import Column, String, Text, ForeignKey, Integer
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Client(Base):
name = Column(String, index=True, nullable=False)
contact_name = Column(String)
email = Column(String)
phone = Column(String)
address = Column(Text)
city = Column(String)
state = Column(String)
postal_code = Column(String)
country = Column(String)
tax_id = Column(String)
notes = Column(Text)
# Foreign keys
organization_id = Column(Integer, ForeignKey("organization.id", ondelete="CASCADE"))
created_by_id = Column(Integer, ForeignKey("user.id", ondelete="SET NULL"), nullable=True)
# Relationships
organization = relationship("Organization", back_populates="clients")
created_by = relationship("User", back_populates="clients")
invoices = relationship("Invoice", back_populates="client")

53
app/models/invoice.py Normal file
View File

@ -0,0 +1,53 @@
import enum
from sqlalchemy import Column, String, Text, ForeignKey, Integer, Enum, Float, Date, Boolean
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class InvoiceStatus(str, enum.Enum):
DRAFT = "draft"
SENT = "sent"
PAID = "paid"
OVERDUE = "overdue"
CANCELLED = "cancelled"
class Invoice(Base):
invoice_number = Column(String, index=True, nullable=False)
status = Column(Enum(InvoiceStatus), default=InvoiceStatus.DRAFT)
issue_date = Column(Date, nullable=False)
due_date = Column(Date, nullable=False)
subtotal = Column(Float(precision=2), nullable=False)
tax_rate = Column(Float(precision=2), default=0.0)
tax_amount = Column(Float(precision=2), default=0.0)
discount = Column(Float(precision=2), default=0.0)
total = Column(Float(precision=2), nullable=False)
notes = Column(Text)
terms = Column(Text)
is_recurring = Column(Boolean, default=False)
recurring_interval = Column(String)
# Foreign keys
client_id = Column(Integer, ForeignKey("client.id", ondelete="CASCADE"))
organization_id = Column(Integer, ForeignKey("organization.id", ondelete="CASCADE"))
created_by_id = Column(Integer, ForeignKey("user.id", ondelete="SET NULL"), nullable=True)
# Relationships
items = relationship("InvoiceItem", back_populates="invoice", cascade="all, delete-orphan")
client = relationship("Client", back_populates="invoices")
organization = relationship("Organization", back_populates="invoices")
created_by = relationship("User", back_populates="invoices")
class InvoiceItem(Base):
description = Column(String, nullable=False)
quantity = Column(Float(precision=2), nullable=False)
unit_price = Column(Float(precision=2), nullable=False)
amount = Column(Float(precision=2), nullable=False)
# Foreign key
invoice_id = Column(Integer, ForeignKey("invoice.id", ondelete="CASCADE"))
# Relationship
invoice = relationship("Invoice", back_populates="items")

View File

@ -0,0 +1,22 @@
from sqlalchemy import Column, String, Text
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Organization(Base):
name = Column(String, index=True, nullable=False)
address = Column(Text)
city = Column(String)
state = Column(String)
postal_code = Column(String)
country = Column(String)
phone = Column(String)
email = Column(String)
website = Column(String)
tax_id = Column(String)
logo_url = Column(String)
# Relationships
users = relationship("User", back_populates="organization")
clients = relationship("Client", back_populates="organization")
invoices = relationship("Invoice", back_populates="organization")

21
app/models/user.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class User(Base):
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, index=True)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
# Relationship with Organization
organization_id = Column(Integer, ForeignKey("organization.id", ondelete="CASCADE"), nullable=True)
organization = relationship("Organization", back_populates="users")
# Relationship with Invoice
invoices = relationship("Invoice", back_populates="created_by")
# Relationship with Client
clients = relationship("Client", back_populates="created_by")

1
app/schemas/__init__.py Normal file
View File

@ -0,0 +1 @@
# Import schemas to expose them at the package level

47
app/schemas/client.py Normal file
View File

@ -0,0 +1,47 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class ClientBase(BaseModel):
name: Optional[str] = None
contact_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
postal_code: Optional[str] = None
country: Optional[str] = None
tax_id: Optional[str] = None
notes: Optional[str] = None
organization_id: Optional[int] = None
# Properties to receive via API on creation
class ClientCreate(ClientBase):
name: str
organization_id: int
# Properties to receive via API on update
class ClientUpdate(ClientBase):
pass
class ClientInDBBase(ClientBase):
id: Optional[int] = None
created_by_id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class Client(ClientInDBBase):
pass
# Additional properties stored in DB
class ClientInDB(ClientInDBBase):
pass

86
app/schemas/invoice.py Normal file
View File

@ -0,0 +1,86 @@
from typing import List, Optional
from datetime import date
from pydantic import BaseModel
from app.models.invoice import InvoiceStatus
# Invoice Item schemas
class InvoiceItemBase(BaseModel):
description: Optional[str] = None
quantity: Optional[float] = None
unit_price: Optional[float] = None
amount: Optional[float] = None
class InvoiceItemCreate(InvoiceItemBase):
description: str
quantity: float
unit_price: float
amount: float
class InvoiceItemUpdate(InvoiceItemBase):
pass
class InvoiceItemInDBBase(InvoiceItemBase):
id: Optional[int] = None
invoice_id: int
class Config:
orm_mode = True
class InvoiceItem(InvoiceItemInDBBase):
pass
# Invoice schemas
class InvoiceBase(BaseModel):
invoice_number: Optional[str] = None
status: Optional[InvoiceStatus] = None
issue_date: Optional[date] = None
due_date: Optional[date] = None
subtotal: Optional[float] = None
tax_rate: Optional[float] = None
tax_amount: Optional[float] = None
discount: Optional[float] = None
total: Optional[float] = None
notes: Optional[str] = None
terms: Optional[str] = None
is_recurring: Optional[bool] = False
recurring_interval: Optional[str] = None
client_id: Optional[int] = None
organization_id: Optional[int] = None
class InvoiceCreate(InvoiceBase):
invoice_number: str
issue_date: date
due_date: date
subtotal: float
total: float
client_id: int
organization_id: int
items: List[InvoiceItemCreate]
class InvoiceUpdate(InvoiceBase):
items: Optional[List[InvoiceItemCreate]] = None
class InvoiceInDBBase(InvoiceBase):
id: Optional[int] = None
created_by_id: Optional[int] = None
class Config:
orm_mode = True
class Invoice(InvoiceInDBBase):
items: List[InvoiceItem] = []
class InvoiceInDB(InvoiceInDBBase):
pass

View File

@ -0,0 +1,44 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class OrganizationBase(BaseModel):
name: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
postal_code: Optional[str] = None
country: Optional[str] = None
phone: Optional[str] = None
email: Optional[EmailStr] = None
website: Optional[str] = None
tax_id: Optional[str] = None
logo_url: Optional[str] = None
# Properties to receive via API on creation
class OrganizationCreate(OrganizationBase):
name: str
# Properties to receive via API on update
class OrganizationUpdate(OrganizationBase):
pass
class OrganizationInDBBase(OrganizationBase):
id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class Organization(OrganizationInDBBase):
pass
# Additional properties stored in DB
class OrganizationInDB(OrganizationInDBBase):
pass

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

39
app/schemas/user.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
is_active: Optional[bool] = True
is_superuser: bool = False
organization_id: Optional[int] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

1
app/services/__init__.py Normal file
View File

@ -0,0 +1 @@
# Service package initialization

View File

@ -0,0 +1,273 @@
import io
from reportlab.lib.pagesizes import letter
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from sqlalchemy.orm import Session
from app.models.invoice import Invoice
def generate_invoice_pdf(db: Session, invoice: Invoice) -> bytes:
"""
Generate a PDF document for an invoice.
Args:
db: Database session
invoice: Invoice model instance
Returns:
Bytes containing the PDF content
"""
# Create a BytesIO buffer to receive the PDF data
buffer = io.BytesIO()
# Create the PDF document using ReportLab
doc = SimpleDocTemplate(
buffer,
pagesize=letter,
rightMargin=72,
leftMargin=72,
topMargin=72,
bottomMargin=72
)
# Container for PDF elements
elements = []
# Get styles
styles = getSampleStyleSheet()
header_style = styles["Heading1"]
subheader_style = styles["Heading2"]
normal_style = styles["Normal"]
# Custom styles
label_style = ParagraphStyle(
"label",
parent=normal_style,
fontName="Helvetica-Bold",
fontSize=10,
)
# Add invoice header
elements.append(Paragraph(f"INVOICE #{invoice.invoice_number}", header_style))
elements.append(Spacer(1, 0.25 * inch))
# Add organization and client information
org = invoice.organization
client = invoice.client
# Date information
date_data = [
[Paragraph("Issue Date:", label_style), Paragraph(invoice.issue_date.strftime("%Y-%m-%d"), normal_style)],
[Paragraph("Due Date:", label_style), Paragraph(invoice.due_date.strftime("%Y-%m-%d"), normal_style)],
[Paragraph("Status:", label_style), Paragraph(invoice.status.value.upper(), normal_style)],
]
date_table = Table(date_data, colWidths=[1.5 * inch, 2 * inch])
date_table.setStyle(TableStyle([
("VALIGN", (0, 0), (-1, -1), "TOP"),
("ALIGN", (0, 0), (0, -1), "RIGHT"),
("TOPPADDING", (0, 0), (-1, -1), 1),
("BOTTOMPADDING", (0, 0), (-1, -1), 1),
]))
# Organization information
org_data = [
[Paragraph("From:", label_style)],
[Paragraph(org.name, normal_style)],
]
if org.address:
org_data.append([Paragraph(org.address, normal_style)])
city_state_zip = []
if org.city:
city_state_zip.append(org.city)
if org.state:
city_state_zip.append(org.state)
if org.postal_code:
city_state_zip.append(org.postal_code)
if city_state_zip:
org_data.append([Paragraph(", ".join(city_state_zip), normal_style)])
if org.country:
org_data.append([Paragraph(org.country, normal_style)])
if org.phone:
org_data.append([Paragraph(f"Phone: {org.phone}", normal_style)])
if org.email:
org_data.append([Paragraph(f"Email: {org.email}", normal_style)])
if org.tax_id:
org_data.append([Paragraph(f"Tax ID: {org.tax_id}", normal_style)])
org_table = Table(org_data, colWidths=[3 * inch])
org_table.setStyle(TableStyle([
("VALIGN", (0, 0), (-1, -1), "TOP"),
("TOPPADDING", (0, 0), (-1, -1), 1),
("BOTTOMPADDING", (0, 0), (-1, -1), 1),
]))
# Client information
client_data = [
[Paragraph("Bill To:", label_style)],
[Paragraph(client.name, normal_style)],
]
if client.contact_name:
client_data.append([Paragraph(f"Attn: {client.contact_name}", normal_style)])
if client.address:
client_data.append([Paragraph(client.address, normal_style)])
client_city_state_zip = []
if client.city:
client_city_state_zip.append(client.city)
if client.state:
client_city_state_zip.append(client.state)
if client.postal_code:
client_city_state_zip.append(client.postal_code)
if client_city_state_zip:
client_data.append([Paragraph(", ".join(client_city_state_zip), normal_style)])
if client.country:
client_data.append([Paragraph(client.country, normal_style)])
if client.email:
client_data.append([Paragraph(f"Email: {client.email}", normal_style)])
if client.phone:
client_data.append([Paragraph(f"Phone: {client.phone}", normal_style)])
client_table = Table(client_data, colWidths=[3 * inch])
client_table.setStyle(TableStyle([
("VALIGN", (0, 0), (-1, -1), "TOP"),
("TOPPADDING", (0, 0), (-1, -1), 1),
("BOTTOMPADDING", (0, 0), (-1, -1), 1),
]))
# Combine the top tables
top_data = [[date_table, "", ""]]
top_table = Table(top_data, colWidths=[2.5 * inch, 0.5 * inch, 3 * inch])
elements.append(top_table)
elements.append(Spacer(1, 0.25 * inch))
# Combine organization and client tables
address_data = [[org_table, client_table]]
address_table = Table(address_data, colWidths=[3 * inch, 3 * inch])
address_table.setStyle(TableStyle([
("VALIGN", (0, 0), (-1, -1), "TOP"),
]))
elements.append(address_table)
elements.append(Spacer(1, 0.5 * inch))
# Invoice items
elements.append(Paragraph("Invoice Items", subheader_style))
elements.append(Spacer(1, 0.25 * inch))
# Create data for the items table
items_data = [
[
Paragraph("Description", label_style),
Paragraph("Quantity", label_style),
Paragraph("Unit Price", label_style),
Paragraph("Amount", label_style),
]
]
for item in invoice.items:
items_data.append([
Paragraph(item.description, normal_style),
Paragraph(f"{item.quantity:.2f}", normal_style),
Paragraph(f"${item.unit_price:.2f}", normal_style),
Paragraph(f"${item.amount:.2f}", normal_style),
])
# Create the items table
items_table = Table(items_data, colWidths=[3.5 * inch, 1 * inch, 1 * inch, 1 * inch])
items_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.lightgrey),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("ALIGN", (1, 0), (-1, -1), "RIGHT"),
("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
("TOPPADDING", (0, 0), (-1, -1), 6),
("BOTTOMPADDING", (0, 0), (-1, -1), 6),
]))
elements.append(items_table)
elements.append(Spacer(1, 0.25 * inch))
# Summary table (subtotal, tax, discount, total)
summary_data = []
if invoice.subtotal:
summary_data.append([
"",
Paragraph("Subtotal:", label_style),
Paragraph(f"${invoice.subtotal:.2f}", normal_style),
])
if invoice.tax_rate:
summary_data.append([
"",
Paragraph(f"Tax ({invoice.tax_rate:.2f}%):", label_style),
Paragraph(f"${invoice.tax_amount:.2f}", normal_style),
])
if invoice.discount:
summary_data.append([
"",
Paragraph("Discount:", label_style),
Paragraph(f"${invoice.discount:.2f}", normal_style),
])
# Always include total
summary_data.append([
"",
Paragraph("Total:", ParagraphStyle(
"total",
parent=label_style,
fontSize=12,
)),
Paragraph(f"${invoice.total:.2f}", ParagraphStyle(
"total_amount",
parent=normal_style,
fontSize=12,
fontName="Helvetica-Bold",
)),
])
summary_table = Table(summary_data, colWidths=[3.5 * inch, 1.5 * inch, 1.5 * inch])
summary_table.setStyle(TableStyle([
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("ALIGN", (1, 0), (1, -1), "RIGHT"),
("ALIGN", (2, 0), (2, -1), "RIGHT"),
("TOPPADDING", (0, 0), (-1, -1), 3),
("BOTTOMPADDING", (0, 0), (-1, -1), 3),
("LINEABOVE", (1, -1), (2, -1), 1, colors.black),
]))
elements.append(summary_table)
elements.append(Spacer(1, 0.5 * inch))
# Notes
if invoice.notes:
elements.append(Paragraph("Notes:", label_style))
elements.append(Paragraph(invoice.notes, normal_style))
elements.append(Spacer(1, 0.25 * inch))
# Terms
if invoice.terms:
elements.append(Paragraph("Terms and Conditions:", label_style))
elements.append(Paragraph(invoice.terms, normal_style))
# Build the PDF
doc.build(elements)
# Get the PDF content from the buffer
pdf_content = buffer.getvalue()
buffer.close()
return pdf_content

45
main.py Normal file
View File

@ -0,0 +1,45 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version=settings.VERSION,
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
# Root endpoint
@app.get("/", tags=["Root"])
async def root():
return {
"title": settings.PROJECT_NAME,
"description": settings.PROJECT_DESCRIPTION,
"docs": f"{settings.SERVER_HOST}/docs",
"health": f"{settings.SERVER_HOST}/health",
}
# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/__init__.py Normal file
View File

@ -0,0 +1 @@
# Empty init file to make migrations a proper package

83
migrations/env.py Normal file
View File

@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# Import from the application models
from app.db.base import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Key configuration for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,163 @@
"""Initial tables
Revision ID: 001
Revises:
Create Date: 2023-12-01
"""
from alembic import op
import sqlalchemy as sa
import enum
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
class InvoiceStatus(str, enum.Enum):
DRAFT = "draft"
SENT = "sent"
PAID = "paid"
OVERDUE = "overdue"
CANCELLED = "cancelled"
def upgrade():
# Create organization table
op.create_table(
'organization',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(), nullable=False),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('city', sa.String(), nullable=True),
sa.Column('state', sa.String(), nullable=True),
sa.Column('postal_code', sa.String(), nullable=True),
sa.Column('country', sa.String(), nullable=True),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('website', sa.String(), nullable=True),
sa.Column('tax_id', sa.String(), nullable=True),
sa.Column('logo_url', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_organization_id'), 'organization', ['id'], unique=False)
op.create_index(op.f('ix_organization_name'), 'organization', ['name'], unique=False)
# Create user table
op.create_table(
'user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('email', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('organization_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_full_name'), 'user', ['full_name'], unique=False)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
# Create client table
op.create_table(
'client',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('name', sa.String(), nullable=False),
sa.Column('contact_name', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('city', sa.String(), nullable=True),
sa.Column('state', sa.String(), nullable=True),
sa.Column('postal_code', sa.String(), nullable=True),
sa.Column('country', sa.String(), nullable=True),
sa.Column('tax_id', sa.String(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('organization_id', sa.Integer(), nullable=False),
sa.Column('created_by_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['created_by_id'], ['user.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_client_id'), 'client', ['id'], unique=False)
op.create_index(op.f('ix_client_name'), 'client', ['name'], unique=False)
# Create invoice table
op.create_table(
'invoice',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('invoice_number', sa.String(), nullable=False),
sa.Column('status', sa.Enum('draft', 'sent', 'paid', 'overdue', 'cancelled', name='invoicestatus'), nullable=True),
sa.Column('issue_date', sa.Date(), nullable=False),
sa.Column('due_date', sa.Date(), nullable=False),
sa.Column('subtotal', sa.Float(precision=2), nullable=False),
sa.Column('tax_rate', sa.Float(precision=2), nullable=True),
sa.Column('tax_amount', sa.Float(precision=2), nullable=True),
sa.Column('discount', sa.Float(precision=2), nullable=True),
sa.Column('total', sa.Float(precision=2), nullable=False),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('terms', sa.Text(), nullable=True),
sa.Column('is_recurring', sa.Boolean(), nullable=True),
sa.Column('recurring_interval', sa.String(), nullable=True),
sa.Column('client_id', sa.Integer(), nullable=False),
sa.Column('organization_id', sa.Integer(), nullable=False),
sa.Column('created_by_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['client_id'], ['client.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['created_by_id'], ['user.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_invoice_id'), 'invoice', ['id'], unique=False)
op.create_index(op.f('ix_invoice_invoice_number'), 'invoice', ['invoice_number'], unique=False)
# Create invoice_item table
op.create_table(
'invoiceitem',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('description', sa.String(), nullable=False),
sa.Column('quantity', sa.Float(precision=2), nullable=False),
sa.Column('unit_price', sa.Float(precision=2), nullable=False),
sa.Column('amount', sa.Float(precision=2), nullable=False),
sa.Column('invoice_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invoice_id'], ['invoice.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_invoiceitem_id'), 'invoiceitem', ['id'], unique=False)
def downgrade():
# Drop tables in reverse order
op.drop_index(op.f('ix_invoiceitem_id'), table_name='invoiceitem')
op.drop_table('invoiceitem')
op.drop_index(op.f('ix_invoice_invoice_number'), table_name='invoice')
op.drop_index(op.f('ix_invoice_id'), table_name='invoice')
op.drop_table('invoice')
op.drop_index(op.f('ix_client_name'), table_name='client')
op.drop_index(op.f('ix_client_id'), table_name='client')
op.drop_table('client')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_full_name'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')
op.drop_index(op.f('ix_organization_name'), table_name='organization')
op.drop_index(op.f('ix_organization_id'), table_name='organization')
op.drop_table('organization')

15
requirements.txt Normal file
View File

@ -0,0 +1,15 @@
fastapi==0.105.0
uvicorn==0.24.0
sqlalchemy==2.0.23
alembic==1.13.0
pydantic==2.5.2
pydantic-settings==2.1.0
pydantic[email]==2.5.2
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
reportlab==4.0.7
email-validator==2.1.0.post1
ruff==0.1.6
pytest==7.4.3
httpx==0.25.2