Automated Action 040210f43f Add advanced search and filtering functionality
- Implement date range filtering for creation and due dates
- Add customer name and email search with case-insensitive partial matching
- Support amount range filtering with min and max values
- Enhance sorting with multiple field options
- Update README with comprehensive documentation for all filter options
2025-05-30 08:42:46 +00:00

357 lines
13 KiB
Python

from datetime import datetime, date
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import JSONResponse
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.utils import generate_invoice_number
from app.models.invoice import Invoice, InvoiceItem
from app.schemas.invoice import (
InvoiceCreate,
InvoiceDB,
InvoiceStatusUpdate,
InvoiceUpdate,
InvoiceAdvancedFilter,
invoice_db_filterable,
)
# Router instance that the invoice endpoint decorators in this module attach to.
router = APIRouter()
@router.post("/", response_model=InvoiceDB, status_code=status.HTTP_201_CREATED)
def create_invoice(invoice_data: InvoiceCreate, db: Session = Depends(get_db)):
    """
    Create a new invoice together with its line items.

    The invoice is stored with status "PENDING" and a freshly generated
    invoice number. One InvoiceItem is added per entry in
    ``invoice_data.items`` and the invoice total is the sum of
    ``quantity * unit_price`` over those entries. Everything is committed in
    a single transaction and the refreshed invoice is returned.
    """
    new_invoice = Invoice(
        invoice_number=generate_invoice_number(),
        date_created=datetime.utcnow(),
        due_date=invoice_data.due_date,
        customer_name=invoice_data.customer_name,
        customer_email=invoice_data.customer_email,
        customer_address=invoice_data.customer_address,
        notes=invoice_data.notes,
        status="PENDING",
        total_amount=0,  # placeholder until the line items are summed below
    )
    db.add(new_invoice)
    # Flush (without committing) so the invoice ID is available for the items.
    db.flush()

    # Add one row per requested line item, remembering each line's amount.
    line_totals = []
    for entry in invoice_data.items:
        line_total = entry.quantity * entry.unit_price
        line_totals.append(line_total)
        db.add(
            InvoiceItem(
                invoice_id=new_invoice.id,
                description=entry.description,
                quantity=entry.quantity,
                unit_price=entry.unit_price,
                amount=line_total,
            )
        )

    # The invoice total is the sum of its line amounts (0 when there are none).
    new_invoice.total_amount = sum(line_totals)

    db.commit()
    db.refresh(new_invoice)
    return new_invoice
@router.get("/", response_model=List[InvoiceDB])
def get_invoices(
    skip: int = 0,
    limit: int = 100,
    fields: Optional[str] = None,
    status: Optional[str] = None,
    # Date range parameters
    created_after: Optional[date] = Query(None, description="Filter invoices created on or after this date (YYYY-MM-DD)"),
    created_before: Optional[date] = Query(None, description="Filter invoices created on or before this date (YYYY-MM-DD)"),
    due_after: Optional[date] = Query(None, description="Filter invoices due on or after this date (YYYY-MM-DD)"),
    due_before: Optional[date] = Query(None, description="Filter invoices due on or before this date (YYYY-MM-DD)"),
    # Customer search parameters
    customer_name: Optional[str] = Query(None, description="Filter invoices by customer name (case-insensitive, partial match)"),
    customer_email: Optional[str] = Query(None, description="Filter invoices by customer email (case-insensitive, partial match)"),
    # Amount range parameters
    min_amount: Optional[float] = Query(None, description="Filter invoices with total amount greater than or equal to this value"),
    max_amount: Optional[float] = Query(None, description="Filter invoices with total amount less than or equal to this value"),
    # Sorting parameters
    sort_by: Optional[str] = Query(None, description="Field to sort by (date_created, due_date, total_amount, customer_name)"),
    sort_order: Optional[str] = Query("desc", description="Sort order: 'asc' for ascending, 'desc' for descending"),
    db: Session = Depends(get_db)
):
    """
    Retrieve a list of invoices with advanced filtering and sorting options.

    Parameters:
    - skip: Number of records to skip
    - limit: Maximum number of records to return
    - fields: Comma-separated list of fields to include in the response
      (e.g., "id,invoice_number,total_amount"). If not provided, all fields
      will be returned.
    - status: Filter invoices by status ("PENDING", "PAID", "CANCELLED").
      If not provided, all invoices regardless of status will be returned.

    Date Range Filtering:
    - created_after: Filter invoices created on or after this date (YYYY-MM-DD)
    - created_before: Filter invoices created on or before this date (YYYY-MM-DD)
    - due_after: Filter invoices due on or after this date (YYYY-MM-DD)
    - due_before: Filter invoices due on or before this date (YYYY-MM-DD)

    Customer Filtering:
    - customer_name: Case-insensitive partial match on the customer name
    - customer_email: Case-insensitive partial match on the customer email
      The search text is matched literally: "%" and "_" are not treated as
      SQL wildcards.

    Amount Range Filtering:
    - min_amount: Filter invoices with total amount greater than or equal to this value
    - max_amount: Filter invoices with total amount less than or equal to this value

    Sorting:
    - sort_by: Field to sort by (date_created, due_date, total_amount, customer_name)
    - sort_order: 'asc' for ascending; any other value sorts descending (default: desc)

    Raises:
    - HTTPException 422 if the filter schema rejects the parameters
    - HTTPException 400 if status is not an allowed value or sort_by does not
      name a sortable attribute of Invoice
    """
    # NOTE: inside this function ``status`` is the query parameter, which
    # shadows the imported fastapi ``status`` module, so HTTP codes are
    # written as literals here.

    # Validate filter parameters using the schema. A validation error raised
    # inside the handler would otherwise surface as a 500, so report it as a
    # client error. (Assumes the schema's validation error is a ValueError
    # subclass, as pydantic's ValidationError is -- confirm against the schema.)
    try:
        filter_params = InvoiceAdvancedFilter(
            created_after=created_after,
            created_before=created_before,
            due_after=due_after,
            due_before=due_before,
            customer_name=customer_name,
            customer_email=customer_email,
            min_amount=min_amount,
            max_amount=max_amount,
            sort_by=sort_by,
            sort_order=sort_order,
        )
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    query = db.query(Invoice)

    # Status filter (only applied when a non-empty value is supplied).
    if status:
        allowed_statuses = ["PENDING", "PAID", "CANCELLED"]
        if status not in allowed_statuses:
            raise HTTPException(
                status_code=400,
                detail=f"Status must be one of {', '.join(allowed_statuses)}"
            )
        query = query.filter(Invoice.status == status)

    # Date range filters. Lower bounds start at 00:00:00 of the given day,
    # upper bounds end at 23:59:59.999999 so the whole day is included.
    start_of_day = datetime.min.time()
    end_of_day = datetime.max.time()
    if filter_params.created_after:
        query = query.filter(
            Invoice.date_created >= datetime.combine(filter_params.created_after, start_of_day)
        )
    if filter_params.created_before:
        query = query.filter(
            Invoice.date_created <= datetime.combine(filter_params.created_before, end_of_day)
        )
    if filter_params.due_after:
        query = query.filter(
            Invoice.due_date >= datetime.combine(filter_params.due_after, start_of_day)
        )
    if filter_params.due_before:
        query = query.filter(
            Invoice.due_date <= datetime.combine(filter_params.due_before, end_of_day)
        )

    # Customer name/email filters: case-insensitive partial matches.
    # autoescape=True escapes "%" and "_" in the user-supplied text so they
    # are matched literally instead of acting as LIKE wildcards.
    if filter_params.customer_name:
        query = query.filter(
            func.lower(Invoice.customer_name).contains(
                filter_params.customer_name.lower(), autoescape=True
            )
        )
    if filter_params.customer_email:
        query = query.filter(
            func.lower(Invoice.customer_email).contains(
                filter_params.customer_email.lower(), autoescape=True
            )
        )

    # Amount range filters ("is not None" so that 0 is a usable bound).
    if filter_params.min_amount is not None:
        query = query.filter(Invoice.total_amount >= filter_params.min_amount)
    if filter_params.max_amount is not None:
        query = query.filter(Invoice.total_amount <= filter_params.max_amount)

    # Sorting: default to date_created when sort_by is not provided.
    if filter_params.sort_by:
        # The schema may already restrict sort_by; this guard keeps an
        # unknown or non-sortable name from surfacing as an unhandled
        # AttributeError (HTTP 500).
        sort_field = getattr(Invoice, filter_params.sort_by, None)
        if sort_field is None or not hasattr(sort_field, "asc"):
            raise HTTPException(
                status_code=400,
                detail=f"Cannot sort by '{filter_params.sort_by}'"
            )
    else:
        sort_field = Invoice.date_created
    # Anything other than "asc" falls back to descending order.
    if filter_params.sort_order == "asc":
        query = query.order_by(sort_field.asc())
    else:
        query = query.order_by(sort_field.desc())

    # Apply pagination
    invoices = query.offset(skip).limit(limit).all()

    # If fields parameter is provided, return only the requested fields.
    # The JSONResponse is returned as-is, bypassing the response_model.
    if fields:
        filtered_response = invoice_db_filterable.process_response(invoices, fields)
        return JSONResponse(content=filtered_response)

    # Otherwise, return all fields
    return invoices
# NOTE: "/find" must be registered before "/{invoice_id}". Routes are matched
# in declaration order, so with the parameterised route first a request to
# "/find" is captured by it ("find" fails the int conversion of invoice_id)
# and this endpoint is never reached.
@router.get("/find", response_model=InvoiceDB)
def find_invoice_by_number(
    invoice_number: str,
    fields: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Find an invoice by its invoice number.

    Parameters:
    - invoice_number: The invoice number to search for
    - fields: Comma-separated list of fields to include in the response
      (e.g., "id,invoice_number,total_amount"). If not provided, all fields
      will be returned.

    Raises:
    - HTTPException 404 if no invoice has the given number
    """
    invoice = db.query(Invoice).filter(Invoice.invoice_number == invoice_number).first()
    if invoice is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Invoice with number {invoice_number} not found",
        )

    # If fields parameter is provided, return only the requested fields.
    # The JSONResponse is returned as-is, bypassing the response_model.
    if fields:
        filtered_response = invoice_db_filterable.process_response(invoice, fields)
        return JSONResponse(content=filtered_response)

    # Otherwise, return all fields
    return invoice


# NOTE(review): unlike the other invoice endpoints this route declares no
# response_model; left unchanged here so the response shape stays the same.
@router.get("/{invoice_id}")
def get_invoice(
    invoice_id: int,
    fields: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Retrieve a specific invoice by ID.

    Parameters:
    - invoice_id: ID of the invoice to retrieve
    - fields: Comma-separated list of fields to include in the response
      (e.g., "id,invoice_number,total_amount"). If not provided, all fields
      will be returned.

    Raises:
    - HTTPException 404 if no invoice has the given ID
    """
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if invoice is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Invoice with ID {invoice_id} not found",
        )

    # If fields parameter is provided, return only the requested fields.
    if fields:
        filtered_response = invoice_db_filterable.process_response(invoice, fields)
        return JSONResponse(content=filtered_response)

    # Otherwise, return all fields
    return invoice
@router.patch("/{invoice_id}", response_model=InvoiceDB)
def update_invoice(
    invoice_id: int,
    invoice_update: InvoiceUpdate,
    db: Session = Depends(get_db)
):
    """
    Update an existing invoice.

    Only the fields explicitly set on ``invoice_update`` are written to the
    stored invoice. Responds with 404 when no invoice has the given ID;
    otherwise commits and returns the refreshed invoice.
    """
    target = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Invoice with ID {invoice_id} not found",
        )

    # exclude_unset=True leaves out fields the request did not set, so they
    # are not overwritten on the stored invoice.
    changes = invoice_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
@router.patch("/{invoice_id}/status", response_model=InvoiceDB)
def update_invoice_status(
    invoice_id: int,
    status_update: InvoiceStatusUpdate,
    db: Session = Depends(get_db)
):
    """
    Update the status of an invoice.

    Responds with 404 when no invoice has the given ID; otherwise stores
    ``status_update.status`` on the invoice, commits, and returns the
    refreshed invoice.
    """
    target = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Invoice with ID {invoice_id} not found",
        )

    target.status = status_update.status
    db.commit()
    db.refresh(target)
    return target
@router.delete("/{invoice_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_invoice(invoice_id: int, db: Session = Depends(get_db)):
    """
    Delete an invoice.

    Responds with 404 when no invoice has the given ID; otherwise deletes it,
    commits, and returns nothing (204 No Content).
    """
    doomed = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if doomed is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Invoice with ID {invoice_id} not found",
        )

    db.delete(doomed)
    db.commit()