Complete SaaS invoicing application with FastAPI, authentication, CRUD operations, and PDF generation

This commit is contained in:
Automated Action 2025-06-27 09:21:32 +00:00
parent 256ecb1345
commit c0582f29ca
33 changed files with 1434 additions and 2 deletions

221
README.md
View File

@ -1,3 +1,220 @@
# FastAPI Application
# SaaS Invoicing Application
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive SaaS invoicing application built with FastAPI, SQLAlchemy, and SQLite. This application provides a complete invoicing solution with user authentication, customer management, invoice creation, and PDF generation capabilities.
## Features
- **User Authentication**: JWT-based authentication with registration and login
- **Customer Management**: Full CRUD operations for customer data
- **Invoice Management**: Create, update, and manage invoices with line items
- **PDF Generation**: Generate professional PDF invoices for download
- **RESTful API**: Well-structured REST API with OpenAPI documentation
- **Database Management**: SQLite database with Alembic migrations
- **Security**: Password hashing, JWT tokens, and user-based data isolation
## Technology Stack
- **Backend**: FastAPI
- **Database**: SQLite with SQLAlchemy ORM
- **Authentication**: JWT tokens with python-jose
- **PDF Generation**: ReportLab
- **Migrations**: Alembic
- **Password Hashing**: Passlib with bcrypt
- **Code Quality**: Ruff for linting and formatting
## Installation & Setup
1. **Install Dependencies**:
```bash
pip install -r requirements.txt
```
2. **Set Environment Variables**:
```bash
export SECRET_KEY="your-super-secret-key-here"
```
Required environment variables:
- `SECRET_KEY`: JWT signing secret key (required for production)
3. **Run Database Migrations**:
```bash
alembic upgrade head
```
4. **Start the Application**:
```bash
uvicorn main:app --reload
```
The application will be available at `http://localhost:8000`
## API Documentation
- **Interactive API Docs**: `http://localhost:8000/docs`
- **ReDoc Documentation**: `http://localhost:8000/redoc`
- **OpenAPI Schema**: `http://localhost:8000/openapi.json`
## API Endpoints
### Authentication
- `POST /auth/register` - Register a new user
- `POST /auth/login` - Login and get access token
### Customers
- `GET /customers/` - List all customers (authenticated)
- `POST /customers/` - Create a new customer
- `GET /customers/{id}` - Get customer details
- `PUT /customers/{id}` - Update customer
- `DELETE /customers/{id}` - Delete customer
### Invoices
- `GET /invoices/` - List all invoices (authenticated)
- `POST /invoices/` - Create a new invoice
- `GET /invoices/{id}` - Get invoice details
- `PUT /invoices/{id}` - Update invoice
- `DELETE /invoices/{id}` - Delete invoice
- `GET /invoices/{id}/pdf` - Download invoice as PDF
### Health & Info
- `GET /` - Application info and links
- `GET /health` - Health check endpoint
## Database Schema
### Users
- id, email, full_name, company_name, hashed_password, is_active
- One-to-many relationships with customers and invoices
### Customers
- id, name, email, phone, address, city, country, postal_code
- Belongs to a user
### Invoices
- id, invoice_number, issue_date, due_date, status, amounts, notes
- Belongs to a user and customer
- Has many invoice items
### Invoice Items
- id, description, quantity, unit_price, total_price
- Belongs to an invoice
## Authentication
The API uses JWT (JSON Web Tokens) for authentication:
1. Register a new user or login with existing credentials
2. Include the JWT token in the Authorization header: `Authorization: Bearer <token>`
3. Tokens expire after 30 minutes (configurable)
## Usage Examples
### Register a new user
```bash
curl -X POST "http://localhost:8000/auth/register" \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"password": "secret123",
"full_name": "John Doe",
"company_name": "Acme Corp"
}'
```
### Login
```bash
curl -X POST "http://localhost:8000/auth/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=user@example.com&password=secret123"
```
### Create a customer
```bash
curl -X POST "http://localhost:8000/customers/" \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"name": "Client Company",
"email": "client@example.com",
"address": "123 Main St",
"city": "New York",
"country": "USA"
}'
```
### Create an invoice
```bash
curl -X POST "http://localhost:8000/invoices/" \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"invoice_number": "INV-2024-001",
"issue_date": "2024-01-15T00:00:00Z",
"due_date": "2024-02-15T00:00:00Z",
"customer_id": 1,
"status": "draft",
"subtotal": 1000.00,
"tax_rate": 10.00,
"tax_amount": 100.00,
"total_amount": 1100.00,
"items": [
{
"description": "Web Development Services",
"quantity": 40,
"unit_price": 25.00,
"total_price": 1000.00
}
]
}'
```
## Development
### Code Quality
Run Ruff for linting and formatting:
```bash
ruff check .
ruff format .
```
### Database Migrations
Create a new migration:
```bash
alembic revision --autogenerate -m "description"
```
Apply migrations:
```bash
alembic upgrade head
```
## Project Structure
```
├── main.py # FastAPI application entry point
├── requirements.txt # Python dependencies
├── alembic.ini # Alembic configuration
├── alembic/ # Database migrations
├── app/
│ ├── core/ # Core functionality (auth, config, deps)
│ ├── db/ # Database configuration
│ ├── models/ # SQLAlchemy models
│ ├── routers/ # API route handlers
│ ├── schemas/ # Pydantic models
│ └── services/ # Business logic and CRUD operations
└── /app/storage/ # Application file storage
└── db/ # SQLite database location
```
## Environment Variables
The following environment variables can be configured:
- `SECRET_KEY`: JWT signing secret (required for production)
- `ACCESS_TOKEN_EXPIRE_MINUTES`: Token expiration time (default: 30)
Set these in your production environment for security.
## License
This project is created by BackendIM for SaaS invoicing management.

41
alembic.ini Normal file
View File

@ -0,0 +1,41 @@
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

54
alembic/env.py Normal file
View File

@ -0,0 +1,54 @@
import sys
from logging.config import fileConfig
from pathlib import Path
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
sys.path.append(str(Path(__file__).parent.parent))
from app.db.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

23
alembic/script.py.mako Normal file
View File

@ -0,0 +1,23 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,95 @@
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=False),
sa.Column('company_name', sa.String(), nullable=True),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_table('customers',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('address', sa.String(), nullable=True),
sa.Column('city', sa.String(), nullable=True),
sa.Column('country', sa.String(), nullable=True),
sa.Column('postal_code', sa.String(), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_customers_id'), 'customers', ['id'], unique=False)
op.create_table('invoices',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('invoice_number', sa.String(), nullable=False),
sa.Column('issue_date', sa.DateTime(timezone=True), nullable=False),
sa.Column('due_date', sa.DateTime(timezone=True), nullable=False),
sa.Column('status', sa.String(), nullable=True),
sa.Column('subtotal', sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column('tax_rate', sa.Numeric(precision=5, scale=2), nullable=True),
sa.Column('tax_amount', sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column('total_amount', sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('customer_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['customer_id'], ['customers.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_invoices_id'), 'invoices', ['id'], unique=False)
op.create_index(op.f('ix_invoices_invoice_number'), 'invoices', ['invoice_number'], unique=True)
op.create_table('invoice_items',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('quantity', sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column('unit_price', sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column('total_price', sa.Numeric(precision=10, scale=2), nullable=False),
sa.Column('invoice_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.ForeignKeyConstraint(['invoice_id'], ['invoices.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_invoice_items_id'), 'invoice_items', ['id'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_invoice_items_id'), table_name='invoice_items')
op.drop_table('invoice_items')
op.drop_index(op.f('ix_invoices_invoice_number'), table_name='invoices')
op.drop_index(op.f('ix_invoices_id'), table_name='invoices')
op.drop_table('invoices')
op.drop_index(op.f('ix_customers_id'), table_name='customers')
op.drop_table('customers')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

0
app/__init__.py Normal file
View File

0
app/core/__init__.py Normal file
View File

16
app/core/config.py Normal file
View File

@ -0,0 +1,16 @@
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-here")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
PROJECT_NAME: str = "SaaS Invoicing Application"
API_V1_STR: str = "/api/v1"
class Config:
env_file = ".env"
settings = Settings()

40
app/core/deps.py Normal file
View File

@ -0,0 +1,40 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import models
from app.core.config import settings
from app.db.session import get_db
from app.schemas.token import TokenPayload
security_scheme = HTTPBearer()
def get_current_user(
db: Session = Depends(get_db),
credentials: HTTPAuthorizationCredentials = Depends(security_scheme),
) -> models.User:
try:
payload = jwt.decode(
credentials.credentials, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(models.User).filter(models.User.id == token_data.sub).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

30
app/core/security.py Normal file
View File

@ -0,0 +1,30 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

4
app/db/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .base import Base
from .session import get_db, engine
__all__ = ["Base", "get_db", "engine"]

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

24
app/db/session.py Normal file
View File

@ -0,0 +1,24 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

5
app/models/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .user import User
from .customer import Customer
from .invoice import Invoice, InvoiceItem
__all__ = ["User", "Customer", "Invoice", "InvoiceItem"]

24
app/models/customer.py Normal file
View File

@ -0,0 +1,24 @@
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Customer(Base):
__tablename__ = "customers"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
email = Column(String, nullable=False)
phone = Column(String, nullable=True)
address = Column(String, nullable=True)
city = Column(String, nullable=True)
country = Column(String, nullable=True)
postal_code = Column(String, nullable=True)
user_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
user = relationship("User", back_populates="customers")
invoices = relationship("Invoice", back_populates="customer")

42
app/models/invoice.py Normal file
View File

@ -0,0 +1,42 @@
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Numeric, Text
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Invoice(Base):
__tablename__ = "invoices"
id = Column(Integer, primary_key=True, index=True)
invoice_number = Column(String, unique=True, nullable=False, index=True)
issue_date = Column(DateTime(timezone=True), nullable=False)
due_date = Column(DateTime(timezone=True), nullable=False)
status = Column(String, default="draft") # draft, sent, paid, overdue
subtotal = Column(Numeric(10, 2), default=0)
tax_rate = Column(Numeric(5, 2), default=0)
tax_amount = Column(Numeric(10, 2), default=0)
total_amount = Column(Numeric(10, 2), default=0)
notes = Column(Text, nullable=True)
user_id = Column(Integer, ForeignKey("users.id"))
customer_id = Column(Integer, ForeignKey("customers.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
user = relationship("User", back_populates="invoices")
customer = relationship("Customer", back_populates="invoices")
items = relationship("InvoiceItem", back_populates="invoice", cascade="all, delete-orphan")
class InvoiceItem(Base):
__tablename__ = "invoice_items"
id = Column(Integer, primary_key=True, index=True)
description = Column(String, nullable=False)
quantity = Column(Numeric(10, 2), nullable=False)
unit_price = Column(Numeric(10, 2), nullable=False)
total_price = Column(Numeric(10, 2), nullable=False)
invoice_id = Column(Integer, ForeignKey("invoices.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
invoice = relationship("Invoice", back_populates="items")

21
app/models/user.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
full_name = Column(String, nullable=False)
company_name = Column(String, nullable=True)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
customers = relationship("Customer", back_populates="user")
invoices = relationship("Invoice", back_populates="user")

0
app/routers/__init__.py Normal file
View File

49
app/routers/auth.py Normal file
View File

@ -0,0 +1,49 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import schemas
from app.core import security
from app.core.config import settings
from app.core.deps import get_db
from app.services.user import user_service
router = APIRouter()
@router.post("/register", response_model=schemas.User)
def register(
*,
db: Session = Depends(get_db),
user_in: schemas.UserCreate,
) -> Any:
user = user_service.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = user_service.create(db, obj_in=user_in)
return user
@router.post("/login", response_model=schemas.Token)
def login_for_access_token(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
user = user_service.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not user_service.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}

79
app/routers/customers.py Normal file
View File

@ -0,0 +1,79 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.deps import get_current_active_user, get_db
from app.services.customer import customer_service
router = APIRouter()
@router.get("/", response_model=List[schemas.Customer])
def read_customers(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customers = customer_service.get_multi_by_user(db, user_id=current_user.id, skip=skip, limit=limit)
return customers
@router.post("/", response_model=schemas.Customer)
def create_customer(
*,
db: Session = Depends(get_db),
customer_in: schemas.CustomerCreate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customer = customer_service.create(db=db, obj_in=customer_in, user_id=current_user.id)
return customer
@router.put("/{customer_id}", response_model=schemas.Customer)
def update_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
customer_in: schemas.CustomerUpdate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customer = customer_service.get(db=db, customer_id=customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
if customer.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
customer = customer_service.update(db=db, db_obj=customer, obj_in=customer_in)
return customer
@router.get("/{customer_id}", response_model=schemas.Customer)
def read_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customer = customer_service.get(db=db, customer_id=customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
if customer.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
return customer
@router.delete("/{customer_id}", response_model=schemas.Customer)
def delete_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customer = customer_service.get(db=db, customer_id=customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
if customer.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
customer = customer_service.remove(db=db, customer_id=customer_id)
return customer

118
app/routers/invoices.py Normal file
View File

@ -0,0 +1,118 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.deps import get_current_active_user, get_db
from app.services.invoice import invoice_service
from app.services.customer import customer_service
from app.services.pdf_generator import pdf_generator
router = APIRouter()
@router.get("/", response_model=List[schemas.Invoice])
def read_invoices(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
invoices = invoice_service.get_multi_by_user(db, user_id=current_user.id, skip=skip, limit=limit)
return invoices
@router.post("/", response_model=schemas.Invoice)
def create_invoice(
*,
db: Session = Depends(get_db),
invoice_in: schemas.InvoiceCreate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
customer = customer_service.get(db=db, customer_id=invoice_in.customer_id)
if not customer or customer.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Customer not found or not owned by user")
existing_invoice = invoice_service.get_by_number(db=db, invoice_number=invoice_in.invoice_number)
if existing_invoice:
raise HTTPException(status_code=400, detail="Invoice number already exists")
invoice = invoice_service.create(db=db, obj_in=invoice_in, user_id=current_user.id)
return invoice
@router.put("/{invoice_id}", response_model=schemas.Invoice)
def update_invoice(
*,
db: Session = Depends(get_db),
invoice_id: int,
invoice_in: schemas.InvoiceUpdate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
invoice = invoice_service.get(db=db, invoice_id=invoice_id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if invoice.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
if invoice_in.customer_id:
customer = customer_service.get(db=db, customer_id=invoice_in.customer_id)
if not customer or customer.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Customer not found or not owned by user")
invoice = invoice_service.update(db=db, db_obj=invoice, obj_in=invoice_in)
return invoice
@router.get("/{invoice_id}", response_model=schemas.Invoice)
def read_invoice(
*,
db: Session = Depends(get_db),
invoice_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
invoice = invoice_service.get(db=db, invoice_id=invoice_id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if invoice.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
return invoice
@router.delete("/{invoice_id}", response_model=schemas.Invoice)
def delete_invoice(
*,
db: Session = Depends(get_db),
invoice_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
invoice = invoice_service.get(db=db, invoice_id=invoice_id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if invoice.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
invoice = invoice_service.remove(db=db, invoice_id=invoice_id)
return invoice
@router.get("/{invoice_id}/pdf")
def download_invoice_pdf(
*,
db: Session = Depends(get_db),
invoice_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> StreamingResponse:
invoice = invoice_service.get(db=db, invoice_id=invoice_id)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
if invoice.user_id != current_user.id:
raise HTTPException(status_code=400, detail="Not enough permissions")
pdf_buffer = pdf_generator.generate_invoice_pdf(invoice)
return StreamingResponse(
iter([pdf_buffer.read()]),
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename=invoice_{invoice.invoice_number}.pdf"}
)

11
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,11 @@
from .user import User, UserCreate, UserUpdate
from .customer import Customer, CustomerCreate, CustomerUpdate
from .invoice import Invoice, InvoiceCreate, InvoiceUpdate, InvoiceItem, InvoiceItemCreate
from .token import Token, TokenPayload
__all__ = [
"User", "UserCreate", "UserUpdate",
"Customer", "CustomerCreate", "CustomerUpdate",
"Invoice", "InvoiceCreate", "InvoiceUpdate", "InvoiceItem", "InvoiceItemCreate",
"Token", "TokenPayload"
]

41
app/schemas/customer.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
class CustomerBase(BaseModel):
name: str
email: EmailStr
phone: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
country: Optional[str] = None
postal_code: Optional[str] = None
class CustomerCreate(CustomerBase):
pass
class CustomerUpdate(BaseModel):
name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
country: Optional[str] = None
postal_code: Optional[str] = None
class CustomerInDBBase(CustomerBase):
id: int
user_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Customer(CustomerInDBBase):
pass

81
app/schemas/invoice.py Normal file
View File

@ -0,0 +1,81 @@
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal
class InvoiceItemBase(BaseModel):
description: str
quantity: Decimal
unit_price: Decimal
total_price: Decimal
class InvoiceItemCreate(InvoiceItemBase):
pass
class InvoiceItemUpdate(BaseModel):
description: Optional[str] = None
quantity: Optional[Decimal] = None
unit_price: Optional[Decimal] = None
total_price: Optional[Decimal] = None
class InvoiceItemInDBBase(InvoiceItemBase):
id: int
invoice_id: int
created_at: datetime
class Config:
from_attributes = True
class InvoiceItem(InvoiceItemInDBBase):
pass
class InvoiceBase(BaseModel):
invoice_number: str
issue_date: datetime
due_date: datetime
status: str = "draft"
subtotal: Decimal = Decimal("0.00")
tax_rate: Decimal = Decimal("0.00")
tax_amount: Decimal = Decimal("0.00")
total_amount: Decimal = Decimal("0.00")
notes: Optional[str] = None
class InvoiceCreate(InvoiceBase):
customer_id: int
items: List[InvoiceItemCreate] = []
class InvoiceUpdate(BaseModel):
invoice_number: Optional[str] = None
issue_date: Optional[datetime] = None
due_date: Optional[datetime] = None
status: Optional[str] = None
subtotal: Optional[Decimal] = None
tax_rate: Optional[Decimal] = None
tax_amount: Optional[Decimal] = None
total_amount: Optional[Decimal] = None
notes: Optional[str] = None
customer_id: Optional[int] = None
items: Optional[List[InvoiceItemCreate]] = None
class InvoiceInDBBase(InvoiceBase):
id: int
user_id: int
customer_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Invoice(InvoiceInDBBase):
items: List[InvoiceItem] = []

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

39
app/schemas/user.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
class UserBase(BaseModel):
email: EmailStr
full_name: str
company_name: Optional[str] = None
is_active: bool = True
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
company_name: Optional[str] = None
password: Optional[str] = None
is_active: Optional[bool] = None
class UserInDBBase(UserBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class User(UserInDBBase):
pass
class UserInDB(UserInDBBase):
hashed_password: str

5
app/services/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .user import user_service
from .customer import customer_service
from .invoice import invoice_service
__all__ = ["user_service", "customer_service", "invoice_service"]

39
app/services/customer.py Normal file
View File

@ -0,0 +1,39 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.customer import Customer
from app.schemas.customer import CustomerCreate, CustomerUpdate
class CustomerService:
def get(self, db: Session, customer_id: int) -> Optional[Customer]:
return db.query(Customer).filter(Customer.id == customer_id).first()
def get_multi_by_user(self, db: Session, user_id: int, skip: int = 0, limit: int = 100) -> List[Customer]:
return db.query(Customer).filter(Customer.user_id == user_id).offset(skip).limit(limit).all()
def create(self, db: Session, obj_in: CustomerCreate, user_id: int) -> Customer:
db_obj = Customer(**obj_in.dict(), user_id=user_id)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(self, db: Session, db_obj: Customer, obj_in: CustomerUpdate) -> Customer:
update_data = obj_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, customer_id: int) -> Customer:
obj = db.query(Customer).get(customer_id)
db.delete(obj)
db.commit()
return obj
customer_service = CustomerService()

91
app/services/invoice.py Normal file
View File

@ -0,0 +1,91 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from decimal import Decimal
from app.models.invoice import Invoice, InvoiceItem
from app.schemas.invoice import InvoiceCreate, InvoiceUpdate
class InvoiceService:
def get(self, db: Session, invoice_id: int) -> Optional[Invoice]:
return db.query(Invoice).filter(Invoice.id == invoice_id).first()
def get_by_number(self, db: Session, invoice_number: str) -> Optional[Invoice]:
return db.query(Invoice).filter(Invoice.invoice_number == invoice_number).first()
def get_multi_by_user(self, db: Session, user_id: int, skip: int = 0, limit: int = 100) -> List[Invoice]:
return db.query(Invoice).filter(Invoice.user_id == user_id).offset(skip).limit(limit).all()
def create(self, db: Session, obj_in: InvoiceCreate, user_id: int) -> Invoice:
db_obj = Invoice(
invoice_number=obj_in.invoice_number,
issue_date=obj_in.issue_date,
due_date=obj_in.due_date,
status=obj_in.status,
subtotal=obj_in.subtotal,
tax_rate=obj_in.tax_rate,
tax_amount=obj_in.tax_amount,
total_amount=obj_in.total_amount,
notes=obj_in.notes,
user_id=user_id,
customer_id=obj_in.customer_id,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
for item in obj_in.items:
db_item = InvoiceItem(
description=item.description,
quantity=item.quantity,
unit_price=item.unit_price,
total_price=item.total_price,
invoice_id=db_obj.id,
)
db.add(db_item)
db.commit()
db.refresh(db_obj)
return db_obj
def update(self, db: Session, db_obj: Invoice, obj_in: InvoiceUpdate) -> Invoice:
update_data = obj_in.dict(exclude_unset=True)
if "items" in update_data:
items_data = update_data.pop("items")
db.query(InvoiceItem).filter(InvoiceItem.invoice_id == db_obj.id).delete()
for item in items_data:
db_item = InvoiceItem(
description=item["description"],
quantity=item["quantity"],
unit_price=item["unit_price"],
total_price=item["total_price"],
invoice_id=db_obj.id,
)
db.add(db_item)
for field, value in update_data.items():
setattr(db_obj, field, value)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, invoice_id: int) -> Invoice:
obj = db.query(Invoice).get(invoice_id)
db.delete(obj)
db.commit()
return obj
def calculate_totals(self, subtotal: Decimal, tax_rate: Decimal) -> dict:
tax_amount = subtotal * (tax_rate / 100)
total_amount = subtotal + tax_amount
return {
"tax_amount": tax_amount,
"total_amount": total_amount
}
invoice_service = InvoiceService()

View File

@ -0,0 +1,126 @@
from io import BytesIO
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib import colors
from app.models.invoice import Invoice
class PDFGenerator:
def generate_invoice_pdf(self, invoice: Invoice) -> BytesIO:
buffer = BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=letter)
styles = getSampleStyleSheet()
story = []
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
spaceAfter=30,
alignment=TA_CENTER,
)
header_style = ParagraphStyle(
'Header',
parent=styles['Normal'],
fontSize=12,
spaceAfter=6,
)
story.append(Paragraph("INVOICE", title_style))
story.append(Spacer(1, 20))
invoice_info = [
["Invoice Number:", invoice.invoice_number],
["Issue Date:", invoice.issue_date.strftime("%Y-%m-%d")],
["Due Date:", invoice.due_date.strftime("%Y-%m-%d")],
["Status:", invoice.status.title()],
]
invoice_table = Table(invoice_info, colWidths=[2*inch, 3*inch])
invoice_table.setStyle(TableStyle([
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
]))
story.append(invoice_table)
story.append(Spacer(1, 20))
if invoice.user.company_name:
story.append(Paragraph(f"<b>From:</b> {invoice.user.company_name}", header_style))
story.append(Paragraph(f"<b>Billed by:</b> {invoice.user.full_name}", header_style))
story.append(Paragraph(f"<b>Email:</b> {invoice.user.email}", header_style))
story.append(Spacer(1, 10))
story.append(Paragraph("<b>Bill To:</b>", header_style))
story.append(Paragraph(f"<b>Customer:</b> {invoice.customer.name}", header_style))
story.append(Paragraph(f"<b>Email:</b> {invoice.customer.email}", header_style))
if invoice.customer.address:
story.append(Paragraph(f"<b>Address:</b> {invoice.customer.address}", header_style))
if invoice.customer.city:
story.append(Paragraph(f"<b>City:</b> {invoice.customer.city}", header_style))
if invoice.customer.country:
story.append(Paragraph(f"<b>Country:</b> {invoice.customer.country}", header_style))
story.append(Spacer(1, 20))
if invoice.items:
items_data = [["Description", "Quantity", "Unit Price", "Total"]]
for item in invoice.items:
items_data.append([
item.description,
str(item.quantity),
f"${item.unit_price:.2f}",
f"${item.total_price:.2f}"
])
items_table = Table(items_data, colWidths=[3*inch, 1*inch, 1.5*inch, 1.5*inch])
items_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('ALIGN', (1, 1), (-1, -1), 'RIGHT'),
('ALIGN', (0, 1), (0, -1), 'LEFT'),
]))
story.append(items_table)
story.append(Spacer(1, 20))
totals_data = [
["Subtotal:", f"${invoice.subtotal:.2f}"],
[f"Tax ({invoice.tax_rate}%):", f"${invoice.tax_amount:.2f}"],
["Total:", f"${invoice.total_amount:.2f}"],
]
totals_table = Table(totals_data, colWidths=[4*inch, 2*inch])
totals_table.setStyle(TableStyle([
('ALIGN', (0, 0), (-1, -1), 'RIGHT'),
('FONTNAME', (0, -1), (-1, -1), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
('LINEBELOW', (0, -1), (-1, -1), 2, colors.black),
]))
story.append(totals_table)
if invoice.notes:
story.append(Spacer(1, 20))
story.append(Paragraph("<b>Notes:</b>", header_style))
story.append(Paragraph(invoice.notes, styles['Normal']))
doc.build(story)
buffer.seek(0)
return buffer
pdf_generator = PDFGenerator()

56
app/services/user.py Normal file
View File

@ -0,0 +1,56 @@
from typing import Optional
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class UserService:
def get(self, db: Session, user_id: int) -> Optional[User]:
return db.query(User).filter(User.id == user_id).first()
def get_by_email(self, db: Session, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
company_name=obj_in.company_name,
is_active=obj_in.is_active,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(self, db: Session, db_obj: User, obj_in: UserUpdate) -> User:
update_data = obj_in.dict(exclude_unset=True)
if "password" in update_data:
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
for field, value in update_data.items():
setattr(db_obj, field, value)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def authenticate(self, db: Session, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
user_service = UserService()

36
main.py Normal file
View File

@ -0,0 +1,36 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.routers import auth, customers, invoices
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url="/openapi.json",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router, prefix="/auth", tags=["authentication"])
app.include_router(customers.router, prefix="/customers", tags=["customers"])
app.include_router(invoices.router, prefix="/invoices", tags=["invoices"])
@app.get("/")
async def root():
return {
"title": settings.PROJECT_NAME,
"documentation": "/docs",
"health_check": "/health"
}
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": settings.PROJECT_NAME}

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
alembic==1.13.1
pydantic==2.5.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
reportlab==4.0.7
python-dotenv==1.0.0
ruff==0.1.6