
- Create project structure with FastAPI - Add database models for blocks, transactions, arbitrages, pools, and DEXes - Implement Solana RPC client for fetching blockchain data - Create arbitrage detection algorithm - Implement comprehensive API endpoints for analytics - Set up database migrations with Alembic - Add detailed project documentation generated with BackendIM... (backend.im) Co-Authored-By: Claude <noreply@anthropic.com>
179 lines
7.1 KiB
Python
179 lines
7.1 KiB
Python
import json
|
|
from typing import List, Dict, Any, Optional, Set, Tuple
|
|
from decimal import Decimal
|
|
|
|
from loguru import logger
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import settings
|
|
from app.crud import block, transaction, dex, pool, arbitrage
|
|
from app.models.block import Block
|
|
from app.models.transaction import Transaction
|
|
from app.schemas.arbitrage import ArbitrageCreate, ArbitrageLegCreate
|
|
|
|
|
|
# Common DEX program IDs on Solana
|
|
DEX_PROGRAM_IDS = {
|
|
"jup-ag": "JUP4Fb2cqiRUcaTHdrPC8h2gNsA8fvKXYbXUJJqRUrZP", # Jupiter Aggregator
|
|
"orca": "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc", # Orca Whirlpools
|
|
"raydium": "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", # Raydium
|
|
"serum": "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", # Serum v3
|
|
"aldrin": "CURVGoZn8zycx6FXwwevgBTB2gVvdbGTEpvMJDbgs2t4", # Aldrin
|
|
"saber": "SSwpkEEcbUqx4vtoEByFjSkhKdCT862DNVb52nZg1UZ", # Saber
|
|
"mercurial": "MERLuDFBMmsHnsBPZw2sDQZHvXFMwp8EdjudcU2HKky", # Mercurial
|
|
"lifinity": "EewxydAPCCVuNEyrVN68PuSYdQ7wKn27V9Gjeoi8dy3S", # Lifinity
|
|
"crema": "6MLxLqiXaaSUpkgMnLbse4ConZQZnxLi3VPYSNznE9Ky", # Crema Finance
|
|
"meteora": "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K", # Meteora
|
|
"cykura": "cysPXAjehMpVKUapzbMCCnpFxUFFryEWEaLgnb9NrR8", # Cykura
|
|
}
|
|
|
|
|
|
class ArbitrageDetector:
|
|
"""Service for detecting arbitrage opportunities in Solana transactions."""
|
|
|
|
def __init__(self, db: Session):
|
|
"""Initialize arbitrage detector."""
|
|
self.db = db
|
|
|
|
def detect_arbitrages(self, num_blocks: Optional[int] = None) -> int:
|
|
"""
|
|
Analyze unprocessed blocks to detect arbitrages.
|
|
|
|
Returns the number of blocks processed.
|
|
"""
|
|
# Get unprocessed blocks
|
|
unprocessed_blocks = block.get_unprocessed_blocks(
|
|
self.db, limit=num_blocks or 100
|
|
)
|
|
|
|
if not unprocessed_blocks:
|
|
logger.info("No unprocessed blocks found")
|
|
return 0
|
|
|
|
logger.info(f"Analyzing {len(unprocessed_blocks)} blocks for arbitrages")
|
|
|
|
processed_count = 0
|
|
for blk in unprocessed_blocks:
|
|
try:
|
|
# Get the transactions for this block
|
|
txs = transaction.get_by_block(self.db, block_id=blk.id)
|
|
|
|
# Process each transaction
|
|
for tx in txs:
|
|
self._analyze_transaction(tx)
|
|
|
|
# Mark block as processed
|
|
block.mark_as_processed(self.db, block_id=blk.id)
|
|
processed_count += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing block {blk.block_height}: {str(e)}")
|
|
continue
|
|
|
|
logger.info(f"Processed {processed_count} blocks, detected arbitrages")
|
|
return processed_count
|
|
|
|
def _analyze_transaction(self, tx: Transaction) -> None:
|
|
"""
|
|
Analyze a transaction for potential arbitrage patterns.
|
|
|
|
This method looks for common arbitrage patterns:
|
|
1. Same token in/out (circular trading)
|
|
2. Jupiter/DEX aggregator interactions
|
|
3. Multiple DEX interactions in the same transaction
|
|
"""
|
|
# Skip failed transactions
|
|
if not tx.success:
|
|
return
|
|
|
|
# Skip transactions with no program IDs
|
|
if not tx.program_ids:
|
|
return
|
|
|
|
# Check if there are any DEX program IDs in this transaction
|
|
has_dex_interaction = False
|
|
dex_names = []
|
|
|
|
for program_id in tx.program_ids:
|
|
for dex_name, dex_program in DEX_PROGRAM_IDS.items():
|
|
if program_id == dex_program:
|
|
has_dex_interaction = True
|
|
dex_names.append(dex_name)
|
|
break
|
|
|
|
# Jupiter aggregator is a strong signal for arbitrage
|
|
is_jupiter = any(p == DEX_PROGRAM_IDS["jup-ag"] for p in tx.program_ids)
|
|
|
|
# Multiple different DEXes is another strong signal
|
|
multiple_dexes = len(set(dex_names)) > 1
|
|
|
|
if not (has_dex_interaction and (is_jupiter or multiple_dexes)):
|
|
return
|
|
|
|
# For simplicity in this demonstration, we'll create a simplified arbitrage record
|
|
# In a real system, you would have logic to:
|
|
# 1. Decode the transaction instructions to identify token swaps
|
|
# 2. Track token transfers to identify the full arbitrage path
|
|
# 3. Calculate exact profit amounts using token price and amount data
|
|
|
|
# Create a simplified arbitrage record
|
|
# In a real implementation, this would be based on actual analysis of instructions
|
|
|
|
# Let's assume Jupiter transactions with multiple DEXes are arbitrages
|
|
if is_jupiter and multiple_dexes:
|
|
self._create_simplified_arbitrage(tx, dex_names)
|
|
|
|
def _create_simplified_arbitrage(self, tx: Transaction, dex_names: List[str]) -> None:
|
|
"""
|
|
Create a simplified arbitrage record for demonstration purposes.
|
|
|
|
In a real implementation, this would be replaced with actual analysis of
|
|
the transaction instructions to determine:
|
|
- The exact tokens involved
|
|
- The exact amount in/out
|
|
- The specific pools and swap paths used
|
|
"""
|
|
# For demonstration, we'll create a simulated arbitrage
|
|
# with random values
|
|
|
|
# Default values for demonstration
|
|
initiator = tx.fee_payer
|
|
success = True # We only detect successful arbitrages in this demo
|
|
start_token = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # USDC token address
|
|
start_token_symbol = "USDC"
|
|
|
|
# Simulate profit values
|
|
start_amount = 1000.0 # $1000 USDC
|
|
profit_percentage = 0.5 # 0.5% profit
|
|
end_amount = start_amount * (1 + profit_percentage / 100)
|
|
profit_amount = end_amount - start_amount
|
|
gas_cost = 0.01 # Solana gas cost in SOL
|
|
net_profit = profit_amount - gas_cost
|
|
|
|
# Create a description of the arbitrage path
|
|
route_description = f"USDC → {'→'.join(dex_names)} → USDC"
|
|
|
|
# Create the arbitrage
|
|
arbitrage_in = ArbitrageCreate(
|
|
transaction_id=tx.id,
|
|
initiator_address=initiator,
|
|
start_token_address=start_token,
|
|
start_token_symbol=start_token_symbol,
|
|
start_amount=start_amount,
|
|
end_amount=end_amount,
|
|
profit_amount=profit_amount,
|
|
profit_percentage=profit_percentage,
|
|
success=success,
|
|
gas_cost=gas_cost,
|
|
net_profit=net_profit,
|
|
legs_count=len(dex_names),
|
|
route_description=route_description,
|
|
included_dexes=dex_names
|
|
)
|
|
|
|
# In a real implementation, we would create actual legs with real pool data
|
|
# For now, we'll skip leg creation since we don't have real data
|
|
legs = []
|
|
|
|
# Create the arbitrage record
|
|
arbitrage.create_with_legs(self.db, obj_in=arbitrage_in, legs=legs) |