Automated Action 4730c37915 Implement comprehensive transaction fraud monitoring API
- Created FastAPI application with transaction ingestion endpoints
- Built dynamic rule engine supporting velocity checks and aggregations
- Implemented real-time and batch screening capabilities
- Added rule management with versioning and rollback functionality
- Created comprehensive audit and reporting endpoints with pagination
- Set up SQLite database with proper migrations using Alembic
- Added intelligent caching for aggregate computations
- Included extensive API documentation and example rule definitions
- Configured CORS, health endpoints, and proper error handling
- Added support for time-windowed aggregations (sum, count, avg, max, min)
- Built background processing for high-volume batch screening
- Implemented field-agnostic rule conditions with flexible operators

Features include transaction ingestion, rule CRUD operations, real-time screening,
batch processing, aggregation computations, and comprehensive reporting capabilities
suitable for fintech fraud monitoring systems.
2025-06-27 16:00:48 +00:00

141 lines
7.3 KiB
Python

"""Initial migration
Revision ID: 001
Revises:
Create Date: 2024-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
# `revision` is this migration's own identifier.
revision = '001'
# No parent revision: this is the initial migration (the "Revises:" field in
# the module docstring is empty).
down_revision = None
# No branch labels or cross-revision dependencies are declared here.
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the initial schema.

    Builds six tables -- ``transactions``, ``rules``, ``rule_versions``,
    ``screening_results``, ``screening_batches`` and ``aggregate_cache`` --
    in that order, creating each table's indexes right after the table.
    """

    def id_column():
        # Integer surrogate key; each table declares its own
        # PrimaryKeyConstraint on it.
        return sa.Column('id', sa.Integer(), nullable=False)

    def created_at_column():
        # Creation timestamp whose default is supplied by the database
        # (CURRENT_TIMESTAMP) rather than by the application.
        return sa.Column(
            'created_at',
            sa.DateTime(timezone=True),
            server_default=sa.text('(CURRENT_TIMESTAMP)'),
            nullable=True,
        )

    def index_columns(table_name, *specs):
        # One single-column index per (column, unique) pair, named
        # ix_<table>_<column> and created in the order given.
        for column_name, is_unique in specs:
            op.create_index(
                op.f(f'ix_{table_name}_{column_name}'),
                table_name,
                [column_name],
                unique=is_unique,
            )

    # --- transactions -----------------------------------------------------
    op.create_table(
        'transactions',
        id_column(),
        sa.Column('transaction_id', sa.String(), nullable=False),
        sa.Column('user_id', sa.String(), nullable=False),
        sa.Column('account_id', sa.String(), nullable=False),
        sa.Column('amount', sa.Float(), nullable=False),
        sa.Column('currency', sa.String(), nullable=True),
        sa.Column('transaction_type', sa.String(), nullable=False),
        sa.Column('merchant_id', sa.String(), nullable=True),
        sa.Column('merchant_category', sa.String(), nullable=True),
        sa.Column('channel', sa.String(), nullable=False),
        sa.Column('location', sa.String(), nullable=True),
        sa.Column('ip_address', sa.String(), nullable=True),
        sa.Column('device_id', sa.String(), nullable=True),
        sa.Column('status', sa.String(), nullable=True),
        sa.Column('metadata', sa.Text(), nullable=True),
        created_at_column(),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'transactions',
        ('id', False),
        ('transaction_id', True),
        ('user_id', False),
        ('account_id', False),
        ('merchant_id', False),
        ('device_id', False),
    )

    # --- rules ------------------------------------------------------------
    op.create_table(
        'rules',
        id_column(),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('rule_type', sa.String(), nullable=False),
        sa.Column('conditions', sa.Text(), nullable=False),
        sa.Column('actions', sa.Text(), nullable=False),
        sa.Column('priority', sa.Integer(), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=True),
        sa.Column('version', sa.Integer(), nullable=True),
        sa.Column('created_by', sa.String(), nullable=True),
        created_at_column(),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'rules',
        ('id', False),
        ('name', True),
    )

    # --- rule_versions ----------------------------------------------------
    op.create_table(
        'rule_versions',
        id_column(),
        sa.Column('rule_id', sa.Integer(), nullable=False),
        sa.Column('version', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('rule_type', sa.String(), nullable=False),
        sa.Column('conditions', sa.Text(), nullable=False),
        sa.Column('actions', sa.Text(), nullable=False),
        sa.Column('priority', sa.Integer(), nullable=True),
        sa.Column('created_by', sa.String(), nullable=True),
        created_at_column(),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'rule_versions',
        ('id', False),
        ('rule_id', False),
    )

    # --- screening_results ------------------------------------------------
    op.create_table(
        'screening_results',
        id_column(),
        sa.Column('transaction_id', sa.String(), nullable=False),
        sa.Column('rule_id', sa.Integer(), nullable=False),
        sa.Column('rule_name', sa.String(), nullable=False),
        sa.Column('rule_version', sa.Integer(), nullable=True),
        sa.Column('status', sa.String(), nullable=False),
        sa.Column('risk_score', sa.Float(), nullable=True),
        sa.Column('details', sa.Text(), nullable=True),
        sa.Column('aggregated_data', sa.Text(), nullable=True),
        sa.Column('screening_type', sa.String(), nullable=True),
        created_at_column(),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'screening_results',
        ('id', False),
        ('transaction_id', False),
        ('rule_id', False),
    )

    # --- screening_batches ------------------------------------------------
    op.create_table(
        'screening_batches',
        id_column(),
        sa.Column('batch_id', sa.String(), nullable=False),
        sa.Column('name', sa.String(), nullable=True),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('status', sa.String(), nullable=True),
        sa.Column('total_transactions', sa.Integer(), nullable=True),
        sa.Column('processed_transactions', sa.Integer(), nullable=True),
        sa.Column('flagged_transactions', sa.Integer(), nullable=True),
        sa.Column('rules_applied', sa.Text(), nullable=True),
        sa.Column('started_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('completed_at', sa.DateTime(timezone=True), nullable=True),
        created_at_column(),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'screening_batches',
        ('id', False),
        ('batch_id', True),
    )

    # --- aggregate_cache --------------------------------------------------
    op.create_table(
        'aggregate_cache',
        id_column(),
        sa.Column('cache_key', sa.String(), nullable=False),
        sa.Column('cache_value', sa.Text(), nullable=False),
        sa.Column('expires_at', sa.DateTime(timezone=True), nullable=False),
        created_at_column(),
        sa.PrimaryKeyConstraint('id'),
    )
    index_columns(
        'aggregate_cache',
        ('id', False),
        ('cache_key', True),
    )
def downgrade() -> None:
    """Drop every table created by ``upgrade``, in reverse creation order."""
    tables_newest_first = (
        'aggregate_cache',
        'screening_batches',
        'screening_results',
        'rule_versions',
        'rules',
        'transactions',
    )
    for table_name in tables_newest_first:
        op.drop_table(table_name)