Transaction Fraud Monitoring API

A comprehensive API-driven transaction monitoring system designed to screen transactions against dynamic fraud rules in real-time. The system supports velocity checks, aggregation computations, rule management, and provides both real-time and batch screening capabilities.

Features

  • Transaction Ingestion: Bulk and individual transaction ingestion
  • Dynamic Rule Engine: Create and manage fraud detection rules with aggregation support
  • Real-time Screening: Instant transaction screening against active rules
  • Batch Processing: Background batch screening for historical data analysis
  • Aggregation Support: Velocity checks, sum, count, average computations over time windows
  • Rule Versioning: Complete audit trail with rule rollback capabilities
  • Caching: Intelligent caching for aggregate computations to improve performance
  • Comprehensive Reporting: Detailed screening results with pagination and filtering

Architecture

Core Components

  1. Rule Engine: Evaluates transactions against configurable rules with support for:

    • Field comparisons (eq, ne, gt, gte, lt, lte, in, contains)
    • Aggregate functions (sum, count, avg, max, min)
    • Time-based windowing (24h, 7d, 30d)
    • Group-by operations (user_id, account_id, device_id)
  2. Caching Layer: Optimizes aggregate computations with intelligent cache expiration

  3. Background Processing: Asynchronous batch screening for high-volume processing

  4. Audit Trail: Complete history of rule changes and screening results

Installation

  1. Install dependencies:
pip install -r requirements.txt
  1. Run database migrations:
alembic upgrade head
  1. Start the application:
uvicorn main:app --host 0.0.0.0 --port 8000

Environment Variables

The following environment variables can be configured:

  • SECRET_KEY: Secret key for API security (default: "your-secret-key-here")
  • DATABASE_URL: Database connection URL (optional, uses SQLite by default)

API Endpoints

Transaction Management

POST /api/v1/transactions/

Ingest a single transaction.

Request Body:

{
  "transaction_id": "txn_12345",
  "user_id": "user_123",
  "account_id": "acc_456",
  "amount": 50000.00,
  "currency": "NGN",
  "transaction_type": "debit",
  "merchant_id": "merchant_789",
  "merchant_category": "grocery",
  "channel": "mobile",
  "location": "Lagos, Nigeria",
  "ip_address": "192.168.1.1",
  "device_id": "device_abc",
  "status": "pending",
  "metadata": {
    "app_version": "2.1.0",
    "user_agent": "Mobile App"
  }
}

POST /api/v1/transactions/bulk

Ingest multiple transactions in bulk.

GET /api/v1/transactions/

Retrieve transactions with filtering and pagination.

Query Parameters:

  • page: Page number (default: 1)
  • page_size: Items per page (default: 100, max: 1000)
  • user_id: Filter by user ID
  • account_id: Filter by account ID
  • transaction_type: Filter by transaction type
  • channel: Filter by channel
  • status: Filter by status
  • min_amount: Minimum amount filter
  • max_amount: Maximum amount filter

Rule Management

POST /api/v1/rules/

Create a new fraud detection rule.

Example Rule for Velocity Check:

{
  "name": "High Velocity Transaction Check",
  "description": "Flag if user has more than 10 transactions > ₦100,000 in 24 hours",
  "rule_type": "velocity",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 100000,
      "aggregate_function": "count",
      "time_window": "24h",
      "group_by": ["user_id"]
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 80,
        "reason": "High velocity detected"
      }
    }
  ],
  "priority": 1,
  "is_active": true
}

Example Rule for Amount Limit:

{
  "name": "Large Single Transaction",
  "description": "Flag transactions over ₦500,000",
  "rule_type": "amount_limit",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 500000
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 90,
        "reason": "Large transaction amount"
      }
    }
  ],
  "priority": 2,
  "is_active": true
}

Example Rule for Sum-based Velocity:

{
  "name": "Daily Spending Limit",
  "description": "Flag if sum of transactions from same user > ₦500,000 within 24 hours",
  "rule_type": "velocity",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 500000,
      "aggregate_function": "sum",
      "time_window": "24h",
      "group_by": ["user_id"]
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 85,
        "reason": "Daily spending limit exceeded"
      }
    }
  ],
  "priority": 1,
  "is_active": true
}

GET /api/v1/rules/

Retrieve all rules with filtering and pagination.

PUT /api/v1/rules/{rule_id}

Update an existing rule (creates new version).

POST /api/v1/rules/{rule_id}/rollback/{version}

Rollback a rule to a previous version.

Screening

POST /api/v1/screening/transactions/{transaction_id}

Screen a single transaction in real-time.

Response:

{
  "transaction_id": "txn_12345",
  "results": [
    {
      "id": 1,
      "transaction_id": "txn_12345",
      "rule_id": 1,
      "rule_name": "High Velocity Transaction Check",
      "rule_version": 1,
      "status": "flagged",
      "risk_score": 80.0,
      "details": {
        "rule_triggered": true,
        "conditions_met": 1,
        "evaluation_time_ms": 45.2,
        "actions": [...]
      },
      "aggregated_data": {
        "value": 12,
        "function": "count",
        "field": "amount",
        "time_window": "24h",
        "group_by": ["user_id"]
      },
      "screening_type": "real_time",
      "created_at": "2024-01-01T12:00:00Z"
    }
  ],
  "overall_status": "flagged",
  "total_risk_score": 80.0,
  "screening_duration_ms": 67.8
}

POST /api/v1/screening/batch

Create a batch screening job.

GET /api/v1/screening/batch

Retrieve batch screening jobs.

GET /api/v1/screening/results

Retrieve screening results with audit capabilities.

Aggregation

POST /api/v1/screening/aggregate

Compute aggregate values with caching.

Request Example:

{
  "aggregate_function": "sum",
  "field": "amount",
  "group_by": ["user_id"],
  "filters": {
    "transaction_type": "debit"
  },
  "time_window": "24h"
}

Rule Definition Format

Rules are defined using a flexible JSON structure:

Condition Fields

  • field: Transaction field to evaluate
  • operator: Comparison operator (eq, ne, gt, gte, lt, lte, in, not_in, contains, starts_with, ends_with)
  • value: Value to compare against
  • aggregate_function: Optional aggregation (sum, count, avg, max, min)
  • time_window: Time window for aggregation (1h, 24h, 7d, 30d)
  • group_by: Fields to group by for aggregation

Action Types

  • flag: Mark transaction as suspicious
  • block: Block transaction (highest risk)
  • alert: Send alert notification
  • score: Assign custom risk score

Time Windows

  • 1h, 2h, 24h: Hours
  • 1d, 7d, 30d: Days
  • 15m, 30m: Minutes

Background Jobs

For high-volume processing, use batch screening:

  1. Create batch job with filters
  2. Monitor job progress via batch endpoints
  3. Retrieve results when completed

Batch jobs run asynchronously to avoid blocking the API.

Performance Considerations

  • Caching: Aggregate computations are cached based on time windows
  • Indexing: Database indexes on key fields (user_id, account_id, device_id, created_at)
  • Background Processing: Batch jobs prevent API blocking
  • Pagination: All list endpoints support pagination

Security

  • CORS enabled for all origins (configure for production)
  • Request validation using Pydantic schemas
  • SQLAlchemy ORM prevents SQL injection
  • Environment variable configuration for secrets

Development

Run the development server:

uvicorn main:app --reload --host 0.0.0.0 --port 8000

Run linting:

ruff check . --fix

API Documentation

Health Check

Database

The system uses SQLite by default with the database file stored at /app/storage/db/db.sqlite. The database includes:

  • transactions: Core transaction data
  • rules: Fraud detection rules with versioning
  • rule_versions: Complete audit trail of rule changes
  • screening_results: Individual screening outcomes
  • screening_batches: Batch job management
  • aggregate_cache: Performance optimization cache

Example Workflows

1. Basic Setup

  1. Create rules using POST /api/v1/rules/
  2. Ingest transactions using POST /api/v1/transactions/
  3. Screen transactions using POST /api/v1/screening/transactions/{id}

2. Batch Processing

  1. Ingest historical transactions
  2. Create batch screening job
  3. Monitor batch progress
  4. Retrieve and analyze results

3. Rule Management

  1. Create initial rules
  2. Monitor screening results
  3. Update rules based on performance
  4. Use versioning for audit compliance
Description
Project: Transaction Fraud Monitoring API
Readme 61 KiB
Languages
Python 99.5%
Mako 0.5%