Transaction Fraud Monitoring API
An API-driven transaction monitoring system that screens transactions against dynamic fraud rules in real time. It supports velocity checks, aggregate computations, and rule management, and offers both real-time and batch screening.
Features
- Transaction Ingestion: Bulk and individual transaction ingestion
- Dynamic Rule Engine: Create and manage fraud detection rules with aggregation support
- Real-time Screening: Instant transaction screening against active rules
- Batch Processing: Background batch screening for historical data analysis
- Aggregation Support: Velocity checks, sum, count, average computations over time windows
- Rule Versioning: Complete audit trail with rule rollback capabilities
- Caching: Intelligent caching for aggregate computations to improve performance
- Comprehensive Reporting: Detailed screening results with pagination and filtering
Architecture
Core Components
- Rule Engine: Evaluates transactions against configurable rules with support for:
  - Field comparisons (eq, ne, gt, gte, lt, lte, in, contains)
  - Aggregate functions (sum, count, avg, max, min)
  - Time-based windowing (24h, 7d, 30d)
  - Group-by operations (user_id, account_id, device_id)
- Caching Layer: Optimizes aggregate computations with intelligent cache expiration
- Background Processing: Asynchronous batch screening for high-volume processing
- Audit Trail: Complete history of rule changes and screening results
Installation
- Install dependencies:
pip install -r requirements.txt
- Run database migrations:
alembic upgrade head
- Start the application:
uvicorn main:app --host 0.0.0.0 --port 8000
Environment Variables
The following environment variables can be configured:
- SECRET_KEY: Secret key for API security (default: "your-secret-key-here")
- DATABASE_URL: Database connection URL (optional, uses SQLite by default)
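As an illustration of how these two variables might be read, here is a minimal sketch. The `Settings` class and the `load_settings` and `uses_default_secret` helpers are hypothetical names, not the project's actual code, and the default SQLite URL is an assumption built from the database path given in the Database section.

```python
import os
from dataclasses import dataclass

# Documented placeholder default; it should be overridden outside local development.
DEFAULT_SECRET_KEY = "your-secret-key-here"
# Assumption: SQLAlchemy-style URL for the default SQLite file (absolute path, four slashes).
DEFAULT_DATABASE_URL = "sqlite:////app/storage/db/db.sqlite"


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings container used only for this example."""
    secret_key: str
    database_url: str


def load_settings(env=None) -> Settings:
    """Read configuration from a mapping (os.environ when none is given)."""
    env = os.environ if env is None else env
    return Settings(
        secret_key=env.get("SECRET_KEY", DEFAULT_SECRET_KEY),
        database_url=env.get("DATABASE_URL", DEFAULT_DATABASE_URL),
    )


def uses_default_secret(settings: Settings) -> bool:
    """True when the placeholder secret is still in use."""
    return settings.secret_key == DEFAULT_SECRET_KEY
```

A startup check along the lines of `uses_default_secret(load_settings())` is one way to refuse to run in production with the placeholder key.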
API Endpoints
Transaction Management
POST /api/v1/transactions/
Ingest a single transaction.
Request Body:
{
  "transaction_id": "txn_12345",
  "user_id": "user_123",
  "account_id": "acc_456",
  "amount": 50000.00,
  "currency": "NGN",
  "transaction_type": "debit",
  "merchant_id": "merchant_789",
  "merchant_category": "grocery",
  "channel": "mobile",
  "location": "Lagos, Nigeria",
  "ip_address": "192.168.1.1",
  "device_id": "device_abc",
  "status": "pending",
  "metadata": {
    "app_version": "2.1.0",
    "user_agent": "Mobile App"
  }
}
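A client call for this endpoint could look roughly like the sketch below, which uses only the Python standard library. It builds the request without sending it. The base URL assumes the local server from the Installation steps, `build_ingest_request` is a hypothetical helper, and any authentication headers the deployment may require are not shown because this README does not describe them.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local development server


def build_ingest_request(transaction: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/v1/transactions/."""
    body = json.dumps(transaction).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/transactions/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_ingest_request({
    "transaction_id": "txn_12345",
    "user_id": "user_123",
    "account_id": "acc_456",
    "amount": 50000.00,
    "currency": "NGN",
    "transaction_type": "debit",
    "channel": "mobile",
    "status": "pending",
})

# To actually send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```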
POST /api/v1/transactions/bulk
Ingest multiple transactions in bulk.
GET /api/v1/transactions/
Retrieve transactions with filtering and pagination.
Query Parameters:
- page: Page number (default: 1)
- page_size: Items per page (default: 100, max: 1000)
- user_id: Filter by user ID
- account_id: Filter by account ID
- transaction_type: Filter by transaction type
- channel: Filter by channel
- status: Filter by status
- min_amount: Minimum amount filter
- max_amount: Maximum amount filter
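The parameters above can be combined into a query string as in this sketch. `transactions_query` is a hypothetical client-side helper. Clamping `page_size` to the documented maximum on the client is a design choice made here; how the server reacts to larger values is not stated in this README.

```python
from urllib.parse import urlencode

MAX_PAGE_SIZE = 1000  # documented maximum for page_size


def transactions_query(page: int = 1, page_size: int = 100, **filters) -> str:
    """Return the path and query string for GET /api/v1/transactions/."""
    params = {
        "page": max(page, 1),
        "page_size": min(max(page_size, 1), MAX_PAGE_SIZE),
    }
    # Skip filters left as None so they are not sent as the string "None".
    params.update({key: value for key, value in filters.items() if value is not None})
    return "/api/v1/transactions/?" + urlencode(params)


print(transactions_query(page=2, page_size=5000, user_id="user_123", min_amount=100000))
# prints /api/v1/transactions/?page=2&page_size=1000&user_id=user_123&min_amount=100000
```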
Rule Management
POST /api/v1/rules/
Create a new fraud detection rule.
Example Rule for Velocity Check:
{
  "name": "High Velocity Transaction Check",
  "description": "Flag if user has more than 10 transactions > ₦100,000 in 24 hours",
  "rule_type": "velocity",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 100000,
      "aggregate_function": "count",
      "time_window": "24h",
      "group_by": ["user_id"]
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 80,
        "reason": "High velocity detected"
      }
    }
  ],
  "priority": 1,
  "is_active": true
}
Example Rule for Amount Limit:
{
  "name": "Large Single Transaction",
  "description": "Flag transactions over ₦500,000",
  "rule_type": "amount_limit",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 500000
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 90,
        "reason": "Large transaction amount"
      }
    }
  ],
  "priority": 2,
  "is_active": true
}
Example Rule for Sum-based Velocity:
{
  "name": "Daily Spending Limit",
  "description": "Flag if sum of transactions from same user > ₦500,000 within 24 hours",
  "rule_type": "velocity",
  "conditions": [
    {
      "field": "amount",
      "operator": "gt",
      "value": 500000,
      "aggregate_function": "sum",
      "time_window": "24h",
      "group_by": ["user_id"]
    }
  ],
  "actions": [
    {
      "action_type": "flag",
      "parameters": {
        "risk_score": 85,
        "reason": "Daily spending limit exceeded"
      }
    }
  ],
  "priority": 1,
  "is_active": true
}
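To make the sum-based rule concrete, the sketch below evaluates the "Daily Spending Limit" condition over an in-memory list. It is not the engine's actual implementation. It assumes each transaction carries a `created_at` timestamp, that the transaction being screened counts toward the total, and that the window is inclusive at both ends. `windowed_sum` and `daily_spending_exceeded` are hypothetical names.

```python
from datetime import datetime, timedelta


def windowed_sum(transactions, group_field, group_value, field, now, window):
    """Sum `field` over transactions in the same group within [now - window, now]."""
    cutoff = now - window
    return sum(
        t[field]
        for t in transactions
        if t[group_field] == group_value and cutoff <= t["created_at"] <= now
    )


def daily_spending_exceeded(history, current, limit=500000, window=timedelta(hours=24)):
    """Return (triggered, total) for the current transaction's user."""
    candidates = list(history) + [current]  # assumption: current transaction is included
    total = windowed_sum(
        candidates, "user_id", current["user_id"], "amount", current["created_at"], window
    )
    return total > limit, total


now = datetime(2024, 1, 1, 12, 0, 0)
history = [
    {"user_id": "user_123", "amount": 200000, "created_at": now - timedelta(hours=2)},
    {"user_id": "user_123", "amount": 250000, "created_at": now - timedelta(hours=20)},
    {"user_id": "user_123", "amount": 400000, "created_at": now - timedelta(hours=30)},  # outside window
    {"user_id": "user_999", "amount": 900000, "created_at": now - timedelta(hours=1)},   # other user
]
current = {"user_id": "user_123", "amount": 100000, "created_at": now}

triggered, total = daily_spending_exceeded(history, current)
print(triggered, total)  # prints True 550000
```

Only the two in-window transactions for user_123 plus the current one are summed, which puts the total over the ₦500,000 limit.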
GET /api/v1/rules/
Retrieve all rules with filtering and pagination.
PUT /api/v1/rules/{rule_id}
Update an existing rule (creates a new version).
POST /api/v1/rules/{rule_id}/rollback/{version}
Roll back a rule to a previous version.
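The update and rollback endpoints can be pictured with the in-memory model below. It is a sketch under stated assumptions rather than the service's storage logic: `VersionedRule` is a hypothetical class, and it assumes a rollback is recorded as a new version copied from the target, so the history is never rewritten. The actual service may handle rollback differently.

```python
import copy


class VersionedRule:
    """Hypothetical in-memory model of a rule with an append-only version history."""

    def __init__(self, definition: dict):
        self.versions = [copy.deepcopy(definition)]  # list index 0 holds version 1

    @property
    def current_version(self) -> int:
        return len(self.versions)

    @property
    def current(self) -> dict:
        return self.versions[-1]

    def update(self, changes: dict) -> int:
        """Apply changes as a new version and return its number."""
        self.versions.append({**copy.deepcopy(self.current), **changes})
        return self.current_version

    def rollback(self, version: int) -> int:
        """Re-publish an earlier version as the newest one (assumed behavior)."""
        if not 1 <= version <= self.current_version:
            raise ValueError(f"unknown version: {version}")
        self.versions.append(copy.deepcopy(self.versions[version - 1]))
        return self.current_version


rule = VersionedRule({"name": "Large Single Transaction", "value": 500000})
rule.update({"value": 750000})   # becomes version 2
rule.rollback(1)                 # becomes version 3, identical to version 1
print(rule.current_version, rule.current["value"])  # prints 3 500000
```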
Screening
POST /api/v1/screening/transactions/{transaction_id}
Screen a single transaction in real-time.
Response:
{
  "transaction_id": "txn_12345",
  "results": [
    {
      "id": 1,
      "transaction_id": "txn_12345",
      "rule_id": 1,
      "rule_name": "High Velocity Transaction Check",
      "rule_version": 1,
      "status": "flagged",
      "risk_score": 80.0,
      "details": {
        "rule_triggered": true,
        "conditions_met": 1,
        "evaluation_time_ms": 45.2,
        "actions": [...]
      },
      "aggregated_data": {
        "value": 12,
        "function": "count",
        "field": "amount",
        "time_window": "24h",
        "group_by": ["user_id"]
      },
      "screening_type": "real_time",
      "created_at": "2024-01-01T12:00:00Z"
    }
  ],
  "overall_status": "flagged",
  "total_risk_score": 80.0,
  "screening_duration_ms": 67.8
}
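A client might turn each entry of `results` into a one-line summary, as in the sketch below. It reads only fields that appear in the response above. `describe_result` is a hypothetical helper, and the sample dictionary is a trimmed copy of that response.

```python
def describe_result(result: dict) -> str:
    """Summarize one screening result, including its aggregate when present."""
    line = (
        f'{result["rule_name"]} (v{result["rule_version"]}): '
        f'{result["status"]}, risk {result["risk_score"]}'
    )
    aggregate = result.get("aggregated_data")
    if aggregate:
        groups = ",".join(aggregate["group_by"])
        line += (
            f'; {aggregate["function"]}({aggregate["field"]}) over '
            f'{aggregate["time_window"]} by {groups} = {aggregate["value"]}'
        )
    return line


response = {
    "transaction_id": "txn_12345",
    "results": [
        {
            "rule_name": "High Velocity Transaction Check",
            "rule_version": 1,
            "status": "flagged",
            "risk_score": 80.0,
            "aggregated_data": {
                "value": 12,
                "function": "count",
                "field": "amount",
                "time_window": "24h",
                "group_by": ["user_id"],
            },
        }
    ],
    "overall_status": "flagged",
}

for result in response["results"]:
    print(describe_result(result))
# prints High Velocity Transaction Check (v1): flagged, risk 80.0; count(amount) over 24h by user_id = 12
```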
POST /api/v1/screening/batch
Create a batch screening job.
GET /api/v1/screening/batch
Retrieve batch screening jobs.
GET /api/v1/screening/results
Retrieve screening results with audit capabilities.
Aggregation
POST /api/v1/screening/aggregate
Compute aggregate values with caching.
Request Example:
{
  "aggregate_function": "sum",
  "field": "amount",
  "group_by": ["user_id"],
  "filters": {
    "transaction_type": "debit"
  },
  "time_window": "24h"
}
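One way to cache such requests is to derive a key from the canonical JSON form of the request and keep the value for a fixed time-to-live. The sketch below illustrates that idea only. `cache_key` and `AggregateCache` are hypothetical names, and the actual key layout and expiry policy of the `aggregate_cache` table are not described in this README.

```python
import hashlib
import json
import time


def cache_key(request: dict) -> str:
    """Hash the request with sorted keys so field order does not change the key."""
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class AggregateCache:
    """Hypothetical in-memory cache with a fixed time-to-live per entry."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get_or_compute(self, request: dict, compute):
        """Return a fresh cached value, or call compute() and store its result."""
        key = cache_key(request)
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]
        value = compute()
        self._entries[key] = (now + self._ttl, value)
        return value
```

Passing the clock in as a parameter keeps the expiry logic testable without real waiting. A shorter TTL for shorter time windows would be a natural refinement.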
Rule Definition Format
Rules are defined using a flexible JSON structure:
Condition Fields
- field: Transaction field to evaluate
- operator: Comparison operator (eq, ne, gt, gte, lt, lte, in, not_in, contains, starts_with, ends_with)
- value: Value to compare against
- aggregate_function: Optional aggregation (sum, count, avg, max, min)
- time_window: Time window for aggregation (1h, 24h, 7d, 30d)
- group_by: Fields to group by for aggregation
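For conditions without an aggregate, the listed operators map naturally onto plain Python comparisons. The following is a minimal sketch, not the engine's code. `OPERATORS` and `matches` are hypothetical names, and treating a missing field as a non-match is an assumption.

```python
import operator

# Operator names follow the list above; the implementations are illustrative.
OPERATORS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
    "contains": lambda actual, expected: expected in actual,
    "starts_with": lambda actual, expected: str(actual).startswith(expected),
    "ends_with": lambda actual, expected: str(actual).endswith(expected),
}


def matches(condition: dict, transaction: dict) -> bool:
    """Evaluate one non-aggregate condition against one transaction."""
    field = condition["field"]
    if field not in transaction:
        return False  # assumption: a missing field never matches
    compare = OPERATORS[condition["operator"]]
    return compare(transaction[field], condition["value"])


txn = {"amount": 600000, "channel": "mobile", "location": "Lagos, Nigeria"}
print(matches({"field": "amount", "operator": "gt", "value": 500000}, txn))           # prints True
print(matches({"field": "channel", "operator": "in", "value": ["web", "pos"]}, txn))  # prints False
```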
Action Types
- flag: Mark transaction as suspicious
- block: Block transaction (highest risk)
- alert: Send alert notification
- score: Assign custom risk score
Time Windows
- 1h, 2h, 24h: Hours
- 1d, 7d, 30d: Days
- 15m, 30m: Minutes
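These window strings follow a number-plus-unit pattern, so they can be parsed into durations as in this sketch. `parse_time_window` is a hypothetical helper. Accepting any positive integer with an `m`, `h`, or `d` suffix is an assumption that goes beyond the specific values listed above.

```python
import re
from datetime import timedelta

_UNITS = {"m": "minutes", "h": "hours", "d": "days"}
_PATTERN = re.compile(r"(\d+)([mhd])")


def parse_time_window(window: str) -> timedelta:
    """Convert a window such as '15m', '24h', or '7d' into a timedelta."""
    match = _PATTERN.fullmatch(window)
    if match is None:
        raise ValueError(f"unsupported time window: {window!r}")
    amount, unit = int(match.group(1)), match.group(2)
    if amount == 0:
        raise ValueError("time window must be positive")
    return timedelta(**{_UNITS[unit]: amount})


print(parse_time_window("15m"))                             # prints 0:15:00
print(parse_time_window("24h") == parse_time_window("1d"))  # prints True
```

Under this reading, 24h and 1d describe the same duration.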
Background Jobs
For high-volume processing, use batch screening:
- Create batch job with filters
- Monitor job progress via batch endpoints
- Retrieve results when completed
Batch jobs run asynchronously to avoid blocking the API.
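The monitoring step can be sketched as a polling loop. The function below takes the status lookup as a callable so it stays independent of any HTTP client. The `status` field and the `completed` and `failed` values are assumptions about the batch job representation, and `wait_for_batch` is a hypothetical helper.

```python
import time

TERMINAL_STATUSES = {"completed", "failed"}  # assumed status values


def wait_for_batch(fetch_status, poll_interval=2.0, timeout=300.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until the job reaches a terminal status or time runs out."""
    deadline = clock() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL_STATUSES:
            return job
        if clock() >= deadline:
            raise TimeoutError("batch job did not finish before the timeout")
        sleep(poll_interval)


# Usage with a stand-in for the real lookup (for example a GET on the batch endpoint).
states = iter([{"status": "pending"}, {"status": "running"}, {"status": "completed"}])
job = wait_for_batch(lambda: next(states), poll_interval=0.0)
print(job["status"])  # prints completed
```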
Performance Considerations
- Caching: Aggregate computations are cached based on time windows
- Indexing: Database indexes on key fields (user_id, account_id, device_id, created_at)
- Background Processing: Batch jobs prevent API blocking
- Pagination: All list endpoints support pagination
Security
- CORS is enabled for all origins (restrict the allowed origins before deploying to production)
- Requests are validated using Pydantic schemas
- The SQLAlchemy ORM issues parameterized queries, which guards against SQL injection
- Secrets are configured through environment variables
Development
Run the development server:
uvicorn main:app --reload --host 0.0.0.0 --port 8000
Run linting:
ruff check . --fix
API Documentation
- Interactive API docs: http://localhost:8000/docs
- ReDoc documentation: http://localhost:8000/redoc
- OpenAPI JSON: http://localhost:8000/openapi.json
Health Check
- Health endpoint: http://localhost:8000/health
- Base info: http://localhost:8000/
Database
The system uses SQLite by default, with the database file stored at /app/storage/db/db.sqlite. The database includes:
- transactions: Core transaction data
- rules: Fraud detection rules with versioning
- rule_versions: Complete audit trail of rule changes
- screening_results: Individual screening outcomes
- screening_batches: Batch job management
- aggregate_cache: Performance optimization cache
Example Workflows
1. Basic Setup
- Create rules using POST /api/v1/rules/
- Ingest transactions using POST /api/v1/transactions/
- Screen transactions using POST /api/v1/screening/transactions/{transaction_id}
2. Batch Processing
- Ingest historical transactions
- Create batch screening job
- Monitor batch progress
- Retrieve and analyze results
3. Rule Management
- Create initial rules
- Monitor screening results
- Update rules based on performance
- Use versioning for audit compliance