From cae8527a4bd2fff050d078c29ecd034affdeb9a8 Mon Sep 17 00:00:00 2001 From: Automated Action Date: Wed, 28 May 2025 22:44:22 +0000 Subject: [PATCH] s --- app/models/models.py | 44 ++++- app/parsers/__init__.py | 1 + app/parsers/base.py | 110 +++++++++++ app/parsers/jupiter.py | 93 +++++++++ app/parsers/parser_registry.py | 139 +++++++++++++ app/parsers/raydium.py | 174 ++++++++++++++++ app/parsers/serum_dex.py | 164 +++++++++++++++ app/parsers/token_program.py | 111 +++++++++++ app/schemas/schemas.py | 52 ++++- app/services/scanner.py | 187 ++++++++++++++++-- app/services/solana_client.py | 127 ++++++++---- migrations/versions/add_instruction_tables.py | 89 +++++++++ 12 files changed, 1237 insertions(+), 54 deletions(-) create mode 100644 app/parsers/__init__.py create mode 100644 app/parsers/base.py create mode 100644 app/parsers/jupiter.py create mode 100644 app/parsers/parser_registry.py create mode 100644 app/parsers/raydium.py create mode 100644 app/parsers/serum_dex.py create mode 100644 app/parsers/token_program.py create mode 100644 migrations/versions/add_instruction_tables.py diff --git a/app/models/models.py b/app/models/models.py index 062a82b..7eb477b 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -1,7 +1,7 @@ from datetime import datetime from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer, - String, Text) + String, Text, JSON) from sqlalchemy.orm import relationship from app.db.session import Base @@ -32,8 +32,25 @@ class Transaction(Base): raw_data = Column(Text) block = relationship("Block", back_populates="transactions") - token_transfers = relationship("TokenTransfer", back_populates="transaction") - arbitrage_events = relationship("ArbitrageEvent", back_populates="transaction") + instructions = relationship("Instruction", back_populates="transaction", cascade="all, delete-orphan") + token_transfers = relationship("TokenTransfer", back_populates="transaction", cascade="all, delete-orphan") + arbitrage_events = relationship("ArbitrageEvent", back_populates="transaction", cascade="all, delete-orphan") + + +class Instruction(Base): + __tablename__ = "instructions" + + id = Column(Integer, primary_key=True, index=True) + transaction_id = Column(Integer, ForeignKey("transactions.id")) + program_id = Column(String, index=True) + instruction_type = Column(String, index=True) + instruction_index = Column(Integer, nullable=True) + accounts = Column(Text, nullable=True) # JSON string of account pubkeys + data = Column(Text, nullable=True) # Raw instruction data + parsed_data = Column(JSON, nullable=True) # Parsed instruction data + + transaction = relationship("Transaction", back_populates="instructions") + swaps = relationship("Swap", back_populates="instruction", cascade="all, delete-orphan") class TokenTransfer(Base): @@ -41,6 +58,7 @@ class TokenTransfer(Base): id = Column(Integer, primary_key=True, index=True) transaction_id = Column(Integer, ForeignKey("transactions.id")) + instruction_id = Column(Integer, ForeignKey("instructions.id"), nullable=True) token_address = Column(String, index=True) from_address = Column(String, index=True) to_address = Column(String, index=True) @@ -49,6 +67,25 @@ class TokenTransfer(Base): timestamp = Column(DateTime, default=datetime.utcnow) transaction = relationship("Transaction", back_populates="token_transfers") + instruction = relationship("Instruction", backref="token_transfers") + + +class Swap(Base): + __tablename__ = "swaps" + + id = Column(Integer, primary_key=True, index=True) + transaction_id = Column(Integer, ForeignKey("transactions.id")) + instruction_id = Column(Integer, ForeignKey("instructions.id")) + program_id = Column(String, index=True) + token_in_address = Column(String, index=True) + token_out_address = Column(String, index=True) + amount_in = Column(Float) + amount_out = Column(Float) + user_account = Column(String, index=True) + timestamp = Column(DateTime, default=datetime.utcnow) + + transaction = relationship("Transaction", backref="swaps") + instruction = relationship("Instruction", back_populates="swaps") class ArbitrageEvent(Base): @@ -60,6 +97,7 @@ class ArbitrageEvent(Base): profit_amount = Column(Float) profit_usd = Column(Float, nullable=True) path = Column(Text) # JSON string of token paths + swap_sequence = Column(Text, nullable=True) # JSON string of swap IDs confidence_score = Column(Float) # 0.0 to 1.0 detected_at = Column(DateTime, default=datetime.utcnow) diff --git a/app/parsers/__init__.py b/app/parsers/__init__.py new file mode 100644 index 0000000..be7afd8 --- /dev/null +++ b/app/parsers/__init__.py @@ -0,0 +1 @@ +# Instruction parsers package \ No newline at end of file diff --git a/app/parsers/base.py b/app/parsers/base.py new file mode 100644 index 0000000..9fc91d5 --- /dev/null +++ b/app/parsers/base.py @@ -0,0 +1,110 @@ +""" +Base interfaces for instruction parsers +""" +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Tuple, Any + + +class InstructionParser(ABC): + """ + Base class for all instruction parsers + """ + + @property + @abstractmethod + def program_id(self) -> str: + """ + Return the program ID that this parser handles + """ + pass + + @abstractmethod + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a single instruction from transaction data + + Args: + instruction: The instruction object from transaction data + accounts: List of account addresses in the transaction + instruction_data: Raw instruction data bytes + + Returns: + Dict with parsed instruction information + """ + pass + + def can_handle(self, program_id: str) -> bool: + """ + Check if this parser can handle instructions from a given program + + Args: + program_id: Program ID to check + + Returns: + True if this parser can handle the program, False otherwise + """ + return program_id == self.program_id + + +class TransferParser(InstructionParser): + """ + Base class for transfer instruction parsers + """ + + @abstractmethod + def extract_transfer_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[float]]: + """ + Extract transfer information from a parsed instruction + + Args: + parsed_instruction: Instruction data parsed by parse_instruction + + Returns: + Tuple of (token_address, from_address, to_address, amount) + Any of these values can be None if not applicable + """ + pass + + +class SwapParser(InstructionParser): + """ + Base class for swap instruction parsers + """ + + @abstractmethod + def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]: + """ + Extract swap information from a parsed instruction + + Args: + parsed_instruction: Instruction data parsed by parse_instruction + + Returns: + Tuple of (token_in_address, token_out_address, amount_in, amount_out) + Any of these values can be None if not applicable + """ + pass + + +class LiquidityParser(InstructionParser): + """ + Base class for liquidity instruction parsers (add/remove) + """ + + @abstractmethod + def extract_liquidity_info(self, parsed_instruction: Dict) -> Dict[str, Any]: + """ + Extract liquidity information from a parsed instruction + + Args: + parsed_instruction: Instruction data parsed by parse_instruction + + Returns: + Dict with keys: + - action: "add" or "remove" + - pool_address: Address of the liquidity pool + - tokens: List of token addresses involved + - amounts: List of token amounts + - user: User address + """ + pass \ No newline at end of file diff --git a/app/parsers/jupiter.py b/app/parsers/jupiter.py new file mode 100644 index 0000000..e7ed3e7 --- /dev/null +++ b/app/parsers/jupiter.py @@ -0,0 +1,93 @@ +""" +Parser for the Jupiter Aggregator swap instructions +""" +import base64 +from typing import Dict, List, Optional, Tuple + +from app.parsers.base import SwapParser + + +# Jupiter Program IDs +JUPITER_PROGRAM_ID = "JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB" # Jupiter V4 +JUPITER_V3_PROGRAM_ID = "JUP3c2Uh3WA4Ng34tw6kPd2G4C5BB21Xo36Je1s32Ph" # Jupiter V3 +JUPITER_V6_PROGRAM_ID = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4" # Jupiter V6 + + +class JupiterParser(SwapParser): + """ + Parser for the Jupiter Aggregator swap instructions + """ + + @property + def program_id(self) -> str: + return JUPITER_PROGRAM_ID + + def can_handle(self, program_id: str) -> bool: + """ + Check if this parser can handle instructions from a given program + """ + return program_id in [JUPITER_PROGRAM_ID, JUPITER_V3_PROGRAM_ID, JUPITER_V6_PROGRAM_ID] + + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a Jupiter instruction + """ + if not instruction_data: + return {"type": "unknown", "error": "No instruction data"} + + # Get accounts referenced by the instruction + instruction_accounts = instruction.get("accounts", []) + referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)] + + # Basic metadata about the instruction + result = { + "type": "jupiter_swap", + "program": "jupiter_aggregator", + "program_id": instruction.get("programId", JUPITER_PROGRAM_ID), + "accounts": referenced_accounts, + "data": base64.b64encode(instruction_data).decode("utf-8"), + } + + # Jupiter instructions are complex and version-dependent + # Here we'll implement a simplified version for demonstration + + # In Jupiter V4+, the accounts typically follow this pattern: + # 0: Token program + # 1: User's token account (source) + # 2: User's token account (destination) + # 3: User's wallet + # (additional accounts depend on the route) + + if len(referenced_accounts) >= 4: + result.update({ + "source_account": referenced_accounts[1], + "destination_account": referenced_accounts[2], + "user_wallet": referenced_accounts[3], + }) + + # Note: Parsing the exact amount and path would require deeper + # instruction-specific knowledge and likely additional context + + return result + + def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]: + """ + Extract swap information from a parsed instruction + + Returns: + Tuple of (token_in_address, token_out_address, amount_in, amount_out) + """ + if parsed_instruction.get("type") != "jupiter_swap": + return None, None, None, None + + # For a full implementation, we would need to: + # 1. Look up token mint addresses from the token accounts + # 2. Parse amounts from instruction data + # 3. Handle different Jupiter versions + + # This is a simplified implementation + source_account = parsed_instruction.get("source_account") + destination_account = parsed_instruction.get("destination_account") + + # We don't have enough information to determine amounts from just the instruction + return None, source_account, destination_account, None \ No newline at end of file diff --git a/app/parsers/parser_registry.py b/app/parsers/parser_registry.py new file mode 100644 index 0000000..1e2e5e9 --- /dev/null +++ b/app/parsers/parser_registry.py @@ -0,0 +1,139 @@ +""" +Registry of instruction parsers +""" +import base64 +import logging +from typing import Dict, List, Optional + +from app.parsers.base import InstructionParser +from app.parsers.token_program import TokenProgramParser +from app.parsers.serum_dex import SerumDexParser +from app.parsers.jupiter import JupiterParser +from app.parsers.raydium import RaydiumSwapParser, RaydiumLiquidityParser + +logger = logging.getLogger(__name__) + + +class InstructionParserRegistry: + """ + Registry of instruction parsers for different Solana programs + """ + + def __init__(self): + """ + Initialize the registry with available parsers + """ + self.parsers: Dict[str, InstructionParser] = {} + self.register_default_parsers() + + def register_parser(self, parser: InstructionParser) -> None: + """ + Register a parser for a specific program + + Args: + parser: Parser instance to register + """ + self.parsers[parser.program_id] = parser + logger.debug(f"Registered parser for program: {parser.program_id}") + + def register_default_parsers(self) -> None: + """ + Register the default set of parsers + """ + # Register SPL Token Program parser + self.register_parser(TokenProgramParser()) + + # Register DEX parsers + self.register_parser(SerumDexParser()) + self.register_parser(JupiterParser()) + self.register_parser(RaydiumSwapParser()) + self.register_parser(RaydiumLiquidityParser()) + + def get_parser_for_program(self, program_id: str) -> Optional[InstructionParser]: + """ + Get a parser for a specific program ID + + Args: + program_id: The program ID to get a parser for + + Returns: + Parser instance if one exists for the program, None otherwise + """ + # First try exact match + if program_id in self.parsers: + return self.parsers[program_id] + + # Then try parsers that can handle multiple program IDs + for parser in self.parsers.values(): + if parser.can_handle(program_id): + return parser + + return None + + def parse_transaction_instructions(self, transaction_data: Dict) -> List[Dict]: + """ + Parse all instructions in a transaction + + Args: + transaction_data: Transaction data from RPC response + + Returns: + List of parsed instructions + """ + parsed_instructions = [] + + if not transaction_data or "transaction" not in transaction_data: + return parsed_instructions + + tx = transaction_data["transaction"] + if "message" not in tx: + return parsed_instructions + + message = tx["message"] + + # Extract account addresses + accounts = message.get("accountKeys", []) + + # Parse each instruction + instructions = message.get("instructions", []) + for instruction in instructions: + program_id = instruction.get("programId") + if not program_id: + continue + + # Get parser for this program + parser = self.get_parser_for_program(program_id) + + # Parse instruction data + instruction_data_b64 = instruction.get("data", "") + try: + instruction_data = base64.b64decode(instruction_data_b64) + except Exception: + # If not base64, try other formats or skip + instruction_data = b"" + + # Parse the instruction + if parser: + try: + parsed = parser.parse_instruction(instruction, accounts, instruction_data) + parsed_instructions.append(parsed) + except Exception as e: + logger.error(f"Error parsing instruction: {str(e)}") + parsed_instructions.append({ + "type": "error", + "program_id": program_id, + "error": str(e), + }) + else: + # No parser available for this program + parsed_instructions.append({ + "type": "unknown", + "program_id": program_id, + "data": instruction_data_b64, + }) + + return parsed_instructions + + +# Singleton instance +parser_registry = InstructionParserRegistry() \ No newline at end of file diff --git a/app/parsers/raydium.py b/app/parsers/raydium.py new file mode 100644 index 0000000..a137990 --- /dev/null +++ b/app/parsers/raydium.py @@ -0,0 +1,174 @@ +""" +Parser for Raydium AMM instructions +""" +import base64 +from typing import Dict, List, Optional, Tuple, Any + +from app.parsers.base import SwapParser, LiquidityParser + + +# Raydium Program IDs +RAYDIUM_LIQUIDITY_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8" +RAYDIUM_SWAP_PROGRAM_ID = "9qvG1zUp8xF1Bi4m6UdRNby1BAAuaDrUxSpv4CmRRMjL" + + +class RaydiumSwapParser(SwapParser): + """ + Parser for Raydium swap instructions + """ + + @property + def program_id(self) -> str: + return RAYDIUM_SWAP_PROGRAM_ID + + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a Raydium swap instruction + """ + if not instruction_data: + return {"type": "unknown", "error": "No instruction data"} + + # Get accounts referenced by the instruction + instruction_accounts = instruction.get("accounts", []) + referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)] + + # For Raydium swap, the account list typically follows this pattern: + # 0: Token program + # 1: AMM ID + # 2: AMM authority + # 3: AMM open orders + # 4: AMM target orders + # 5: Pool coin token account + # 6: Pool pc token account + # 7: Serum program + # 8: Serum market + # 9: Serum bids + # 10: Serum asks + # 11: Serum event queue + # 12: Serum coin vault + # 13: Serum pc vault + # 14: Serum vault signer + # 15: User source token account + # 16: User destination token account + # 17: User owner + + # Basic metadata about the instruction + result = { + "type": "raydium_swap", + "program": "raydium_swap", + "program_id": RAYDIUM_SWAP_PROGRAM_ID, + "accounts": referenced_accounts, + "data": base64.b64encode(instruction_data).decode("utf-8"), + } + + # Add specific account references if available + if len(referenced_accounts) >= 18: + result.update({ + "amm_id": referenced_accounts[1], + "pool_coin_account": referenced_accounts[5], + "pool_pc_account": referenced_accounts[6], + "serum_market": referenced_accounts[8], + "user_source_account": referenced_accounts[15], + "user_destination_account": referenced_accounts[16], + "user_owner": referenced_accounts[17], + }) + + # Parse instruction data to get amount_in + # Note: This is a simplified version. Actual layout may vary. + try: + if len(instruction_data) >= 9: + # First byte is instruction type, next 8 bytes are the amount + amount_in = int.from_bytes(instruction_data[1:9], byteorder="little") + result["amount_in"] = amount_in + + # For demonstration - in a real implementation, we would parse + # minimum amount out as well + except Exception as e: + result["parse_error"] = str(e) + + return result + + def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]: + """ + Extract swap information from a parsed instruction + + Returns: + Tuple of (token_in_address, token_out_address, amount_in, amount_out) + """ + if parsed_instruction.get("type") != "raydium_swap": + return None, None, None, None + + # For Raydium, we don't directly get token mint addresses from the instruction + # We would need to look up the token accounts' mint addresses + source_account = parsed_instruction.get("user_source_account") + destination_account = parsed_instruction.get("user_destination_account") + amount_in = parsed_instruction.get("amount_in") + + # We don't know the amount_out from just the instruction + return None, source_account, destination_account, amount_in + + +class RaydiumLiquidityParser(LiquidityParser): + """ + Parser for Raydium liquidity instructions + """ + + @property + def program_id(self) -> str: + return RAYDIUM_LIQUIDITY_PROGRAM_ID + + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a Raydium liquidity instruction + """ + if not instruction_data: + return {"type": "unknown", "error": "No instruction data"} + + # First byte is the instruction type + instruction_type = instruction_data[0] + + # Get accounts referenced by the instruction + instruction_accounts = instruction.get("accounts", []) + referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)] + + # Basic metadata about the instruction + result = { + "program": "raydium_liquidity", + "program_id": RAYDIUM_LIQUIDITY_PROGRAM_ID, + "accounts": referenced_accounts, + "data": base64.b64encode(instruction_data).decode("utf-8"), + } + + # Parse specific instruction types + if instruction_type == 1: # Deposit liquidity + result["type"] = "raydium_deposit_liquidity" + elif instruction_type == 2: # Withdraw liquidity + result["type"] = "raydium_withdraw_liquidity" + else: + result["type"] = f"raydium_instruction_{instruction_type}" + + return result + + def extract_liquidity_info(self, parsed_instruction: Dict) -> Dict[str, Any]: + """ + Extract liquidity information from a parsed instruction + """ + result = { + "action": None, + "pool_address": None, + "tokens": [], + "amounts": [], + "user": None, + } + + if parsed_instruction.get("type") == "raydium_deposit_liquidity": + result["action"] = "add" + elif parsed_instruction.get("type") == "raydium_withdraw_liquidity": + result["action"] = "remove" + else: + return result + + # In a full implementation, we would extract pool address, tokens, amounts + # and user from the instruction accounts and data + + return result \ No newline at end of file diff --git a/app/parsers/serum_dex.py b/app/parsers/serum_dex.py new file mode 100644 index 0000000..3dd07ab --- /dev/null +++ b/app/parsers/serum_dex.py @@ -0,0 +1,164 @@ +""" +Parser for the Serum DEX v3 instructions +""" +import base64 +from typing import Dict, List, Optional, Tuple + +from app.parsers.base import SwapParser + + +# Serum DEX v3 Program ID +SERUM_DEX_V3_PROGRAM_ID = "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin" + +# Serum DEX Instructions +SERUM_DEX_MATCH_ORDERS = 2 +SERUM_DEX_NEW_ORDER = 0 +SERUM_DEX_CANCEL_ORDER = 1 + + +class SerumDexParser(SwapParser): + """ + Parser for the Serum DEX v3 instructions + """ + + @property + def program_id(self) -> str: + return SERUM_DEX_V3_PROGRAM_ID + + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a Serum DEX instruction + """ + if not instruction_data: + return {"type": "unknown", "error": "No instruction data"} + + # First byte is the instruction type + instruction_type = instruction_data[0] + + # Get accounts referenced by the instruction + instruction_accounts = instruction.get("accounts", []) + referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)] + + result = { + "program": "serum_dex", + "program_id": SERUM_DEX_V3_PROGRAM_ID, + "accounts": referenced_accounts, + "data": base64.b64encode(instruction_data).decode("utf-8"), + } + + # Parse specific instruction types + if instruction_type == SERUM_DEX_NEW_ORDER: + # New order instruction + result.update(self._parse_new_order(instruction_data, referenced_accounts)) + elif instruction_type == SERUM_DEX_MATCH_ORDERS: + # Match orders instruction + result.update(self._parse_match_orders(instruction_data, referenced_accounts)) + elif instruction_type == SERUM_DEX_CANCEL_ORDER: + # Cancel order instruction + result.update(self._parse_cancel_order(instruction_data, referenced_accounts)) + else: + # Unknown instruction type + result.update({ + "type": f"serum_instruction_{instruction_type}", + "error": "Unknown instruction type", + }) + + return result + + def _parse_new_order(self, instruction_data: bytes, accounts: List[str]) -> Dict: + """ + Parse a Serum DEX new order instruction + """ + # New order layout is complex, simplified for this example + try: + side = "buy" if instruction_data[1] == 0 else "sell" + + # Note: In a real implementation, we would parse: + # - limit price + # - max base qty + # - max quote qty + # - self trade behavior + # - order type + # - client order ID + + # For demo purposes we'll create a simplified result + return { + "type": "serum_new_order", + "side": side, + "market": accounts[0] if len(accounts) > 0 else None, + "open_orders": accounts[1] if len(accounts) > 1 else None, + "request_queue": accounts[2] if len(accounts) > 2 else None, + "payer": accounts[3] if len(accounts) > 3 else None, + "owner": accounts[4] if len(accounts) > 4 else None, + "base_vault": accounts[5] if len(accounts) > 5 else None, + "quote_vault": accounts[6] if len(accounts) > 6 else None, + # Additional fields would be parsed from instruction_data + } + except Exception as e: + return { + "type": "serum_new_order", + "error": f"Failed to parse new order: {str(e)}", + } + + def _parse_match_orders(self, instruction_data: bytes, accounts: List[str]) -> Dict: + """ + Parse a Serum DEX match orders instruction + """ + # Simplified implementation for demo purposes + try: + # Real implementation would extract more fields + return { + "type": "serum_match_orders", + "market": accounts[0] if len(accounts) > 0 else None, + "request_queue": accounts[1] if len(accounts) > 1 else None, + "event_queue": accounts[2] if len(accounts) > 2 else None, + "bids": accounts[3] if len(accounts) > 3 else None, + "asks": accounts[4] if len(accounts) > 4 else None, + # Additional accounts would be included + } + except Exception as e: + return { + "type": "serum_match_orders", + "error": f"Failed to parse match orders: {str(e)}", + } + + def _parse_cancel_order(self, instruction_data: bytes, accounts: List[str]) -> Dict: + """ + Parse a Serum DEX cancel order instruction + """ + # Simplified implementation for demo purposes + try: + side = "buy" if instruction_data[1] == 0 else "sell" + + return { + "type": "serum_cancel_order", + "side": side, + "market": accounts[0] if len(accounts) > 0 else None, + "open_orders": accounts[1] if len(accounts) > 1 else None, + "owner": accounts[2] if len(accounts) > 2 else None, + # Additional fields would be parsed from instruction_data + } + except Exception as e: + return { + "type": "serum_cancel_order", + "error": f"Failed to parse cancel order: {str(e)}", + } + + def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]: + """ + Extract swap information from a parsed instruction + + Returns: + Tuple of (token_in_address, token_out_address, amount_in, amount_out) + """ + # For Serum, we need to look at both the new order and match orders instructions + # to get a complete swap picture, which is more complex than can be done + # with a single instruction. + + # This is a simplified implementation that would need to be enhanced + if parsed_instruction.get("type") == "serum_match_orders": + # We would need more context to determine the tokens and amounts + # For now, return None values + return None, None, None, None + + return None, None, None, None \ No newline at end of file diff --git a/app/parsers/token_program.py b/app/parsers/token_program.py new file mode 100644 index 0000000..9474adb --- /dev/null +++ b/app/parsers/token_program.py @@ -0,0 +1,111 @@ +""" +Parser for the Solana Token Program (SPL) +""" +import base64 +from typing import Dict, List, Optional, Tuple + + +from app.parsers.base import TransferParser + + +# Token Program Instructions +TOKEN_TRANSFER_INSTRUCTION = 3 +TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + + +class TokenProgramParser(TransferParser): + """ + Parser for the Solana Token Program instructions + """ + + @property + def program_id(self) -> str: + return TOKEN_PROGRAM_ID + + def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a token program instruction + """ + if not instruction_data: + return {"type": "unknown", "error": "No instruction data"} + + # First byte is the instruction type + instruction_type = instruction_data[0] + + if instruction_type == TOKEN_TRANSFER_INSTRUCTION: + # Parse transfer instruction + return self._parse_transfer(instruction, accounts, instruction_data) + + # For other instruction types + return { + "type": f"token_instruction_{instruction_type}", + "program": "token_program", + "program_id": TOKEN_PROGRAM_ID, + "accounts": [accounts[idx] for idx in instruction.get("accounts", [])], + "data": base64.b64encode(instruction_data).decode("utf-8"), + } + + def _parse_transfer(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict: + """ + Parse a token transfer instruction + """ + # For token transfers, the accounts are: + # 0: Source account + # 1: Destination account + # 2: Owner of source account + # (3: Signers if multisig, optional) + instruction_accounts = instruction.get("accounts", []) + + if len(instruction_accounts) < 3: + return { + "type": "token_transfer", + "error": "Not enough accounts for transfer", + "program": "token_program", + "program_id": TOKEN_PROGRAM_ID, + } + + # Parse amount from instruction data + # The format is: [instruction_type(1 byte), amount(8 bytes)] + try: + amount_data = instruction_data[1:9] + amount = int.from_bytes(amount_data, byteorder="little") + except Exception as e: + return { + "type": "token_transfer", + "error": f"Failed to parse amount: {str(e)}", + "program": "token_program", + "program_id": TOKEN_PROGRAM_ID, + } + + source_idx = instruction_accounts[0] + dest_idx = instruction_accounts[1] + owner_idx = instruction_accounts[2] + + return { + "type": "token_transfer", + "program": "token_program", + "program_id": TOKEN_PROGRAM_ID, + "source": accounts[source_idx] if source_idx < len(accounts) else None, + "destination": accounts[dest_idx] if dest_idx < len(accounts) else None, + "owner": accounts[owner_idx] if owner_idx < len(accounts) else None, + "amount": amount, + "decimals": None, # We don't know the token decimals from instruction alone + } + + def extract_transfer_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[float]]: + """ + Extract transfer information from a parsed instruction + + Returns: + Tuple of (token_address, from_address, to_address, amount) + """ + if parsed_instruction.get("type") != "token_transfer": + return None, None, None, None + + source = parsed_instruction.get("source") + destination = parsed_instruction.get("destination") + amount = parsed_instruction.get("amount") + + # Note: For SPL tokens, the actual token mint (token_address) is not directly + # included in the instruction, so we would need additional context to determine it + return None, source, destination, amount \ No newline at end of file diff --git a/app/schemas/schemas.py b/app/schemas/schemas.py index 0da7e5e..2d486dd 100644 --- a/app/schemas/schemas.py +++ b/app/schemas/schemas.py @@ -1,9 +1,54 @@ from datetime import datetime -from typing import List, Optional +from typing import Dict, List, Optional, Any from pydantic import BaseModel, Field +class InstructionBase(BaseModel): + program_id: str + instruction_type: str + instruction_index: Optional[int] = None + accounts: Optional[str] = None + data: Optional[str] = None + parsed_data: Optional[Dict[str, Any]] = None + + +class InstructionCreate(InstructionBase): + transaction_id: int + + +class Instruction(InstructionBase): + id: int + transaction_id: int + + class Config: + from_attributes = True + + +class SwapBase(BaseModel): + program_id: str + token_in_address: str + token_out_address: str + amount_in: float + amount_out: float + user_account: Optional[str] = None + + +class SwapCreate(SwapBase): + transaction_id: int + instruction_id: int + + +class Swap(SwapBase): + id: int + transaction_id: int + instruction_id: int + timestamp: datetime + + class Config: + from_attributes = True + + class TokenTransferBase(BaseModel): token_address: str from_address: str @@ -14,11 +59,13 @@ class TokenTransferBase(BaseModel): class TokenTransferCreate(TokenTransferBase): transaction_id: int + instruction_id: Optional[int] = None class TokenTransfer(TokenTransferBase): id: int transaction_id: int + instruction_id: Optional[int] = None timestamp: datetime class Config: @@ -30,6 +77,7 @@ class ArbitrageEventBase(BaseModel): profit_amount: float profit_usd: Optional[float] = None path: str + swap_sequence: Optional[str] = None confidence_score: float = Field(..., ge=0.0, le=1.0) @@ -61,6 +109,8 @@ class Transaction(TransactionBase): id: int block_id: int timestamp: datetime + instructions: List[Instruction] = [] + swaps: List[Swap] = [] token_transfers: List[TokenTransfer] = [] arbitrage_events: List[ArbitrageEvent] = [] diff --git a/app/services/scanner.py b/app/services/scanner.py index d99f6c2..65a8ff9 100644 --- a/app/services/scanner.py +++ b/app/services/scanner.py @@ -6,7 +6,8 @@ from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session from app.core.config import settings -from app.models.models import ArbitrageEvent, Block, TokenTransfer, Transaction +from app.models.models import (ArbitrageEvent, Block, Instruction, Swap, + TokenTransfer, Transaction) from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate, TokenTransferCreate, TransactionCreate) from app.services.solana_client import SolanaClient @@ -84,22 +85,52 @@ class BlockScanner: def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]: """ - Extract and save token transfers from transaction data + Extract and save token transfers from transaction data using instruction-level parsing """ - # This would require parsing the transaction logs and instruction data - # For simplicity, we'll just create a placeholder function - - # In a real implementation, we would extract token transfer info - # from the transaction instructions and logs + from app.parsers.parser_registry import parser_registry transfers = [] - # Placeholder for demonstration purposes - if "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]: + + # Use the parser registry to parse all instructions in the transaction + parsed_instructions = parser_registry.parse_transaction_instructions(tx_data) + + # Extract token transfers from parsed instructions using each parser + for instruction in parsed_instructions: + program_id = instruction.get("program_id", "") + parser = parser_registry.get_parser_for_program(program_id) + + if not parser: + continue + + # Extract transfer info if the parser is a TransferParser + if hasattr(parser, "extract_transfer_info"): + try: + token_address, from_address, to_address, amount = parser.extract_transfer_info(instruction) + + if token_address and from_address and to_address and amount is not None and amount > 0: + # Create token transfer record + transfer_create = TokenTransferCreate( + transaction_id=transaction_id, + token_address=token_address, + from_address=from_address, + to_address=to_address, + amount=amount, + program_id=program_id, + ) + + transfer = TokenTransfer(**transfer_create.model_dump()) + self.db.add(transfer) + transfers.append(transfer) + except Exception as e: + logger.error(f"Error extracting transfer info: {str(e)}") + + # Fallback to pre/post balance comparison if no transfers found with instruction parsing + if not transfers and "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]: + logger.info("No transfers found with instruction parsing, falling back to balance comparison") post_balances = tx_data["meta"]["postTokenBalances"] pre_balances = tx_data["meta"]["preTokenBalances"] # Compare pre and post balances to identify transfers - # This is a simplified approach for post in post_balances: mint = post.get("mint", "") owner = post.get("owner", "") @@ -121,7 +152,7 @@ class BlockScanner: from_address="" if amount_change > 0 else owner, to_address=owner if amount_change > 0 else "", amount=abs(amount_change), - program_id=post.get("programId", ""), + program_id="", # We don't know the program from balance comparison ) transfer = TokenTransfer(**transfer_create.model_dump()) @@ -155,6 +186,129 @@ class BlockScanner: return event + def save_instructions(self, tx_data: Dict, transaction_id: int) -> List[Instruction]: + """ + Extract and save instruction data from a transaction + """ + from app.parsers.parser_registry import parser_registry + + instructions = [] + + # Extract message and accounts from transaction + if not tx_data or "transaction" not in tx_data or "message" not in tx_data["transaction"]: + return instructions + + tx_message = tx_data["transaction"]["message"] + accounts = tx_message.get("accountKeys", []) + + # Process each instruction + for idx, instruction in enumerate(tx_message.get("instructions", [])): + program_id = instruction.get("programId", "") + parser = parser_registry.get_parser_for_program(program_id) + + # Parse instruction data + instruction_data = instruction.get("data", "") + accounts_list = [accounts[idx] for idx in instruction.get("accounts", []) if idx < len(accounts)] + + # Create instruction record + instruction_type = "unknown" + parsed_data = None + + if parser: + try: + # Decode instruction data from base64 + try: + import base64 + raw_data = base64.b64decode(instruction_data) + except Exception: + raw_data = b"" + + # Parse instruction with appropriate parser + parsed_instruction = parser.parse_instruction(instruction, accounts, raw_data) + instruction_type = parsed_instruction.get("type", "unknown") + parsed_data = parsed_instruction + except Exception as e: + logger.error(f"Error parsing instruction: {str(e)}") + + # Create instruction record + from app.schemas.schemas import InstructionCreate + instruction_create = InstructionCreate( + transaction_id=transaction_id, + program_id=program_id, + instruction_type=instruction_type, + instruction_index=idx, + accounts=json.dumps(accounts_list), + data=instruction_data, + parsed_data=parsed_data, + ) + + instruction_obj = Instruction(**instruction_create.model_dump()) + self.db.add(instruction_obj) + instructions.append(instruction_obj) + + if instructions: + self.db.commit() + for instruction in instructions: + self.db.refresh(instruction) + + return instructions + + def save_swaps(self, tx_data: Dict, transaction_id: int, instructions: List[Instruction]) -> List[Swap]: + """ + Extract and save swap data from a transaction + """ + from app.parsers.parser_registry import parser_registry + + swaps = [] + + # Get parsed instructions + parsed_instructions = parser_registry.parse_transaction_instructions(tx_data) + + # Map instruction indices to IDs + instruction_id_map = {inst.instruction_index: inst.id for inst in instructions if inst.instruction_index is not None} + + # Extract swap information from parsed instructions + for idx, parsed_instruction in enumerate(parsed_instructions): + program_id = parsed_instruction.get("program_id", "") + parser = parser_registry.get_parser_for_program(program_id) + + if not parser or not hasattr(parser, "extract_swap_info"): + continue + + try: + token_in, token_out, amount_in, amount_out = parser.extract_swap_info(parsed_instruction) + + if token_in and token_out and amount_in is not None and amount_out is not None: + instruction_id = instruction_id_map.get(idx) + if not instruction_id: + continue + + # Create swap record + from app.schemas.schemas import SwapCreate + swap_create = SwapCreate( + transaction_id=transaction_id, + instruction_id=instruction_id, + program_id=program_id, + token_in_address=token_in, + token_out_address=token_out, + amount_in=amount_in, + amount_out=amount_out, + user_account=parsed_instruction.get("user_wallet", ""), + ) + + swap = Swap(**swap_create.model_dump()) + self.db.add(swap) + swaps.append(swap) + except Exception as e: + logger.error(f"Error extracting swap info: {str(e)}") + + if swaps: + self.db.commit() + for swap in swaps: + self.db.refresh(swap) + + return swaps + def process_transaction(self, tx_data: Dict, block_id: int) -> Tuple[Transaction, Optional[ArbitrageEvent]]: """ Process a single transaction, looking for arbitrage @@ -162,6 +316,12 @@ class BlockScanner: # Save transaction record transaction = self.save_transaction(tx_data, block_id) + # Extract and save instructions + instructions = self.save_instructions(tx_data, transaction.id) + + # Extract and save swaps + swaps = self.save_swaps(tx_data, transaction.id, instructions) + # Extract and save token transfers self.save_token_transfers(tx_data, transaction.id) @@ -170,6 +330,11 @@ class BlockScanner: arbitrage_event = None if is_arbitrage and arbitrage_data: + # Add swap sequence if available + if swaps: + swap_ids = [swap.id for swap in swaps] + arbitrage_data["swap_sequence"] = json.dumps(swap_ids) + arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id) logger.info(f"Detected arbitrage in transaction {transaction.signature}") diff --git a/app/services/solana_client.py b/app/services/solana_client.py index 93085df..3679d2d 100644 --- a/app/services/solana_client.py +++ b/app/services/solana_client.py @@ -114,56 +114,105 @@ class SolanaClient: def analyze_transaction_for_arbitrage(self, tx_data: Dict) -> Tuple[bool, Optional[Dict]]: """ - Analyze transaction data for potential arbitrage + Analyze transaction data for potential arbitrage using instruction-level parsing Returns a tuple of (is_arbitrage, arbitrage_data) """ - # This is a simple placeholder implementation - # A real implementation would require advanced pattern matching - # and knowledge of Solana DEX structures + from app.parsers.parser_registry import parser_registry - # Check if transaction has instructions + # Check if transaction exists and has data if not tx_data or "transaction" not in tx_data: return False, None - tx = tx_data["transaction"] - if "message" not in tx or "instructions" not in tx["message"]: + # Parse all instructions in the transaction + parsed_instructions = parser_registry.parse_transaction_instructions(tx_data) + + # Extract swap information from parsed instructions + swaps = [] + + # Track token flow for a single user wallet + token_flow = {} # token_address -> net_amount + user_wallet = None + + for instruction in parsed_instructions: + program_id = instruction.get("program_id", "") + parser = parser_registry.get_parser_for_program(program_id) + + if not parser: + continue + + # Extract swap info if the parser is a SwapParser + if hasattr(parser, "extract_swap_info"): + try: + token_in, token_out, amount_in, amount_out = parser.extract_swap_info(instruction) + + if token_in and token_out and amount_in is not None and amount_out is not None: + swaps.append({ + "token_in": token_in, + "token_out": token_out, + "amount_in": amount_in, + "amount_out": amount_out, + "program_id": program_id, + }) + + # Track net token flow + token_flow[token_in] = token_flow.get(token_in, 0) - amount_in + token_flow[token_out] = token_flow.get(token_out, 0) + amount_out + + # Try to identify user wallet + if "user_wallet" in instruction and not user_wallet: + user_wallet = instruction.get("user_wallet") + except Exception as e: + logger.error(f"Error extracting swap info: {str(e)}") + + # No need to continue if we don't have enough swaps + if len(swaps) < 3: return False, None - instructions = tx["message"]["instructions"] + # Check for circular swaps (token A -> token B -> token C -> ... -> token A) + # This is a key arbitrage pattern + token_path = [] + for swap in swaps: + token_path.append(swap["token_in"]) + token_path.append(swap["token_out"]) - # Check if transaction has multiple token swaps - # This is a simplistic approach - real implementation would be more sophisticated - swap_count = 0 - token_programs = [ - "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", # Serum DEX v3 - "SwaPpA9LAaLfeLi3a68M4DjnLqgtticKg6CnyNwgAC8", # Swap Program - "27haf8L6oxUeXrHrgEgsexjSY5hbVUWEmvv9Nyxg8vQv", # Jupiter Aggregator - ] + # Check if the first and last tokens in the path are the same + is_circular = len(token_path) >= 2 and token_path[0] == token_path[-1] - token_addresses = [] - for inst in instructions: - if inst.get("programId") in token_programs: - swap_count += 1 - # In a real implementation, we'd parse the instruction data - # to identify the tokens being swapped - - # For now, we'll just use a placeholder - token_addresses.append(f"token_{swap_count}") + # Check if there's a profit in any token + profit_token = None + profit_amount = 0 - # Check for circular pattern (token A -> token B -> token C -> token A) - if swap_count >= 3: - # Check if first and last token are the same (circular) - is_circular = len(token_addresses) >= 3 and token_addresses[0] == token_addresses[-1] - - if is_circular: - # In a real implementation, we'd calculate the actual profit - # For now, return a placeholder - return True, { - "profit_token_address": token_addresses[0], - "profit_amount": 0.01, # Placeholder - "path": json.dumps(token_addresses), - "confidence_score": 0.7, # Placeholder - } + for token, amount in token_flow.items(): + if amount > 0: + # If we have a net positive amount of any token, it's a profit + if profit_amount < amount: + profit_token = token + profit_amount = amount + + # Determine arbitrage confidence score + confidence_score = 0.0 + + if is_circular and profit_token: + # High confidence if we have a circular path and profit + confidence_score = 0.9 + elif is_circular: + # Medium confidence if we have a circular path but no clear profit + confidence_score = 0.7 + elif profit_token: + # Lower confidence if we have profit but no circular path + confidence_score = 0.5 + else: + # Not likely an arbitrage + return False, None + + # Consider it arbitrage if we have high enough confidence + if confidence_score >= 0.5: + return True, { + "profit_token_address": profit_token or swaps[0]["token_in"], + "profit_amount": profit_amount or 0.01, # Fallback to placeholder if we can't determine + "path": json.dumps(token_path), + "confidence_score": confidence_score, + } return False, None \ No newline at end of file diff --git a/migrations/versions/add_instruction_tables.py b/migrations/versions/add_instruction_tables.py new file mode 100644 index 0000000..be32d32 --- /dev/null +++ b/migrations/versions/add_instruction_tables.py @@ -0,0 +1,89 @@ +"""add instruction tables + +Revision ID: f9c3a857d9e6 +Revises: b61d5f5a1ded +Create Date: 2023-10-10 12:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'f9c3a857d9e6' +down_revision = 'b61d5f5a1ded' +branch_labels = None +depends_on = None + + +def upgrade(): + # Create instructions table + op.create_table( + 'instructions', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('transaction_id', sa.Integer(), nullable=True), + sa.Column('program_id', sa.String(), nullable=True), + sa.Column('instruction_type', sa.String(), nullable=True), + sa.Column('instruction_index', sa.Integer(), nullable=True), + sa.Column('accounts', sa.Text(), nullable=True), + sa.Column('data', sa.Text(), nullable=True), + sa.Column('parsed_data', sa.JSON(), nullable=True), + sa.ForeignKeyConstraint(['transaction_id'], ['transactions.id'], ), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index(op.f('ix_instructions_id'), 'instructions', ['id'], unique=False) + op.create_index(op.f('ix_instructions_program_id'), 'instructions', ['program_id'], unique=False) + op.create_index(op.f('ix_instructions_instruction_type'), 'instructions', ['instruction_type'], unique=False) + + # Create swaps table + op.create_table( + 'swaps', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('transaction_id', sa.Integer(), nullable=True), + sa.Column('instruction_id', sa.Integer(), nullable=True), + sa.Column('program_id', sa.String(), nullable=True), + sa.Column('token_in_address', sa.String(), nullable=True), + sa.Column('token_out_address', sa.String(), nullable=True), + sa.Column('amount_in', sa.Float(), nullable=True), + sa.Column('amount_out', sa.Float(), nullable=True), + sa.Column('user_account', sa.String(), nullable=True), + sa.Column('timestamp', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['instruction_id'], ['instructions.id'], ), + sa.ForeignKeyConstraint(['transaction_id'], ['transactions.id'], ), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index(op.f('ix_swaps_id'), 'swaps', ['id'], unique=False) + op.create_index(op.f('ix_swaps_program_id'), 'swaps', ['program_id'], unique=False) + op.create_index(op.f('ix_swaps_token_in_address'), 'swaps', ['token_in_address'], unique=False) + op.create_index(op.f('ix_swaps_token_out_address'), 'swaps', ['token_out_address'], unique=False) + op.create_index(op.f('ix_swaps_user_account'), 'swaps', ['user_account'], unique=False) + + # Add instruction_id to token_transfers + op.add_column('token_transfers', sa.Column('instruction_id', sa.Integer(), nullable=True)) + op.create_foreign_key(None, 'token_transfers', 'instructions', ['instruction_id'], ['id']) + + # Add swap_sequence to arbitrage_events + op.add_column('arbitrage_events', sa.Column('swap_sequence', sa.Text(), nullable=True)) + + +def downgrade(): + # Remove swap_sequence from arbitrage_events + op.drop_column('arbitrage_events', 'swap_sequence') + + # Remove instruction_id from token_transfers + op.drop_constraint(None, 'token_transfers', type_='foreignkey') + op.drop_column('token_transfers', 'instruction_id') + + # Drop swaps table + op.drop_index(op.f('ix_swaps_user_account'), table_name='swaps') + op.drop_index(op.f('ix_swaps_token_out_address'), table_name='swaps') + op.drop_index(op.f('ix_swaps_token_in_address'), table_name='swaps') + op.drop_index(op.f('ix_swaps_program_id'), table_name='swaps') + op.drop_index(op.f('ix_swaps_id'), table_name='swaps') + op.drop_table('swaps') + + # Drop instructions table + op.drop_index(op.f('ix_instructions_instruction_type'), table_name='instructions') + op.drop_index(op.f('ix_instructions_program_id'), table_name='instructions') + op.drop_index(op.f('ix_instructions_id'), table_name='instructions') + op.drop_table('instructions') \ No newline at end of file