import json from typing import List, Dict, Any, Optional, Set, Tuple from decimal import Decimal from loguru import logger from sqlalchemy.orm import Session from app.core.config import settings from app.crud import block, transaction, dex, pool, arbitrage from app.models.block import Block from app.models.transaction import Transaction from app.schemas.arbitrage import ArbitrageCreate, ArbitrageLegCreate # Common DEX program IDs on Solana DEX_PROGRAM_IDS = { "jup-ag": "JUP4Fb2cqiRUcaTHdrPC8h2gNsA8fvKXYbXUJJqRUrZP", # Jupiter Aggregator "orca": "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc", # Orca Whirlpools "raydium": "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", # Raydium "serum": "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", # Serum v3 "aldrin": "CURVGoZn8zycx6FXwwevgBTB2gVvdbGTEpvMJDbgs2t4", # Aldrin "saber": "SSwpkEEcbUqx4vtoEByFjSkhKdCT862DNVb52nZg1UZ", # Saber "mercurial": "MERLuDFBMmsHnsBPZw2sDQZHvXFMwp8EdjudcU2HKky", # Mercurial "lifinity": "EewxydAPCCVuNEyrVN68PuSYdQ7wKn27V9Gjeoi8dy3S", # Lifinity "crema": "6MLxLqiXaaSUpkgMnLbse4ConZQZnxLi3VPYSNznE9Ky", # Crema Finance "meteora": "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K", # Meteora "cykura": "cysPXAjehMpVKUapzbMCCnpFxUFFryEWEaLgnb9NrR8", # Cykura } class ArbitrageDetector: """Service for detecting arbitrage opportunities in Solana transactions.""" def __init__(self, db: Session): """Initialize arbitrage detector.""" self.db = db def detect_arbitrages(self, num_blocks: Optional[int] = None) -> int: """ Analyze unprocessed blocks to detect arbitrages. Returns the number of blocks processed. """ # Get unprocessed blocks unprocessed_blocks = block.get_unprocessed_blocks( self.db, limit=num_blocks or 100 ) if not unprocessed_blocks: logger.info("No unprocessed blocks found") return 0 logger.info(f"Analyzing {len(unprocessed_blocks)} blocks for arbitrages") processed_count = 0 for blk in unprocessed_blocks: try: # Get the transactions for this block txs = transaction.get_by_block(self.db, block_id=blk.id) # Process each transaction for tx in txs: self._analyze_transaction(tx) # Mark block as processed block.mark_as_processed(self.db, block_id=blk.id) processed_count += 1 except Exception as e: logger.error(f"Error processing block {blk.block_height}: {str(e)}") continue logger.info(f"Processed {processed_count} blocks, detected arbitrages") return processed_count def _analyze_transaction(self, tx: Transaction) -> None: """ Analyze a transaction for potential arbitrage patterns. This method looks for common arbitrage patterns: 1. Same token in/out (circular trading) 2. Jupiter/DEX aggregator interactions 3. Multiple DEX interactions in the same transaction """ # Skip failed transactions if not tx.success: return # Skip transactions with no program IDs if not tx.program_ids: return # Check if there are any DEX program IDs in this transaction has_dex_interaction = False dex_names = [] for program_id in tx.program_ids: for dex_name, dex_program in DEX_PROGRAM_IDS.items(): if program_id == dex_program: has_dex_interaction = True dex_names.append(dex_name) break # Jupiter aggregator is a strong signal for arbitrage is_jupiter = any(p == DEX_PROGRAM_IDS["jup-ag"] for p in tx.program_ids) # Multiple different DEXes is another strong signal multiple_dexes = len(set(dex_names)) > 1 if not (has_dex_interaction and (is_jupiter or multiple_dexes)): return # For simplicity in this demonstration, we'll create a simplified arbitrage record # In a real system, you would have logic to: # 1. Decode the transaction instructions to identify token swaps # 2. Track token transfers to identify the full arbitrage path # 3. Calculate exact profit amounts using token price and amount data # Create a simplified arbitrage record # In a real implementation, this would be based on actual analysis of instructions # Let's assume Jupiter transactions with multiple DEXes are arbitrages if is_jupiter and multiple_dexes: self._create_simplified_arbitrage(tx, dex_names) def _create_simplified_arbitrage(self, tx: Transaction, dex_names: List[str]) -> None: """ Create a simplified arbitrage record for demonstration purposes. In a real implementation, this would be replaced with actual analysis of the transaction instructions to determine: - The exact tokens involved - The exact amount in/out - The specific pools and swap paths used """ # For demonstration, we'll create a simulated arbitrage # with random values # Default values for demonstration initiator = tx.fee_payer success = True # We only detect successful arbitrages in this demo start_token = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # USDC token address start_token_symbol = "USDC" # Simulate profit values start_amount = 1000.0 # $1000 USDC profit_percentage = 0.5 # 0.5% profit end_amount = start_amount * (1 + profit_percentage / 100) profit_amount = end_amount - start_amount gas_cost = 0.01 # Solana gas cost in SOL net_profit = profit_amount - gas_cost # Create a description of the arbitrage path route_description = f"USDC → {'→'.join(dex_names)} → USDC" # Create the arbitrage arbitrage_in = ArbitrageCreate( transaction_id=tx.id, initiator_address=initiator, start_token_address=start_token, start_token_symbol=start_token_symbol, start_amount=start_amount, end_amount=end_amount, profit_amount=profit_amount, profit_percentage=profit_percentage, success=success, gas_cost=gas_cost, net_profit=net_profit, legs_count=len(dex_names), route_description=route_description, included_dexes=dex_names ) # In a real implementation, we would create actual legs with real pool data # For now, we'll skip leg creation since we don't have real data legs = [] # Create the arbitrage record arbitrage.create_with_legs(self.db, obj_in=arbitrage_in, legs=legs)