import json
import logging
from typing import Dict, List, Optional, Tuple

from solana.rpc.api import Client
from solana.rpc.types import MemcmpOpts

from app.core.config import settings

logger = logging.getLogger(__name__)


class SolanaClient:
    """
    Client for interacting with Solana blockchain

    Every RPC wrapper below treats the response as a mapping that carries
    either an ``"error"`` entry or a ``"result"`` entry, and returns the
    ``"result"`` payload on success.
    """

    # Largest slot span requested from ``get_blocks`` in a single call.
    MAX_SLOT_RANGE = 500
    # RPC error code that ``get_block`` maps to a ``None`` return.
    SLOT_UNAVAILABLE_ERROR_CODE = -32009
    # RPC error code that ``get_blocks`` maps to an empty list.
    INVALID_PARAMS_ERROR_CODE = -32602
    # Minimum number of parsed swaps before a transaction is considered.
    MIN_SWAPS_FOR_ARBITRAGE = 3

    def __init__(self, rpc_url: Optional[str] = None):
        """
        Initialize Solana client with RPC URL

        Args:
            rpc_url: Endpoint to connect to; falls back to
                ``settings.SOLANA_RPC_URL`` when omitted or empty.
        """
        self.rpc_url = rpc_url or settings.SOLANA_RPC_URL
        self.client = Client(self.rpc_url)

    def get_latest_block(self) -> Dict:
        """
        Get the result of the ``get_latest_blockhash`` RPC call

        Returns:
            The ``"result"`` payload of the response.

        Raises:
            ValueError: The response has neither ``"error"`` nor ``"result"``.
            Exception: The response carries an ``"error"`` entry, or the
                underlying client call failed (re-raised after logging).
        """
        try:
            logger.info(f"Requesting latest blockhash from {self.rpc_url}")
            response = self.client.get_latest_blockhash()
            logger.info(f"Got response: {response}")

            if "error" in response:
                error_msg = response.get("error", {})
                logger.error(f"RPC error getting latest block: {error_msg}")
                raise Exception(f"RPC error getting latest block: {error_msg}")

            if "result" not in response:
                logger.error(f"Invalid response format, missing 'result' key: {response}")
                raise ValueError("Invalid response format, missing 'result' key")

            return response["result"]
        except Exception as e:
            logger.error(f"Exception in get_latest_block: {str(e)}")
            raise

    def get_block(self, slot: int) -> Optional[Dict]:
        """
        Get block data by slot number

        Args:
            slot: Slot number of the block to fetch.

        Returns:
            The ``"result"`` payload of the response, or ``None`` when the RPC
            reports error code ``SLOT_UNAVAILABLE_ERROR_CODE``.

        Raises:
            ValueError: The response has neither ``"error"`` nor ``"result"``.
            Exception: Any other RPC error, or a failure in the client call
                (re-raised after logging).
        """
        try:
            logger.info(f"Requesting block data for slot {slot} from {self.rpc_url}")
            response = self.client.get_block(
                slot,
                encoding="json",
                max_supported_transaction_version=0,
                transaction_details="full",
            )
            logger.debug(f"Raw block response: {response}")

            if "error" in response:
                error_msg = response.get("error", {})
                # The error payload is not guaranteed to be a mapping; only
                # look up code/message when it is, so a plain-string error is
                # reported instead of raising AttributeError here.
                if isinstance(error_msg, dict):
                    error_code = error_msg.get("code", "unknown")
                    error_message = error_msg.get("message", "unknown")
                else:
                    error_code = "unknown"
                    error_message = error_msg
                logger.error(f"RPC error getting block {slot}: code={error_code}, message={error_message}")

                # Handle specific error cases
                if error_code == self.SLOT_UNAVAILABLE_ERROR_CODE:  # slot not found or not available
                    logger.warning(f"Block at slot {slot} not found or not available")
                    return None

                raise Exception(f"RPC error getting block {slot}: {error_msg}")

            if "result" not in response:
                logger.error(f"Invalid response format for block {slot}, missing 'result' key: {response}")
                raise ValueError(f"Invalid response format for block {slot}, missing 'result' key")

            return response["result"]
        except Exception as e:
            logger.error(f"Exception in get_block for slot {slot}: {str(e)}")
            raise

    def get_blocks(self, start_slot: int, end_slot: Optional[int] = None) -> List[int]:
        """
        Get a list of confirmed blocks

        Args:
            start_slot: First slot of the requested range.
            end_slot: Last slot of the requested range; when ``None`` the
                current slot is fetched with ``get_slot`` and used instead.

        Returns:
            The ``"result"`` payload of the ``get_blocks`` call, or an empty
            list when the RPC reports ``INVALID_PARAMS_ERROR_CODE``.

        Raises:
            ValueError: A response has neither ``"error"`` nor ``"result"``.
            Exception: Any other RPC error, or a failure in a client call
                (re-raised after logging).
        """
        try:
            # Test against None so an explicit end_slot of 0 is not reported
            # as 'latest'.
            requested_end = "latest" if end_slot is None else end_slot
            logger.info(f"Getting blocks from {start_slot} to {requested_end}")

            if end_slot is None:
                # Get latest slot if end_slot not provided
                logger.info("No end_slot provided, fetching latest slot")
                response = self.client.get_slot()
                logger.debug(f"get_slot response: {response}")

                if "error" in response:
                    error_msg = response.get("error", {})
                    logger.error(f"RPC error getting latest slot: {error_msg}")
                    raise Exception(f"RPC error getting latest slot: {error_msg}")

                if "result" not in response:
                    logger.error(f"Invalid response format, missing 'result' key: {response}")
                    raise ValueError("Invalid response format, missing 'result' key")

                end_slot = response["result"]
                logger.info(f"Using end_slot: {end_slot}")

            logger.info(f"Requesting blocks from {start_slot} to {end_slot}")

            # Check if the range is valid
            if start_slot > end_slot:
                logger.warning(f"Invalid slot range: start={start_slot}, end={end_slot}. Swapping values.")
                start_slot, end_slot = end_slot, start_slot

            # Limit the range to prevent potential issues with large requests.
            # The window is anchored at end_slot, so the earliest slots of an
            # oversized range are the ones dropped.
            if end_slot - start_slot > self.MAX_SLOT_RANGE:
                logger.warning(
                    f"Large slot range requested: {start_slot} to {end_slot}, "
                    f"limiting to {self.MAX_SLOT_RANGE} blocks"
                )
                start_slot = end_slot - self.MAX_SLOT_RANGE

            response = self.client.get_blocks(start_slot, end_slot)
            logger.debug(f"get_blocks response: {response}")

            if "error" in response:
                error_msg = response.get("error", {})
                logger.error(f"RPC error getting blocks from {start_slot} to {end_slot}: {error_msg}")

                # Handle specific error cases
                if isinstance(error_msg, dict) and error_msg.get("code") == self.INVALID_PARAMS_ERROR_CODE:
                    logger.warning("Invalid slot range, trying alternative approach")
                    # No fallback is implemented; an empty list is returned.
                    return []

                raise Exception(f"RPC error getting blocks from {start_slot} to {end_slot}: {error_msg}")

            if "result" not in response:
                logger.error(f"Invalid response format, missing 'result' key: {response}")
                raise ValueError("Invalid response format, missing 'result' key")

            blocks = response["result"]
            logger.info(f"Retrieved {len(blocks)} blocks from {start_slot} to {end_slot}")
            return blocks
        except Exception as e:
            logger.error(f"Exception in get_blocks: {str(e)}")
            raise

    def get_transaction(self, signature: str) -> Optional[Dict]:
        """
        Get transaction details by signature

        Args:
            signature: Signature of the transaction to fetch.

        Returns:
            The ``"result"`` payload of the response, passed through as-is
            (including ``None`` if that is what the response carries).

        Raises:
            ValueError: The response has neither ``"error"`` nor ``"result"``.
            Exception: The response carries an ``"error"`` entry.
        """
        response = self.client.get_transaction(
            signature,
            encoding="json",
            max_supported_transaction_version=0,
        )

        if "error" in response:
            logger.error(f"Error getting transaction {signature}: {response['error']}")
            raise Exception(f"Error getting transaction {signature}: {response['error']}")

        # Same malformed-response check as the block helpers, so callers get a
        # descriptive ValueError rather than a bare KeyError.
        if "result" not in response:
            logger.error(f"Invalid response format, missing 'result' key: {response}")
            raise ValueError("Invalid response format, missing 'result' key")

        return response["result"]

    def get_token_accounts_by_owner(self, owner: str, program_id: Optional[str] = None) -> List[Dict]:
        """
        Get token accounts by owner

        Args:
            owner: Owner whose token accounts are requested.
            program_id: When given, a ``MemcmpOpts`` filter (offset 0, these
                bytes) is added to the request.

        Returns:
            The ``"value"`` entry of the response's ``"result"`` payload.

        Raises:
            ValueError: The response has neither ``"error"`` nor ``"result"``.
            Exception: The response carries an ``"error"`` entry.
        """
        filters = []
        if program_id:
            filters.append(
                MemcmpOpts(
                    offset=0,
                    bytes=program_id,
                )
            )

        # NOTE(review): confirm this call shape (a list of MemcmpOpts as the
        # second positional argument plus an ``encoding`` keyword) against the
        # installed solana-py version's get_token_accounts_by_owner signature.
        response = self.client.get_token_accounts_by_owner(
            owner,
            filters,
            encoding="jsonParsed",
        )

        if "error" in response:
            logger.error(f"Error getting token accounts for {owner}: {response['error']}")
            raise Exception(f"Error getting token accounts for {owner}: {response['error']}")

        # Same malformed-response check as the block helpers, so callers get a
        # descriptive ValueError rather than a bare KeyError.
        if "result" not in response:
            logger.error(f"Invalid response format, missing 'result' key: {response}")
            raise ValueError("Invalid response format, missing 'result' key")

        return response["result"]["value"]

    def analyze_transaction_for_arbitrage(self, tx_data: Dict) -> Tuple[bool, Optional[Dict]]:
        """
        Analyze transaction data for potential arbitrage using instruction-level parsing

        Each parsed instruction whose parser exposes ``extract_swap_info``
        contributes one swap. A transaction with at least
        ``MIN_SWAPS_FOR_ARBITRAGE`` swaps is scored:

        * 0.9 - first input token equals last output token and some token has
          a positive net flow
        * 0.7 - first input token equals last output token, no positive flow
        * 0.5 - some token has a positive net flow, path is not circular

        Args:
            tx_data: Transaction payload; must contain a ``"transaction"`` key.

        Returns:
            A tuple of ``(is_arbitrage, arbitrage_data)``. ``arbitrage_data``
            is ``None`` when ``is_arbitrage`` is ``False``; otherwise a dict
            with ``profit_token_address``, ``profit_amount``, ``path`` (JSON
            string of the token path) and ``confidence_score``.
        """
        # Check if transaction exists and has data
        if not tx_data or "transaction" not in tx_data:
            return False, None

        # Imported at call time rather than at module level, as in the
        # original code.
        from app.parsers.parser_registry import parser_registry

        # Parse all instructions in the transaction
        parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)

        swaps = []
        token_flow = {}  # token_address -> net_amount

        for instruction in parsed_instructions:
            program_id = instruction.get("program_id", "")
            parser = parser_registry.get_parser_for_program(program_id)

            # Only parsers that can describe a swap are of interest.
            if not parser or not hasattr(parser, "extract_swap_info"):
                continue

            # Best effort: a failure on one instruction is logged and the
            # remaining instructions are still analysed.
            try:
                token_in, token_out, amount_in, amount_out = parser.extract_swap_info(instruction)

                if token_in and token_out and amount_in is not None and amount_out is not None:
                    swaps.append({
                        "token_in": token_in,
                        "token_out": token_out,
                        "amount_in": amount_in,
                        "amount_out": amount_out,
                        "program_id": program_id,
                    })

                    # Track net token flow
                    token_flow[token_in] = token_flow.get(token_in, 0) - amount_in
                    token_flow[token_out] = token_flow.get(token_out, 0) + amount_out
            except Exception as e:
                logger.error(f"Error extracting swap info: {str(e)}")

        # No need to continue if we don't have enough swaps
        if len(swaps) < self.MIN_SWAPS_FOR_ARBITRAGE:
            return False, None

        # Flattened path: in/out token of every swap, in instruction order.
        token_path = []
        for swap in swaps:
            token_path.extend((swap["token_in"], swap["token_out"]))

        # Circular pattern (token A -> token B -> token C -> ... -> token A):
        # the path starts and ends on the same token.
        is_circular = swaps[0]["token_in"] == swaps[-1]["token_out"]

        # The profit token is the one with the largest positive net flow; on a
        # tie the first such token in insertion order wins.
        gains = {token: amount for token, amount in token_flow.items() if amount > 0}
        if gains:
            profit_token = max(gains, key=gains.get)
            profit_amount = gains[profit_token]
        else:
            profit_token = None
            profit_amount = 0

        # Determine arbitrage confidence score
        if is_circular and profit_token:
            # High confidence if we have a circular path and profit
            confidence_score = 0.9
        elif is_circular:
            # Medium confidence if we have a circular path but no clear profit
            confidence_score = 0.7
        elif profit_token:
            # Lower confidence if we have profit but no circular path
            confidence_score = 0.5
        else:
            # Not likely an arbitrage
            return False, None

        # Every branch that reaches this point assigned a reportable score.
        return True, {
            "profit_token_address": profit_token or swaps[0]["token_in"],
            # Fallback to placeholder if we can't determine a profit amount.
            "profit_amount": profit_amount or 0.01,
            "path": json.dumps(token_path),
            "confidence_score": confidence_score,
        }