
- Created FastAPI application with transaction ingestion endpoints
- Built dynamic rule engine supporting velocity checks and aggregations
- Implemented real-time and batch screening capabilities
- Added rule management with versioning and rollback functionality
- Created comprehensive audit and reporting endpoints with pagination
- Set up SQLite database with proper migrations using Alembic
- Added intelligent caching for aggregate computations
- Included extensive API documentation and example rule definitions
- Configured CORS, health endpoints, and proper error handling
- Added support for time-windowed aggregations (sum, count, avg, max, min)
- Built background processing for high-volume batch screening
- Implemented field-agnostic rule conditions with flexible operators

Features include transaction ingestion, rule CRUD operations, real-time screening, batch processing, aggregation computations, and comprehensive reporting capabilities suitable for fintech fraud monitoring systems.
224 lines
7.7 KiB
Python
224 lines
7.7 KiB
Python
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import desc
|
|
import json
|
|
|
|
from app.db.session import get_db
|
|
from app.models.transaction import Transaction
|
|
from app.core.schemas import TransactionCreate, TransactionUpdate, Transaction as TransactionSchema, PaginatedResponse
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/", response_model=TransactionSchema)
async def create_transaction(
    transaction: TransactionCreate,
    db: Session = Depends(get_db)
):
    """
    Ingest a new transaction into the system.

    This endpoint accepts transaction data and stores it for fraud screening.
    The transaction will be available for real-time screening immediately.
    """
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes are kept in comments.
    #
    # Flow: reject an already-stored transaction_id with HTTP 400, copy the
    # payload onto a new Transaction row (metadata serialized to JSON text),
    # commit, then return the row as a TransactionSchema with metadata decoded
    # back from JSON.

    # Guard clause: an existing row with the same transaction_id is a duplicate.
    duplicate = (
        db.query(Transaction)
        .filter(Transaction.transaction_id == transaction.transaction_id)
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail="Transaction with this ID already exists")

    # A missing or empty metadata value is stored as None rather than as JSON.
    metadata_json = None
    if transaction.metadata:
        metadata_json = json.dumps(transaction.metadata)

    # Fields copied unchanged from the request payload onto the new row.
    copied_fields = (
        "transaction_id",
        "user_id",
        "account_id",
        "amount",
        "currency",
        "transaction_type",
        "merchant_id",
        "merchant_category",
        "channel",
        "location",
        "ip_address",
        "device_id",
        "status",
    )
    row_values = {name: getattr(transaction, name) for name in copied_fields}

    # NOTE(review): `metadata` is a reserved attribute name on SQLAlchemy
    # declarative models - confirm how the Transaction model maps this column.
    row = Transaction(**row_values, metadata=metadata_json)

    db.add(row)
    db.commit()
    db.refresh(row)

    # The response carries metadata as a decoded object, not the stored JSON text.
    response = TransactionSchema.from_orm(row)
    if row.metadata:
        response.metadata = json.loads(row.metadata)
    return response
|
|
|
|
@router.post("/bulk", response_model=List[TransactionSchema])
async def create_transactions_bulk(
    transactions: List[TransactionCreate],
    db: Session = Depends(get_db)
):
    """
    Ingest multiple transactions in bulk.

    This endpoint is optimized for high-volume transaction ingestion.
    Duplicate transaction IDs (already stored, or repeated within the same
    request) will be skipped.
    """
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes are kept in comments.
    #
    # Parameters:
    #   transactions - request payloads, processed in the order received.
    #   db           - SQLAlchemy session supplied by the get_db dependency.
    # Returns:
    #   One TransactionSchema per row actually created, in request order;
    #   skipped duplicates are not reported.
    # All new rows are written in a single commit at the end.

    created_transactions = []

    # IDs queued earlier in this same request. The database lookup below only
    # sees pending rows if the session autoflushes before querying, so repeats
    # inside one payload are tracked explicitly; the first occurrence wins.
    queued_ids = set()

    for transaction in transactions:
        if transaction.transaction_id in queued_ids:
            continue

        # Skip IDs that already exist in the database.
        existing = (
            db.query(Transaction)
            .filter(Transaction.transaction_id == transaction.transaction_id)
            .first()
        )
        if existing:
            continue

        queued_ids.add(transaction.transaction_id)

        # A missing or empty metadata value is stored as None rather than as JSON.
        metadata_json = json.dumps(transaction.metadata) if transaction.metadata else None

        # NOTE(review): `metadata` is a reserved attribute name on SQLAlchemy
        # declarative models - confirm how the Transaction model maps this column.
        db_transaction = Transaction(
            transaction_id=transaction.transaction_id,
            user_id=transaction.user_id,
            account_id=transaction.account_id,
            amount=transaction.amount,
            currency=transaction.currency,
            transaction_type=transaction.transaction_type,
            merchant_id=transaction.merchant_id,
            merchant_category=transaction.merchant_category,
            channel=transaction.channel,
            location=transaction.location,
            ip_address=transaction.ip_address,
            device_id=transaction.device_id,
            status=transaction.status,
            metadata=metadata_json,
        )

        db.add(db_transaction)
        created_transactions.append(db_transaction)

    db.commit()

    # Refresh each created row and convert it to the response format, decoding
    # the stored JSON metadata back into an object.
    results = []
    for db_transaction in created_transactions:
        db.refresh(db_transaction)
        result = TransactionSchema.from_orm(db_transaction)
        if db_transaction.metadata:
            result.metadata = json.loads(db_transaction.metadata)
        results.append(result)

    return results
|
|
|
|
@router.get("/", response_model=PaginatedResponse)
async def get_transactions(
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000),
    user_id: Optional[str] = None,
    account_id: Optional[str] = None,
    transaction_type: Optional[str] = None,
    channel: Optional[str] = None,
    status: Optional[str] = None,
    min_amount: Optional[float] = None,
    max_amount: Optional[float] = None,
    db: Session = Depends(get_db)
):
    """
    Retrieve transactions with filtering and pagination.

    Supports filtering by various transaction attributes and returns
    paginated results for efficient data retrieval.
    """
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes are kept in comments.

    # Exact-match filters: applied only when the caller supplied a non-empty value.
    equality_filters = (
        (user_id, Transaction.user_id),
        (account_id, Transaction.account_id),
        (transaction_type, Transaction.transaction_type),
        (channel, Transaction.channel),
        (status, Transaction.status),
    )

    query = db.query(Transaction)
    for wanted, column in equality_filters:
        if wanted:
            query = query.filter(column == wanted)

    # Amount bounds are compared against None so that a bound of 0 still applies.
    if min_amount is not None:
        query = query.filter(Transaction.amount >= min_amount)
    if max_amount is not None:
        query = query.filter(Transaction.amount <= max_amount)

    # Count the filtered set before slicing out the requested page.
    total = query.count()

    # Newest first, then skip the rows belonging to earlier pages.
    skip = (page - 1) * page_size
    page_rows = (
        query.order_by(desc(Transaction.created_at))
        .offset(skip)
        .limit(page_size)
        .all()
    )

    # Convert each row to the response schema, decoding stored JSON metadata.
    items = []
    for row in page_rows:
        entry = TransactionSchema.from_orm(row)
        if row.metadata:
            entry.metadata = json.loads(row.metadata)
        items.append(entry)

    # Ceiling division: a partially filled last page still counts as a page.
    page_count = (total + page_size - 1) // page_size

    return PaginatedResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=page_count,
    )
|
|
|
|
@router.get("/{transaction_id}", response_model=TransactionSchema)
async def get_transaction(
    transaction_id: str,
    db: Session = Depends(get_db)
):
    """
    Retrieve a specific transaction by ID.
    """
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes are kept in comments.

    # Look up the first row whose transaction_id matches the path parameter.
    matching = db.query(Transaction).filter(Transaction.transaction_id == transaction_id)
    record = matching.first()
    if not record:
        raise HTTPException(status_code=404, detail="Transaction not found")

    # Metadata is held as JSON text on the row; decode it for the response.
    response = TransactionSchema.from_orm(record)
    if record.metadata:
        response.metadata = json.loads(record.metadata)
    return response
|
|
|
|
@router.patch("/{transaction_id}", response_model=TransactionSchema)
async def update_transaction(
    transaction_id: str,
    transaction_update: TransactionUpdate,
    db: Session = Depends(get_db)
):
    """
    Update a transaction's status or metadata.

    This endpoint allows updating transaction status (e.g., from pending to completed)
    and adding additional metadata without modifying core transaction data.
    """
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes are kept in comments.

    record = (
        db.query(Transaction)
        .filter(Transaction.transaction_id == transaction_id)
        .first()
    )
    if not record:
        raise HTTPException(status_code=404, detail="Transaction not found")

    # Only fields explicitly present in the update (not None) are applied.
    new_status = transaction_update.status
    if new_status is not None:
        record.status = new_status

    extra_metadata = transaction_update.metadata
    if extra_metadata is not None:
        # Merge into the stored metadata: incoming keys overwrite existing ones,
        # keys absent from the update are preserved.
        if record.metadata:
            merged = json.loads(record.metadata)
        else:
            merged = {}
        merged.update(extra_metadata)
        record.metadata = json.dumps(merged)

    db.commit()
    db.refresh(record)

    # Return the updated row with metadata decoded from its stored JSON text.
    response = TransactionSchema.from_orm(record)
    if record.metadata:
        response.metadata = json.loads(record.metadata)
    return response