
- Created FastAPI application with transaction ingestion endpoints - Built dynamic rule engine supporting velocity checks and aggregations - Implemented real-time and batch screening capabilities - Added rule management with versioning and rollback functionality - Created comprehensive audit and reporting endpoints with pagination - Set up SQLite database with proper migrations using Alembic - Added intelligent caching for aggregate computations - Included extensive API documentation and example rule definitions - Configured CORS, health endpoints, and proper error handling - Added support for time-windowed aggregations (sum, count, avg, max, min) - Built background processing for high-volume batch screening - Implemented field-agnostic rule conditions with flexible operators Features include transaction ingestion, rule CRUD operations, real-time screening, batch processing, aggregation computations, and comprehensive reporting capabilities suitable for fintech fraud monitoring systems.
370 lines
9.3 KiB
Markdown
370 lines
9.3 KiB
Markdown
# Transaction Fraud Monitoring API
|
|
|
|
A comprehensive API-driven transaction monitoring system designed to screen transactions against dynamic fraud rules in real-time. The system supports velocity checks, aggregation computations, rule management, and provides both real-time and batch screening capabilities.
|
|
|
|
## Features
|
|
|
|
- **Transaction Ingestion**: Bulk and individual transaction ingestion
|
|
- **Dynamic Rule Engine**: Create and manage fraud detection rules with aggregation support
|
|
- **Real-time Screening**: Instant transaction screening against active rules
|
|
- **Batch Processing**: Background batch screening for historical data analysis
|
|
- **Aggregation Support**: Velocity checks, sum, count, average computations over time windows
|
|
- **Rule Versioning**: Complete audit trail with rule rollback capabilities
|
|
- **Caching**: Intelligent caching for aggregate computations to improve performance
|
|
- **Comprehensive Reporting**: Detailed screening results with pagination and filtering
|
|
|
|
## Architecture
|
|
|
|
### Core Components
|
|
|
|
1. **Rule Engine**: Evaluates transactions against configurable rules with support for:
|
|
- Field comparisons (eq, ne, gt, gte, lt, lte, in, contains)
|
|
- Aggregate functions (sum, count, avg, max, min)
|
|
- Time-based windowing (24h, 7d, 30d)
|
|
- Group-by operations (user_id, account_id, device_id)
|
|
|
|
2. **Caching Layer**: Optimizes aggregate computations with intelligent cache expiration
|
|
|
|
3. **Background Processing**: Asynchronous batch screening for high-volume processing
|
|
|
|
4. **Audit Trail**: Complete history of rule changes and screening results
|
|
|
|
## Installation
|
|
|
|
1. Install dependencies:
|
|
```bash
|
|
pip install -r requirements.txt
|
|
```
|
|
|
|
2. Run database migrations:
|
|
```bash
|
|
alembic upgrade head
|
|
```
|
|
|
|
3. Start the application:
|
|
```bash
|
|
uvicorn main:app --host 0.0.0.0 --port 8000
|
|
```
|
|
|
|
## Environment Variables
|
|
|
|
The following environment variables can be configured:
|
|
|
|
- `SECRET_KEY`: Secret key for API security (default: "your-secret-key-here")
|
|
- `DATABASE_URL`: Database connection URL (optional, uses SQLite by default)
|
|
|
|
## API Endpoints
|
|
|
|
### Transaction Management
|
|
|
|
#### POST /api/v1/transactions/
|
|
Ingest a single transaction.
|
|
|
|
**Request Body:**
|
|
```json
|
|
{
|
|
"transaction_id": "txn_12345",
|
|
"user_id": "user_123",
|
|
"account_id": "acc_456",
|
|
"amount": 50000.00,
|
|
"currency": "NGN",
|
|
"transaction_type": "debit",
|
|
"merchant_id": "merchant_789",
|
|
"merchant_category": "grocery",
|
|
"channel": "mobile",
|
|
"location": "Lagos, Nigeria",
|
|
"ip_address": "192.168.1.1",
|
|
"device_id": "device_abc",
|
|
"status": "pending",
|
|
"metadata": {
|
|
"app_version": "2.1.0",
|
|
"user_agent": "Mobile App"
|
|
}
|
|
}
|
|
```
|
|
|
|
#### POST /api/v1/transactions/bulk
|
|
Ingest multiple transactions in bulk.
|
|
|
|
#### GET /api/v1/transactions/
|
|
Retrieve transactions with filtering and pagination.
|
|
|
|
**Query Parameters:**
|
|
- `page`: Page number (default: 1)
|
|
- `page_size`: Items per page (default: 100, max: 1000)
|
|
- `user_id`: Filter by user ID
|
|
- `account_id`: Filter by account ID
|
|
- `transaction_type`: Filter by transaction type
|
|
- `channel`: Filter by channel
|
|
- `status`: Filter by status
|
|
- `min_amount`: Minimum amount filter
|
|
- `max_amount`: Maximum amount filter
|
|
|
|
### Rule Management
|
|
|
|
#### POST /api/v1/rules/
|
|
Create a new fraud detection rule.
|
|
|
|
**Example Rule for Velocity Check:**
|
|
```json
|
|
{
|
|
"name": "High Velocity Transaction Check",
|
|
"description": "Flag if user has more than 10 transactions > ₦100,000 in 24 hours",
|
|
"rule_type": "velocity",
|
|
"conditions": [
|
|
{
|
|
"field": "amount",
|
|
"operator": "gt",
|
|
"value": 100000,
|
|
"aggregate_function": "count",
|
|
"time_window": "24h",
|
|
"group_by": ["user_id"]
|
|
}
|
|
],
|
|
"actions": [
|
|
{
|
|
"action_type": "flag",
|
|
"parameters": {
|
|
"risk_score": 80,
|
|
"reason": "High velocity detected"
|
|
}
|
|
}
|
|
],
|
|
"priority": 1,
|
|
"is_active": true
|
|
}
|
|
```
|
|
|
|
**Example Rule for Amount Limit:**
|
|
```json
|
|
{
|
|
"name": "Large Single Transaction",
|
|
"description": "Flag transactions over ₦500,000",
|
|
"rule_type": "amount_limit",
|
|
"conditions": [
|
|
{
|
|
"field": "amount",
|
|
"operator": "gt",
|
|
"value": 500000
|
|
}
|
|
],
|
|
"actions": [
|
|
{
|
|
"action_type": "flag",
|
|
"parameters": {
|
|
"risk_score": 90,
|
|
"reason": "Large transaction amount"
|
|
}
|
|
}
|
|
],
|
|
"priority": 2,
|
|
"is_active": true
|
|
}
|
|
```
|
|
|
|
**Example Rule for Sum-based Velocity:**
|
|
```json
|
|
{
|
|
"name": "Daily Spending Limit",
|
|
"description": "Flag if sum of transactions from same user > ₦500,000 within 24 hours",
|
|
"rule_type": "velocity",
|
|
"conditions": [
|
|
{
|
|
"field": "amount",
|
|
"operator": "gt",
|
|
"value": 500000,
|
|
"aggregate_function": "sum",
|
|
"time_window": "24h",
|
|
"group_by": ["user_id"]
|
|
}
|
|
],
|
|
"actions": [
|
|
{
|
|
"action_type": "flag",
|
|
"parameters": {
|
|
"risk_score": 85,
|
|
"reason": "Daily spending limit exceeded"
|
|
}
|
|
}
|
|
],
|
|
"priority": 1,
|
|
"is_active": true
|
|
}
|
|
```
|
|
|
|
#### GET /api/v1/rules/
|
|
Retrieve all rules with filtering and pagination.
|
|
|
|
#### PUT /api/v1/rules/{rule_id}
|
|
Update an existing rule (creates new version).
|
|
|
|
#### POST /api/v1/rules/{rule_id}/rollback/{version}
|
|
Rollback a rule to a previous version.
|
|
|
|
### Screening
|
|
|
|
#### POST /api/v1/screening/transactions/{transaction_id}
|
|
Screen a single transaction in real-time.
|
|
|
|
**Response:**
|
|
```json
|
|
{
|
|
"transaction_id": "txn_12345",
|
|
"results": [
|
|
{
|
|
"id": 1,
|
|
"transaction_id": "txn_12345",
|
|
"rule_id": 1,
|
|
"rule_name": "High Velocity Transaction Check",
|
|
"rule_version": 1,
|
|
"status": "flagged",
|
|
"risk_score": 80.0,
|
|
"details": {
|
|
"rule_triggered": true,
|
|
"conditions_met": 1,
|
|
"evaluation_time_ms": 45.2,
|
|
"actions": [...]
|
|
},
|
|
"aggregated_data": {
|
|
"value": 12,
|
|
"function": "count",
|
|
"field": "amount",
|
|
"time_window": "24h",
|
|
"group_by": ["user_id"]
|
|
},
|
|
"screening_type": "real_time",
|
|
"created_at": "2024-01-01T12:00:00Z"
|
|
}
|
|
],
|
|
"overall_status": "flagged",
|
|
"total_risk_score": 80.0,
|
|
"screening_duration_ms": 67.8
|
|
}
|
|
```
|
|
|
|
#### POST /api/v1/screening/batch
|
|
Create a batch screening job.
|
|
|
|
#### GET /api/v1/screening/batch
|
|
Retrieve batch screening jobs.
|
|
|
|
#### GET /api/v1/screening/results
|
|
Retrieve screening results with audit capabilities.
|
|
|
|
### Aggregation
|
|
|
|
#### POST /api/v1/screening/aggregate
|
|
Compute aggregate values with caching.
|
|
|
|
**Request Example:**
|
|
```json
|
|
{
|
|
"aggregate_function": "sum",
|
|
"field": "amount",
|
|
"group_by": ["user_id"],
|
|
"filters": {
|
|
"transaction_type": "debit"
|
|
},
|
|
"time_window": "24h"
|
|
}
|
|
```
|
|
|
|
## Rule Definition Format
|
|
|
|
Rules are defined using a flexible JSON structure:
|
|
|
|
### Condition Fields
|
|
- `field`: Transaction field to evaluate
|
|
- `operator`: Comparison operator (eq, ne, gt, gte, lt, lte, in, not_in, contains, starts_with, ends_with)
|
|
- `value`: Value to compare against
|
|
- `aggregate_function`: Optional aggregation (sum, count, avg, max, min)
|
|
- `time_window`: Time window for aggregation (1h, 24h, 7d, 30d)
|
|
- `group_by`: Fields to group by for aggregation
|
|
|
|
### Action Types
|
|
- `flag`: Mark transaction as suspicious
|
|
- `block`: Block transaction (highest risk)
|
|
- `alert`: Send alert notification
|
|
- `score`: Assign custom risk score
|
|
|
|
### Time Windows
|
|
- `1h`, `2h`, `24h`: Hours
|
|
- `1d`, `7d`, `30d`: Days
|
|
- `15m`, `30m`: Minutes
|
|
|
|
## Background Jobs
|
|
|
|
For high-volume processing, use batch screening:
|
|
|
|
1. Create batch job with filters
|
|
2. Monitor job progress via batch endpoints
|
|
3. Retrieve results when completed
|
|
|
|
Batch jobs run asynchronously to avoid blocking the API.
|
|
|
|
## Performance Considerations
|
|
|
|
- **Caching**: Aggregate computations are cached based on time windows
|
|
- **Indexing**: Database indexes on key fields (user_id, account_id, device_id, created_at)
|
|
- **Background Processing**: Batch jobs prevent API blocking
|
|
- **Pagination**: All list endpoints support pagination
|
|
|
|
## Security
|
|
|
|
- CORS enabled for all origins (configure for production)
|
|
- Request validation using Pydantic schemas
|
|
- SQLAlchemy ORM prevents SQL injection
|
|
- Environment variable configuration for secrets
|
|
|
|
## Development
|
|
|
|
Run the development server:
|
|
```bash
|
|
uvicorn main:app --reload --host 0.0.0.0 --port 8000
|
|
```
|
|
|
|
Run linting:
|
|
```bash
|
|
ruff check . --fix
|
|
```
|
|
|
|
## API Documentation
|
|
|
|
- Interactive API docs: http://localhost:8000/docs
|
|
- ReDoc documentation: http://localhost:8000/redoc
|
|
- OpenAPI JSON: http://localhost:8000/openapi.json
|
|
|
|
## Health Check
|
|
|
|
- Health endpoint: http://localhost:8000/health
|
|
- Base info: http://localhost:8000/
|
|
|
|
## Database
|
|
|
|
The system uses SQLite by default with the database file stored at `/app/storage/db/db.sqlite`. The database includes:
|
|
|
|
- **transactions**: Core transaction data
|
|
- **rules**: Fraud detection rules with versioning
|
|
- **rule_versions**: Complete audit trail of rule changes
|
|
- **screening_results**: Individual screening outcomes
|
|
- **screening_batches**: Batch job management
|
|
- **aggregate_cache**: Performance optimization cache
|
|
|
|
## Example Workflows
|
|
|
|
### 1. Basic Setup
|
|
1. Create rules using POST /api/v1/rules/
|
|
2. Ingest transactions using POST /api/v1/transactions/
|
|
3. Screen transactions using POST /api/v1/screening/transactions/{id}
|
|
|
|
### 2. Batch Processing
|
|
1. Ingest historical transactions
|
|
2. Create batch screening job
|
|
3. Monitor batch progress
|
|
4. Retrieve and analyze results
|
|
|
|
### 3. Rule Management
|
|
1. Create initial rules
|
|
2. Monitor screening results
|
|
3. Update rules based on performance
|
|
4. Use versioning for audit compliance |