This commit is contained in:
Automated Action 2025-05-28 22:44:22 +00:00
parent 410c2fad21
commit cae8527a4b
12 changed files with 1237 additions and 54 deletions

View File

@ -1,7 +1,7 @@
from datetime import datetime
from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer,
String, Text)
String, Text, JSON)
from sqlalchemy.orm import relationship
from app.db.session import Base
@ -32,8 +32,25 @@ class Transaction(Base):
raw_data = Column(Text)
block = relationship("Block", back_populates="transactions")
token_transfers = relationship("TokenTransfer", back_populates="transaction")
arbitrage_events = relationship("ArbitrageEvent", back_populates="transaction")
instructions = relationship("Instruction", back_populates="transaction", cascade="all, delete-orphan")
token_transfers = relationship("TokenTransfer", back_populates="transaction", cascade="all, delete-orphan")
arbitrage_events = relationship("ArbitrageEvent", back_populates="transaction", cascade="all, delete-orphan")
class Instruction(Base):
__tablename__ = "instructions"
id = Column(Integer, primary_key=True, index=True)
transaction_id = Column(Integer, ForeignKey("transactions.id"))
program_id = Column(String, index=True)
instruction_type = Column(String, index=True)
instruction_index = Column(Integer, nullable=True)
accounts = Column(Text, nullable=True) # JSON string of account pubkeys
data = Column(Text, nullable=True) # Raw instruction data
parsed_data = Column(JSON, nullable=True) # Parsed instruction data
transaction = relationship("Transaction", back_populates="instructions")
swaps = relationship("Swap", back_populates="instruction", cascade="all, delete-orphan")
class TokenTransfer(Base):
@ -41,6 +58,7 @@ class TokenTransfer(Base):
id = Column(Integer, primary_key=True, index=True)
transaction_id = Column(Integer, ForeignKey("transactions.id"))
instruction_id = Column(Integer, ForeignKey("instructions.id"), nullable=True)
token_address = Column(String, index=True)
from_address = Column(String, index=True)
to_address = Column(String, index=True)
@ -49,6 +67,25 @@ class TokenTransfer(Base):
timestamp = Column(DateTime, default=datetime.utcnow)
transaction = relationship("Transaction", back_populates="token_transfers")
instruction = relationship("Instruction", backref="token_transfers")
class Swap(Base):
__tablename__ = "swaps"
id = Column(Integer, primary_key=True, index=True)
transaction_id = Column(Integer, ForeignKey("transactions.id"))
instruction_id = Column(Integer, ForeignKey("instructions.id"))
program_id = Column(String, index=True)
token_in_address = Column(String, index=True)
token_out_address = Column(String, index=True)
amount_in = Column(Float)
amount_out = Column(Float)
user_account = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
transaction = relationship("Transaction", backref="swaps")
instruction = relationship("Instruction", back_populates="swaps")
class ArbitrageEvent(Base):
@ -60,6 +97,7 @@ class ArbitrageEvent(Base):
profit_amount = Column(Float)
profit_usd = Column(Float, nullable=True)
path = Column(Text) # JSON string of token paths
swap_sequence = Column(Text, nullable=True) # JSON string of swap IDs
confidence_score = Column(Float) # 0.0 to 1.0
detected_at = Column(DateTime, default=datetime.utcnow)

1
app/parsers/__init__.py Normal file
View File

@ -0,0 +1 @@
# Instruction parsers package

110
app/parsers/base.py Normal file
View File

@ -0,0 +1,110 @@
"""
Base interfaces for instruction parsers
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Any
class InstructionParser(ABC):
"""
Base class for all instruction parsers
"""
@property
@abstractmethod
def program_id(self) -> str:
"""
Return the program ID that this parser handles
"""
pass
@abstractmethod
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a single instruction from transaction data
Args:
instruction: The instruction object from transaction data
accounts: List of account addresses in the transaction
instruction_data: Raw instruction data bytes
Returns:
Dict with parsed instruction information
"""
pass
def can_handle(self, program_id: str) -> bool:
"""
Check if this parser can handle instructions from a given program
Args:
program_id: Program ID to check
Returns:
True if this parser can handle the program, False otherwise
"""
return program_id == self.program_id
class TransferParser(InstructionParser):
"""
Base class for transfer instruction parsers
"""
@abstractmethod
def extract_transfer_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[float]]:
"""
Extract transfer information from a parsed instruction
Args:
parsed_instruction: Instruction data parsed by parse_instruction
Returns:
Tuple of (token_address, from_address, to_address, amount)
Any of these values can be None if not applicable
"""
pass
class SwapParser(InstructionParser):
"""
Base class for swap instruction parsers
"""
@abstractmethod
def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]:
"""
Extract swap information from a parsed instruction
Args:
parsed_instruction: Instruction data parsed by parse_instruction
Returns:
Tuple of (token_in_address, token_out_address, amount_in, amount_out)
Any of these values can be None if not applicable
"""
pass
class LiquidityParser(InstructionParser):
"""
Base class for liquidity instruction parsers (add/remove)
"""
@abstractmethod
def extract_liquidity_info(self, parsed_instruction: Dict) -> Dict[str, Any]:
"""
Extract liquidity information from a parsed instruction
Args:
parsed_instruction: Instruction data parsed by parse_instruction
Returns:
Dict with keys:
- action: "add" or "remove"
- pool_address: Address of the liquidity pool
- tokens: List of token addresses involved
- amounts: List of token amounts
- user: User address
"""
pass

93
app/parsers/jupiter.py Normal file
View File

@ -0,0 +1,93 @@
"""
Parser for the Jupiter Aggregator swap instructions
"""
import base64
from typing import Dict, List, Optional, Tuple
from app.parsers.base import SwapParser
# Jupiter Program IDs
JUPITER_PROGRAM_ID = "JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB" # Jupiter V4
JUPITER_V3_PROGRAM_ID = "JUP3c2Uh3WA4Ng34tw6kPd2G4C5BB21Xo36Je1s32Ph" # Jupiter V3
JUPITER_V6_PROGRAM_ID = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4" # Jupiter V6
class JupiterParser(SwapParser):
"""
Parser for the Jupiter Aggregator swap instructions
"""
@property
def program_id(self) -> str:
return JUPITER_PROGRAM_ID
def can_handle(self, program_id: str) -> bool:
"""
Check if this parser can handle instructions from a given program
"""
return program_id in [JUPITER_PROGRAM_ID, JUPITER_V3_PROGRAM_ID, JUPITER_V6_PROGRAM_ID]
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a Jupiter instruction
"""
if not instruction_data:
return {"type": "unknown", "error": "No instruction data"}
# Get accounts referenced by the instruction
instruction_accounts = instruction.get("accounts", [])
referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)]
# Basic metadata about the instruction
result = {
"type": "jupiter_swap",
"program": "jupiter_aggregator",
"program_id": instruction.get("programId", JUPITER_PROGRAM_ID),
"accounts": referenced_accounts,
"data": base64.b64encode(instruction_data).decode("utf-8"),
}
# Jupiter instructions are complex and version-dependent
# Here we'll implement a simplified version for demonstration
# In Jupiter V4+, the accounts typically follow this pattern:
# 0: Token program
# 1: User's token account (source)
# 2: User's token account (destination)
# 3: User's wallet
# (additional accounts depend on the route)
if len(referenced_accounts) >= 4:
result.update({
"source_account": referenced_accounts[1],
"destination_account": referenced_accounts[2],
"user_wallet": referenced_accounts[3],
})
# Note: Parsing the exact amount and path would require deeper
# instruction-specific knowledge and likely additional context
return result
def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]:
"""
Extract swap information from a parsed instruction
Returns:
Tuple of (token_in_address, token_out_address, amount_in, amount_out)
"""
if parsed_instruction.get("type") != "jupiter_swap":
return None, None, None, None
# For a full implementation, we would need to:
# 1. Look up token mint addresses from the token accounts
# 2. Parse amounts from instruction data
# 3. Handle different Jupiter versions
# This is a simplified implementation
source_account = parsed_instruction.get("source_account")
destination_account = parsed_instruction.get("destination_account")
# We don't have enough information to determine amounts from just the instruction
return None, source_account, destination_account, None

View File

@ -0,0 +1,139 @@
"""
Registry of instruction parsers
"""
import base64
import logging
from typing import Dict, List, Optional
from app.parsers.base import InstructionParser
from app.parsers.token_program import TokenProgramParser
from app.parsers.serum_dex import SerumDexParser
from app.parsers.jupiter import JupiterParser
from app.parsers.raydium import RaydiumSwapParser, RaydiumLiquidityParser
logger = logging.getLogger(__name__)
class InstructionParserRegistry:
"""
Registry of instruction parsers for different Solana programs
"""
def __init__(self):
"""
Initialize the registry with available parsers
"""
self.parsers: Dict[str, InstructionParser] = {}
self.register_default_parsers()
def register_parser(self, parser: InstructionParser) -> None:
"""
Register a parser for a specific program
Args:
parser: Parser instance to register
"""
self.parsers[parser.program_id] = parser
logger.debug(f"Registered parser for program: {parser.program_id}")
def register_default_parsers(self) -> None:
"""
Register the default set of parsers
"""
# Register SPL Token Program parser
self.register_parser(TokenProgramParser())
# Register DEX parsers
self.register_parser(SerumDexParser())
self.register_parser(JupiterParser())
self.register_parser(RaydiumSwapParser())
self.register_parser(RaydiumLiquidityParser())
def get_parser_for_program(self, program_id: str) -> Optional[InstructionParser]:
"""
Get a parser for a specific program ID
Args:
program_id: The program ID to get a parser for
Returns:
Parser instance if one exists for the program, None otherwise
"""
# First try exact match
if program_id in self.parsers:
return self.parsers[program_id]
# Then try parsers that can handle multiple program IDs
for parser in self.parsers.values():
if parser.can_handle(program_id):
return parser
return None
def parse_transaction_instructions(self, transaction_data: Dict) -> List[Dict]:
"""
Parse all instructions in a transaction
Args:
transaction_data: Transaction data from RPC response
Returns:
List of parsed instructions
"""
parsed_instructions = []
if not transaction_data or "transaction" not in transaction_data:
return parsed_instructions
tx = transaction_data["transaction"]
if "message" not in tx:
return parsed_instructions
message = tx["message"]
# Extract account addresses
accounts = message.get("accountKeys", [])
# Parse each instruction
instructions = message.get("instructions", [])
for instruction in instructions:
program_id = instruction.get("programId")
if not program_id:
continue
# Get parser for this program
parser = self.get_parser_for_program(program_id)
# Parse instruction data
instruction_data_b64 = instruction.get("data", "")
try:
instruction_data = base64.b64decode(instruction_data_b64)
except Exception:
# If not base64, try other formats or skip
instruction_data = b""
# Parse the instruction
if parser:
try:
parsed = parser.parse_instruction(instruction, accounts, instruction_data)
parsed_instructions.append(parsed)
except Exception as e:
logger.error(f"Error parsing instruction: {str(e)}")
parsed_instructions.append({
"type": "error",
"program_id": program_id,
"error": str(e),
})
else:
# No parser available for this program
parsed_instructions.append({
"type": "unknown",
"program_id": program_id,
"data": instruction_data_b64,
})
return parsed_instructions
# Singleton instance
parser_registry = InstructionParserRegistry()

174
app/parsers/raydium.py Normal file
View File

@ -0,0 +1,174 @@
"""
Parser for Raydium AMM instructions
"""
import base64
from typing import Dict, List, Optional, Tuple, Any
from app.parsers.base import SwapParser, LiquidityParser
# Raydium Program IDs
RAYDIUM_LIQUIDITY_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"
RAYDIUM_SWAP_PROGRAM_ID = "9qvG1zUp8xF1Bi4m6UdRNby1BAAuaDrUxSpv4CmRRMjL"
class RaydiumSwapParser(SwapParser):
"""
Parser for Raydium swap instructions
"""
@property
def program_id(self) -> str:
return RAYDIUM_SWAP_PROGRAM_ID
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a Raydium swap instruction
"""
if not instruction_data:
return {"type": "unknown", "error": "No instruction data"}
# Get accounts referenced by the instruction
instruction_accounts = instruction.get("accounts", [])
referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)]
# For Raydium swap, the account list typically follows this pattern:
# 0: Token program
# 1: AMM ID
# 2: AMM authority
# 3: AMM open orders
# 4: AMM target orders
# 5: Pool coin token account
# 6: Pool pc token account
# 7: Serum program
# 8: Serum market
# 9: Serum bids
# 10: Serum asks
# 11: Serum event queue
# 12: Serum coin vault
# 13: Serum pc vault
# 14: Serum vault signer
# 15: User source token account
# 16: User destination token account
# 17: User owner
# Basic metadata about the instruction
result = {
"type": "raydium_swap",
"program": "raydium_swap",
"program_id": RAYDIUM_SWAP_PROGRAM_ID,
"accounts": referenced_accounts,
"data": base64.b64encode(instruction_data).decode("utf-8"),
}
# Add specific account references if available
if len(referenced_accounts) >= 18:
result.update({
"amm_id": referenced_accounts[1],
"pool_coin_account": referenced_accounts[5],
"pool_pc_account": referenced_accounts[6],
"serum_market": referenced_accounts[8],
"user_source_account": referenced_accounts[15],
"user_destination_account": referenced_accounts[16],
"user_owner": referenced_accounts[17],
})
# Parse instruction data to get amount_in
# Note: This is a simplified version. Actual layout may vary.
try:
if len(instruction_data) >= 9:
# First byte is instruction type, next 8 bytes are the amount
amount_in = int.from_bytes(instruction_data[1:9], byteorder="little")
result["amount_in"] = amount_in
# For demonstration - in a real implementation, we would parse
# minimum amount out as well
except Exception as e:
result["parse_error"] = str(e)
return result
def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]:
"""
Extract swap information from a parsed instruction
Returns:
Tuple of (token_in_address, token_out_address, amount_in, amount_out)
"""
if parsed_instruction.get("type") != "raydium_swap":
return None, None, None, None
# For Raydium, we don't directly get token mint addresses from the instruction
# We would need to look up the token accounts' mint addresses
source_account = parsed_instruction.get("user_source_account")
destination_account = parsed_instruction.get("user_destination_account")
amount_in = parsed_instruction.get("amount_in")
# We don't know the amount_out from just the instruction
return None, source_account, destination_account, amount_in
class RaydiumLiquidityParser(LiquidityParser):
"""
Parser for Raydium liquidity instructions
"""
@property
def program_id(self) -> str:
return RAYDIUM_LIQUIDITY_PROGRAM_ID
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a Raydium liquidity instruction
"""
if not instruction_data:
return {"type": "unknown", "error": "No instruction data"}
# First byte is the instruction type
instruction_type = instruction_data[0]
# Get accounts referenced by the instruction
instruction_accounts = instruction.get("accounts", [])
referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)]
# Basic metadata about the instruction
result = {
"program": "raydium_liquidity",
"program_id": RAYDIUM_LIQUIDITY_PROGRAM_ID,
"accounts": referenced_accounts,
"data": base64.b64encode(instruction_data).decode("utf-8"),
}
# Parse specific instruction types
if instruction_type == 1: # Deposit liquidity
result["type"] = "raydium_deposit_liquidity"
elif instruction_type == 2: # Withdraw liquidity
result["type"] = "raydium_withdraw_liquidity"
else:
result["type"] = f"raydium_instruction_{instruction_type}"
return result
def extract_liquidity_info(self, parsed_instruction: Dict) -> Dict[str, Any]:
"""
Extract liquidity information from a parsed instruction
"""
result = {
"action": None,
"pool_address": None,
"tokens": [],
"amounts": [],
"user": None,
}
if parsed_instruction.get("type") == "raydium_deposit_liquidity":
result["action"] = "add"
elif parsed_instruction.get("type") == "raydium_withdraw_liquidity":
result["action"] = "remove"
else:
return result
# In a full implementation, we would extract pool address, tokens, amounts
# and user from the instruction accounts and data
return result

164
app/parsers/serum_dex.py Normal file
View File

@ -0,0 +1,164 @@
"""
Parser for the Serum DEX v3 instructions
"""
import base64
from typing import Dict, List, Optional, Tuple
from app.parsers.base import SwapParser
# Serum DEX v3 Program ID
SERUM_DEX_V3_PROGRAM_ID = "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"
# Serum DEX Instructions
SERUM_DEX_MATCH_ORDERS = 2
SERUM_DEX_NEW_ORDER = 0
SERUM_DEX_CANCEL_ORDER = 1
class SerumDexParser(SwapParser):
"""
Parser for the Serum DEX v3 instructions
"""
@property
def program_id(self) -> str:
return SERUM_DEX_V3_PROGRAM_ID
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a Serum DEX instruction
"""
if not instruction_data:
return {"type": "unknown", "error": "No instruction data"}
# First byte is the instruction type
instruction_type = instruction_data[0]
# Get accounts referenced by the instruction
instruction_accounts = instruction.get("accounts", [])
referenced_accounts = [accounts[idx] for idx in instruction_accounts if idx < len(accounts)]
result = {
"program": "serum_dex",
"program_id": SERUM_DEX_V3_PROGRAM_ID,
"accounts": referenced_accounts,
"data": base64.b64encode(instruction_data).decode("utf-8"),
}
# Parse specific instruction types
if instruction_type == SERUM_DEX_NEW_ORDER:
# New order instruction
result.update(self._parse_new_order(instruction_data, referenced_accounts))
elif instruction_type == SERUM_DEX_MATCH_ORDERS:
# Match orders instruction
result.update(self._parse_match_orders(instruction_data, referenced_accounts))
elif instruction_type == SERUM_DEX_CANCEL_ORDER:
# Cancel order instruction
result.update(self._parse_cancel_order(instruction_data, referenced_accounts))
else:
# Unknown instruction type
result.update({
"type": f"serum_instruction_{instruction_type}",
"error": "Unknown instruction type",
})
return result
def _parse_new_order(self, instruction_data: bytes, accounts: List[str]) -> Dict:
"""
Parse a Serum DEX new order instruction
"""
# New order layout is complex, simplified for this example
try:
side = "buy" if instruction_data[1] == 0 else "sell"
# Note: In a real implementation, we would parse:
# - limit price
# - max base qty
# - max quote qty
# - self trade behavior
# - order type
# - client order ID
# For demo purposes we'll create a simplified result
return {
"type": "serum_new_order",
"side": side,
"market": accounts[0] if len(accounts) > 0 else None,
"open_orders": accounts[1] if len(accounts) > 1 else None,
"request_queue": accounts[2] if len(accounts) > 2 else None,
"payer": accounts[3] if len(accounts) > 3 else None,
"owner": accounts[4] if len(accounts) > 4 else None,
"base_vault": accounts[5] if len(accounts) > 5 else None,
"quote_vault": accounts[6] if len(accounts) > 6 else None,
# Additional fields would be parsed from instruction_data
}
except Exception as e:
return {
"type": "serum_new_order",
"error": f"Failed to parse new order: {str(e)}",
}
def _parse_match_orders(self, instruction_data: bytes, accounts: List[str]) -> Dict:
"""
Parse a Serum DEX match orders instruction
"""
# Simplified implementation for demo purposes
try:
# Real implementation would extract more fields
return {
"type": "serum_match_orders",
"market": accounts[0] if len(accounts) > 0 else None,
"request_queue": accounts[1] if len(accounts) > 1 else None,
"event_queue": accounts[2] if len(accounts) > 2 else None,
"bids": accounts[3] if len(accounts) > 3 else None,
"asks": accounts[4] if len(accounts) > 4 else None,
# Additional accounts would be included
}
except Exception as e:
return {
"type": "serum_match_orders",
"error": f"Failed to parse match orders: {str(e)}",
}
def _parse_cancel_order(self, instruction_data: bytes, accounts: List[str]) -> Dict:
"""
Parse a Serum DEX cancel order instruction
"""
# Simplified implementation for demo purposes
try:
side = "buy" if instruction_data[1] == 0 else "sell"
return {
"type": "serum_cancel_order",
"side": side,
"market": accounts[0] if len(accounts) > 0 else None,
"open_orders": accounts[1] if len(accounts) > 1 else None,
"owner": accounts[2] if len(accounts) > 2 else None,
# Additional fields would be parsed from instruction_data
}
except Exception as e:
return {
"type": "serum_cancel_order",
"error": f"Failed to parse cancel order: {str(e)}",
}
def extract_swap_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[float], Optional[float]]:
"""
Extract swap information from a parsed instruction
Returns:
Tuple of (token_in_address, token_out_address, amount_in, amount_out)
"""
# For Serum, we need to look at both the new order and match orders instructions
# to get a complete swap picture, which is more complex than can be done
# with a single instruction.
# This is a simplified implementation that would need to be enhanced
if parsed_instruction.get("type") == "serum_match_orders":
# We would need more context to determine the tokens and amounts
# For now, return None values
return None, None, None, None
return None, None, None, None

View File

@ -0,0 +1,111 @@
"""
Parser for the Solana Token Program (SPL)
"""
import base64
from typing import Dict, List, Optional, Tuple
from app.parsers.base import TransferParser
# Token Program Instructions
TOKEN_TRANSFER_INSTRUCTION = 3
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
class TokenProgramParser(TransferParser):
"""
Parser for the Solana Token Program instructions
"""
@property
def program_id(self) -> str:
return TOKEN_PROGRAM_ID
def parse_instruction(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a token program instruction
"""
if not instruction_data:
return {"type": "unknown", "error": "No instruction data"}
# First byte is the instruction type
instruction_type = instruction_data[0]
if instruction_type == TOKEN_TRANSFER_INSTRUCTION:
# Parse transfer instruction
return self._parse_transfer(instruction, accounts, instruction_data)
# For other instruction types
return {
"type": f"token_instruction_{instruction_type}",
"program": "token_program",
"program_id": TOKEN_PROGRAM_ID,
"accounts": [accounts[idx] for idx in instruction.get("accounts", [])],
"data": base64.b64encode(instruction_data).decode("utf-8"),
}
def _parse_transfer(self, instruction: Dict, accounts: List[str], instruction_data: bytes) -> Dict:
"""
Parse a token transfer instruction
"""
# For token transfers, the accounts are:
# 0: Source account
# 1: Destination account
# 2: Owner of source account
# (3: Signers if multisig, optional)
instruction_accounts = instruction.get("accounts", [])
if len(instruction_accounts) < 3:
return {
"type": "token_transfer",
"error": "Not enough accounts for transfer",
"program": "token_program",
"program_id": TOKEN_PROGRAM_ID,
}
# Parse amount from instruction data
# The format is: [instruction_type(1 byte), amount(8 bytes)]
try:
amount_data = instruction_data[1:9]
amount = int.from_bytes(amount_data, byteorder="little")
except Exception as e:
return {
"type": "token_transfer",
"error": f"Failed to parse amount: {str(e)}",
"program": "token_program",
"program_id": TOKEN_PROGRAM_ID,
}
source_idx = instruction_accounts[0]
dest_idx = instruction_accounts[1]
owner_idx = instruction_accounts[2]
return {
"type": "token_transfer",
"program": "token_program",
"program_id": TOKEN_PROGRAM_ID,
"source": accounts[source_idx] if source_idx < len(accounts) else None,
"destination": accounts[dest_idx] if dest_idx < len(accounts) else None,
"owner": accounts[owner_idx] if owner_idx < len(accounts) else None,
"amount": amount,
"decimals": None, # We don't know the token decimals from instruction alone
}
def extract_transfer_info(self, parsed_instruction: Dict) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[float]]:
"""
Extract transfer information from a parsed instruction
Returns:
Tuple of (token_address, from_address, to_address, amount)
"""
if parsed_instruction.get("type") != "token_transfer":
return None, None, None, None
source = parsed_instruction.get("source")
destination = parsed_instruction.get("destination")
amount = parsed_instruction.get("amount")
# Note: For SPL tokens, the actual token mint (token_address) is not directly
# included in the instruction, so we would need additional context to determine it
return None, source, destination, amount

View File

@ -1,9 +1,54 @@
from datetime import datetime
from typing import List, Optional
from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field
class InstructionBase(BaseModel):
program_id: str
instruction_type: str
instruction_index: Optional[int] = None
accounts: Optional[str] = None
data: Optional[str] = None
parsed_data: Optional[Dict[str, Any]] = None
class InstructionCreate(InstructionBase):
transaction_id: int
class Instruction(InstructionBase):
id: int
transaction_id: int
class Config:
from_attributes = True
class SwapBase(BaseModel):
program_id: str
token_in_address: str
token_out_address: str
amount_in: float
amount_out: float
user_account: Optional[str] = None
class SwapCreate(SwapBase):
transaction_id: int
instruction_id: int
class Swap(SwapBase):
id: int
transaction_id: int
instruction_id: int
timestamp: datetime
class Config:
from_attributes = True
class TokenTransferBase(BaseModel):
token_address: str
from_address: str
@ -14,11 +59,13 @@ class TokenTransferBase(BaseModel):
class TokenTransferCreate(TokenTransferBase):
transaction_id: int
instruction_id: Optional[int] = None
class TokenTransfer(TokenTransferBase):
id: int
transaction_id: int
instruction_id: Optional[int] = None
timestamp: datetime
class Config:
@ -30,6 +77,7 @@ class ArbitrageEventBase(BaseModel):
profit_amount: float
profit_usd: Optional[float] = None
path: str
swap_sequence: Optional[str] = None
confidence_score: float = Field(..., ge=0.0, le=1.0)
@ -61,6 +109,8 @@ class Transaction(TransactionBase):
id: int
block_id: int
timestamp: datetime
instructions: List[Instruction] = []
swaps: List[Swap] = []
token_transfers: List[TokenTransfer] = []
arbitrage_events: List[ArbitrageEvent] = []

View File

@ -6,7 +6,8 @@ from typing import Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.models import ArbitrageEvent, Block, TokenTransfer, Transaction
from app.models.models import (ArbitrageEvent, Block, Instruction, Swap,
TokenTransfer, Transaction)
from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate,
TokenTransferCreate, TransactionCreate)
from app.services.solana_client import SolanaClient
@ -84,22 +85,52 @@ class BlockScanner:
def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]:
"""
Extract and save token transfers from transaction data
Extract and save token transfers from transaction data using instruction-level parsing
"""
# This would require parsing the transaction logs and instruction data
# For simplicity, we'll just create a placeholder function
# In a real implementation, we would extract token transfer info
# from the transaction instructions and logs
from app.parsers.parser_registry import parser_registry
transfers = []
# Placeholder for demonstration purposes
if "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]:
# Use the parser registry to parse all instructions in the transaction
parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)
# Extract token transfers from parsed instructions using each parser
for instruction in parsed_instructions:
program_id = instruction.get("program_id", "")
parser = parser_registry.get_parser_for_program(program_id)
if not parser:
continue
# Extract transfer info if the parser is a TransferParser
if hasattr(parser, "extract_transfer_info"):
try:
token_address, from_address, to_address, amount = parser.extract_transfer_info(instruction)
if token_address and from_address and to_address and amount is not None and amount > 0:
# Create token transfer record
transfer_create = TokenTransferCreate(
transaction_id=transaction_id,
token_address=token_address,
from_address=from_address,
to_address=to_address,
amount=amount,
program_id=program_id,
)
transfer = TokenTransfer(**transfer_create.model_dump())
self.db.add(transfer)
transfers.append(transfer)
except Exception as e:
logger.error(f"Error extracting transfer info: {str(e)}")
# Fallback to pre/post balance comparison if no transfers found with instruction parsing
if not transfers and "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]:
logger.info("No transfers found with instruction parsing, falling back to balance comparison")
post_balances = tx_data["meta"]["postTokenBalances"]
pre_balances = tx_data["meta"]["preTokenBalances"]
# Compare pre and post balances to identify transfers
# This is a simplified approach
for post in post_balances:
mint = post.get("mint", "")
owner = post.get("owner", "")
@ -121,7 +152,7 @@ class BlockScanner:
from_address="" if amount_change > 0 else owner,
to_address=owner if amount_change > 0 else "",
amount=abs(amount_change),
program_id=post.get("programId", ""),
program_id="", # We don't know the program from balance comparison
)
transfer = TokenTransfer(**transfer_create.model_dump())
@ -155,6 +186,129 @@ class BlockScanner:
return event
def save_instructions(self, tx_data: Dict, transaction_id: int) -> List[Instruction]:
"""
Extract and save instruction data from a transaction
"""
from app.parsers.parser_registry import parser_registry
instructions = []
# Extract message and accounts from transaction
if not tx_data or "transaction" not in tx_data or "message" not in tx_data["transaction"]:
return instructions
tx_message = tx_data["transaction"]["message"]
accounts = tx_message.get("accountKeys", [])
# Process each instruction
for idx, instruction in enumerate(tx_message.get("instructions", [])):
program_id = instruction.get("programId", "")
parser = parser_registry.get_parser_for_program(program_id)
# Parse instruction data
instruction_data = instruction.get("data", "")
accounts_list = [accounts[idx] for idx in instruction.get("accounts", []) if idx < len(accounts)]
# Create instruction record
instruction_type = "unknown"
parsed_data = None
if parser:
try:
# Decode instruction data from base64
try:
import base64
raw_data = base64.b64decode(instruction_data)
except Exception:
raw_data = b""
# Parse instruction with appropriate parser
parsed_instruction = parser.parse_instruction(instruction, accounts, raw_data)
instruction_type = parsed_instruction.get("type", "unknown")
parsed_data = parsed_instruction
except Exception as e:
logger.error(f"Error parsing instruction: {str(e)}")
# Create instruction record
from app.schemas.schemas import InstructionCreate
instruction_create = InstructionCreate(
transaction_id=transaction_id,
program_id=program_id,
instruction_type=instruction_type,
instruction_index=idx,
accounts=json.dumps(accounts_list),
data=instruction_data,
parsed_data=parsed_data,
)
instruction_obj = Instruction(**instruction_create.model_dump())
self.db.add(instruction_obj)
instructions.append(instruction_obj)
if instructions:
self.db.commit()
for instruction in instructions:
self.db.refresh(instruction)
return instructions
def save_swaps(self, tx_data: Dict, transaction_id: int, instructions: List[Instruction]) -> List[Swap]:
"""
Extract and save swap data from a transaction
"""
from app.parsers.parser_registry import parser_registry
swaps = []
# Get parsed instructions
parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)
# Map instruction indices to IDs
instruction_id_map = {inst.instruction_index: inst.id for inst in instructions if inst.instruction_index is not None}
# Extract swap information from parsed instructions
for idx, parsed_instruction in enumerate(parsed_instructions):
program_id = parsed_instruction.get("program_id", "")
parser = parser_registry.get_parser_for_program(program_id)
if not parser or not hasattr(parser, "extract_swap_info"):
continue
try:
token_in, token_out, amount_in, amount_out = parser.extract_swap_info(parsed_instruction)
if token_in and token_out and amount_in is not None and amount_out is not None:
instruction_id = instruction_id_map.get(idx)
if not instruction_id:
continue
# Create swap record
from app.schemas.schemas import SwapCreate
swap_create = SwapCreate(
transaction_id=transaction_id,
instruction_id=instruction_id,
program_id=program_id,
token_in_address=token_in,
token_out_address=token_out,
amount_in=amount_in,
amount_out=amount_out,
user_account=parsed_instruction.get("user_wallet", ""),
)
swap = Swap(**swap_create.model_dump())
self.db.add(swap)
swaps.append(swap)
except Exception as e:
logger.error(f"Error extracting swap info: {str(e)}")
if swaps:
self.db.commit()
for swap in swaps:
self.db.refresh(swap)
return swaps
def process_transaction(self, tx_data: Dict, block_id: int) -> Tuple[Transaction, Optional[ArbitrageEvent]]:
"""
Process a single transaction, looking for arbitrage
@ -162,6 +316,12 @@ class BlockScanner:
# Save transaction record
transaction = self.save_transaction(tx_data, block_id)
# Extract and save instructions
instructions = self.save_instructions(tx_data, transaction.id)
# Extract and save swaps
swaps = self.save_swaps(tx_data, transaction.id, instructions)
# Extract and save token transfers
self.save_token_transfers(tx_data, transaction.id)
@ -170,6 +330,11 @@ class BlockScanner:
arbitrage_event = None
if is_arbitrage and arbitrage_data:
# Add swap sequence if available
if swaps:
swap_ids = [swap.id for swap in swaps]
arbitrage_data["swap_sequence"] = json.dumps(swap_ids)
arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id)
logger.info(f"Detected arbitrage in transaction {transaction.signature}")

View File

@ -114,56 +114,105 @@ class SolanaClient:
def analyze_transaction_for_arbitrage(self, tx_data: Dict) -> Tuple[bool, Optional[Dict]]:
"""
Analyze transaction data for potential arbitrage
Analyze transaction data for potential arbitrage using instruction-level parsing
Returns a tuple of (is_arbitrage, arbitrage_data)
"""
# This is a simple placeholder implementation
# A real implementation would require advanced pattern matching
# and knowledge of Solana DEX structures
from app.parsers.parser_registry import parser_registry
# Check if transaction has instructions
# Check if transaction exists and has data
if not tx_data or "transaction" not in tx_data:
return False, None
tx = tx_data["transaction"]
if "message" not in tx or "instructions" not in tx["message"]:
# Parse all instructions in the transaction
parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)
# Extract swap information from parsed instructions
swaps = []
# Track token flow for a single user wallet
token_flow = {} # token_address -> net_amount
user_wallet = None
for instruction in parsed_instructions:
program_id = instruction.get("program_id", "")
parser = parser_registry.get_parser_for_program(program_id)
if not parser:
continue
# Extract swap info if the parser is a SwapParser
if hasattr(parser, "extract_swap_info"):
try:
token_in, token_out, amount_in, amount_out = parser.extract_swap_info(instruction)
if token_in and token_out and amount_in is not None and amount_out is not None:
swaps.append({
"token_in": token_in,
"token_out": token_out,
"amount_in": amount_in,
"amount_out": amount_out,
"program_id": program_id,
})
# Track net token flow
token_flow[token_in] = token_flow.get(token_in, 0) - amount_in
token_flow[token_out] = token_flow.get(token_out, 0) + amount_out
# Try to identify user wallet
if "user_wallet" in instruction and not user_wallet:
user_wallet = instruction.get("user_wallet")
except Exception as e:
logger.error(f"Error extracting swap info: {str(e)}")
# No need to continue if we don't have enough swaps
if len(swaps) < 3:
return False, None
instructions = tx["message"]["instructions"]
# Check for circular swaps (token A -> token B -> token C -> ... -> token A)
# This is a key arbitrage pattern
token_path = []
for swap in swaps:
token_path.append(swap["token_in"])
token_path.append(swap["token_out"])
# Check if transaction has multiple token swaps
# This is a simplistic approach - real implementation would be more sophisticated
swap_count = 0
token_programs = [
"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", # Serum DEX v3
"SwaPpA9LAaLfeLi3a68M4DjnLqgtticKg6CnyNwgAC8", # Swap Program
"27haf8L6oxUeXrHrgEgsexjSY5hbVUWEmvv9Nyxg8vQv", # Jupiter Aggregator
]
# Check if the first and last tokens in the path are the same
is_circular = len(token_path) >= 2 and token_path[0] == token_path[-1]
token_addresses = []
for inst in instructions:
if inst.get("programId") in token_programs:
swap_count += 1
# In a real implementation, we'd parse the instruction data
# to identify the tokens being swapped
# For now, we'll just use a placeholder
token_addresses.append(f"token_{swap_count}")
# Check if there's a profit in any token
profit_token = None
profit_amount = 0
# Check for circular pattern (token A -> token B -> token C -> token A)
if swap_count >= 3:
# Check if first and last token are the same (circular)
is_circular = len(token_addresses) >= 3 and token_addresses[0] == token_addresses[-1]
if is_circular:
# In a real implementation, we'd calculate the actual profit
# For now, return a placeholder
return True, {
"profit_token_address": token_addresses[0],
"profit_amount": 0.01, # Placeholder
"path": json.dumps(token_addresses),
"confidence_score": 0.7, # Placeholder
}
for token, amount in token_flow.items():
if amount > 0:
# If we have a net positive amount of any token, it's a profit
if profit_amount < amount:
profit_token = token
profit_amount = amount
# Determine arbitrage confidence score
confidence_score = 0.0
if is_circular and profit_token:
# High confidence if we have a circular path and profit
confidence_score = 0.9
elif is_circular:
# Medium confidence if we have a circular path but no clear profit
confidence_score = 0.7
elif profit_token:
# Lower confidence if we have profit but no circular path
confidence_score = 0.5
else:
# Not likely an arbitrage
return False, None
# Consider it arbitrage if we have high enough confidence
if confidence_score >= 0.5:
return True, {
"profit_token_address": profit_token or swaps[0]["token_in"],
"profit_amount": profit_amount or 0.01, # Fallback to placeholder if we can't determine
"path": json.dumps(token_path),
"confidence_score": confidence_score,
}
return False, None

View File

@ -0,0 +1,89 @@
"""add instruction tables
Revision ID: f9c3a857d9e6
Revises: b61d5f5a1ded
Create Date: 2023-10-10 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f9c3a857d9e6'
down_revision = 'b61d5f5a1ded'
branch_labels = None
depends_on = None
def upgrade():
# Create instructions table
op.create_table(
'instructions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('transaction_id', sa.Integer(), nullable=True),
sa.Column('program_id', sa.String(), nullable=True),
sa.Column('instruction_type', sa.String(), nullable=True),
sa.Column('instruction_index', sa.Integer(), nullable=True),
sa.Column('accounts', sa.Text(), nullable=True),
sa.Column('data', sa.Text(), nullable=True),
sa.Column('parsed_data', sa.JSON(), nullable=True),
sa.ForeignKeyConstraint(['transaction_id'], ['transactions.id'], ),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_instructions_id'), 'instructions', ['id'], unique=False)
op.create_index(op.f('ix_instructions_program_id'), 'instructions', ['program_id'], unique=False)
op.create_index(op.f('ix_instructions_instruction_type'), 'instructions', ['instruction_type'], unique=False)
# Create swaps table
op.create_table(
'swaps',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('transaction_id', sa.Integer(), nullable=True),
sa.Column('instruction_id', sa.Integer(), nullable=True),
sa.Column('program_id', sa.String(), nullable=True),
sa.Column('token_in_address', sa.String(), nullable=True),
sa.Column('token_out_address', sa.String(), nullable=True),
sa.Column('amount_in', sa.Float(), nullable=True),
sa.Column('amount_out', sa.Float(), nullable=True),
sa.Column('user_account', sa.String(), nullable=True),
sa.Column('timestamp', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['instruction_id'], ['instructions.id'], ),
sa.ForeignKeyConstraint(['transaction_id'], ['transactions.id'], ),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_swaps_id'), 'swaps', ['id'], unique=False)
op.create_index(op.f('ix_swaps_program_id'), 'swaps', ['program_id'], unique=False)
op.create_index(op.f('ix_swaps_token_in_address'), 'swaps', ['token_in_address'], unique=False)
op.create_index(op.f('ix_swaps_token_out_address'), 'swaps', ['token_out_address'], unique=False)
op.create_index(op.f('ix_swaps_user_account'), 'swaps', ['user_account'], unique=False)
# Add instruction_id to token_transfers
op.add_column('token_transfers', sa.Column('instruction_id', sa.Integer(), nullable=True))
op.create_foreign_key(None, 'token_transfers', 'instructions', ['instruction_id'], ['id'])
# Add swap_sequence to arbitrage_events
op.add_column('arbitrage_events', sa.Column('swap_sequence', sa.Text(), nullable=True))
def downgrade():
# Remove swap_sequence from arbitrage_events
op.drop_column('arbitrage_events', 'swap_sequence')
# Remove instruction_id from token_transfers
op.drop_constraint(None, 'token_transfers', type_='foreignkey')
op.drop_column('token_transfers', 'instruction_id')
# Drop swaps table
op.drop_index(op.f('ix_swaps_user_account'), table_name='swaps')
op.drop_index(op.f('ix_swaps_token_out_address'), table_name='swaps')
op.drop_index(op.f('ix_swaps_token_in_address'), table_name='swaps')
op.drop_index(op.f('ix_swaps_program_id'), table_name='swaps')
op.drop_index(op.f('ix_swaps_id'), table_name='swaps')
op.drop_table('swaps')
# Drop instructions table
op.drop_index(op.f('ix_instructions_instruction_type'), table_name='instructions')
op.drop_index(op.f('ix_instructions_program_id'), table_name='instructions')
op.drop_index(op.f('ix_instructions_id'), table_name='instructions')
op.drop_table('instructions')