Automated Action 0fc3927871 Build comprehensive SaaS invoicing application with FastAPI
- Implemented complete authentication system with JWT tokens
- Created user management with registration and profile endpoints
- Built client management with full CRUD operations
- Developed invoice system with line items and automatic calculations
- Set up SQLite database with proper migrations using Alembic
- Added health monitoring and API documentation
- Configured CORS for cross-origin requests
- Included comprehensive README with usage examples
2025-06-23 14:56:50 +00:00

144 lines
5.1 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from datetime import datetime
from app.db.session import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.client import Client
from app.models.invoice import Invoice
from app.models.invoice_item import InvoiceItem
from app.schemas.invoice import InvoiceCreate, InvoiceUpdate, InvoiceResponse
# Router object on which the invoice endpoints in this module are registered.
router = APIRouter()
def calculate_invoice_totals(items: List[InvoiceItem], tax_rate: float = 0.0):
    """Compute the monetary totals for a collection of invoice items.

    Args:
        items: Line items; the ``total_price`` of each one is added up.
        tax_rate: Tax rate expressed as a percentage (``25`` means 25%).

    Returns:
        A ``(subtotal, tax_amount, total_amount)`` tuple, where
        ``total_amount`` is the subtotal plus the tax amount.
    """
    running_subtotal = 0
    for line in items:
        running_subtotal = running_subtotal + line.total_price

    # The rate is a percentage, so scale it down before applying it.
    tax_fraction = tax_rate / 100
    tax_due = running_subtotal * tax_fraction
    return running_subtotal, tax_due, running_subtotal + tax_due
@router.post("/", response_model=InvoiceResponse, status_code=status.HTTP_201_CREATED)
async def create_invoice(
    invoice: InvoiceCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create an invoice and its line items for the current user.

    Args:
        invoice: Payload holding the invoice fields plus its ``items``.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The stored invoice, refreshed after commit.

    Raises:
        HTTPException: 404 if the referenced client is not one of the
            current user's clients; 400 if the invoice number is already
            used by any stored invoice.
    """
    # The referenced client has to belong to the caller.
    owned_client = (
        db.query(Client)
        .filter(Client.id == invoice.client_id, Client.owner_id == current_user.id)
        .first()
    )
    if not owned_client:
        raise HTTPException(status_code=404, detail="Client not found")

    # The number is compared against every invoice, not only the caller's.
    duplicate = (
        db.query(Invoice)
        .filter(Invoice.invoice_number == invoice.invoice_number)
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail="Invoice number already exists")

    header_fields = invoice.dict(exclude={'items'})
    db_invoice = Invoice(**header_fields, owner_id=current_user.id)
    db.add(db_invoice)
    # Flush before reading db_invoice.id for the item rows below.
    db.flush()

    line_items = [
        InvoiceItem(
            **entry.dict(),
            invoice_id=db_invoice.id,
            total_price=entry.quantity * entry.unit_price,
        )
        for entry in invoice.items
    ]
    for row in line_items:
        db.add(row)
    db.flush()

    totals = calculate_invoice_totals(line_items, invoice.tax_rate)
    db_invoice.subtotal, db_invoice.tax_amount, db_invoice.total_amount = totals

    db.commit()
    db.refresh(db_invoice)
    return db_invoice
@router.get("/", response_model=List[InvoiceResponse])
async def read_invoices(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one page of the current user's invoices.

    Args:
        skip: Number of matching invoices to skip (used as the OFFSET).
        limit: Maximum number of invoices to return (used as the LIMIT).
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The invoices whose ``owner_id`` is the current user's id, ordered
        by invoice id.
    """
    return (
        db.query(Invoice)
        .filter(Invoice.owner_id == current_user.id)
        # OFFSET/LIMIT without ORDER BY leaves row order up to the database,
        # so consecutive pages could overlap or skip rows.
        .order_by(Invoice.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.get("/{invoice_id}", response_model=InvoiceResponse)
async def read_invoice(
    invoice_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Fetch a single invoice that belongs to the current user.

    Args:
        invoice_id: Id of the invoice to look up.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The matching invoice.

    Raises:
        HTTPException: 404 if no invoice with this id is owned by the
            current user.
    """
    has_id = Invoice.id == invoice_id
    is_owned = Invoice.owner_id == current_user.id
    found = db.query(Invoice).filter(has_id, is_owned).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Invoice not found")
@router.put("/{invoice_id}", response_model=InvoiceResponse)
async def update_invoice(
    invoice_id: int,
    invoice_update: InvoiceUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update an invoice owned by the current user.

    Fields explicitly present in the request (other than ``items``) are
    copied onto the invoice. When ``items`` is provided, the stored line
    items are deleted and replaced by the new ones. Totals are recomputed
    whenever the items or the tax rate change, so ``tax_amount`` and
    ``total_amount`` stay consistent with ``tax_rate``.

    Args:
        invoice_id: Id of the invoice to update.
        invoice_update: Partial update payload.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The updated invoice, refreshed after commit.

    Raises:
        HTTPException: 404 if the invoice is not owned by the current
            user, or if a new ``client_id`` does not refer to one of the
            current user's clients.
    """
    invoice = (
        db.query(Invoice)
        .filter(Invoice.id == invoice_id, Invoice.owner_id == current_user.id)
        .first()
    )
    if invoice is None:
        raise HTTPException(status_code=404, detail="Invoice not found")

    update_data = invoice_update.dict(exclude_unset=True, exclude={'items'})

    # Compare against None so a falsy id such as 0 is still validated
    # instead of being written to the invoice unchecked.
    if invoice_update.client_id is not None:
        client = (
            db.query(Client)
            .filter(Client.id == invoice_update.client_id, Client.owner_id == current_user.id)
            .first()
        )
        if not client:
            raise HTTPException(status_code=404, detail="Client not found")

    for field, value in update_data.items():
        setattr(invoice, field, value)

    # Decide which items (if any) the totals must be recomputed from.
    invoice_items = None
    if invoice_update.items is not None:
        # Replace the stored line items wholesale with the submitted ones.
        db.query(InvoiceItem).filter(InvoiceItem.invoice_id == invoice_id).delete()
        invoice_items = []
        for item_data in invoice_update.items:
            total_price = item_data.quantity * item_data.unit_price
            db_item = InvoiceItem(
                **item_data.dict(),
                invoice_id=invoice.id,
                total_price=total_price
            )
            db.add(db_item)
            invoice_items.append(db_item)
        db.flush()
    elif "tax_rate" in update_data:
        # Only the tax rate changed: recompute from the items already stored,
        # otherwise tax_amount/total_amount would keep reflecting the old rate.
        invoice_items = (
            db.query(InvoiceItem)
            .filter(InvoiceItem.invoice_id == invoice.id)
            .all()
        )

    if invoice_items is not None:
        # A null tax rate is treated as "no tax" rather than raising TypeError.
        tax_rate = invoice.tax_rate if invoice.tax_rate is not None else 0.0
        subtotal, tax_amount, total_amount = calculate_invoice_totals(invoice_items, tax_rate)
        invoice.subtotal = subtotal
        invoice.tax_amount = tax_amount
        invoice.total_amount = total_amount

    # Naive UTC timestamp, set on every successful update.
    invoice.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(invoice)
    return invoice
@router.delete("/{invoice_id}")
async def delete_invoice(
    invoice_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete an invoice that belongs to the current user.

    Args:
        invoice_id: Id of the invoice to delete.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        A dict with a ``message`` key confirming the deletion.

    Raises:
        HTTPException: 404 if no invoice with this id is owned by the
            current user.
    """
    doomed = (
        db.query(Invoice)
        .filter(Invoice.id == invoice_id, Invoice.owner_id == current_user.id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Invoice not found")

    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Invoice deleted successfully"}
    return confirmation