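"""Block scanner service.

Walks recent Solana blocks, persists blocks, transactions, instructions, swaps
and token transfers, and records any arbitrage events detected along the way.
"""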
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.models import (ArbitrageEvent, Block, Instruction, Swap,
TokenTransfer, Transaction)
from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate,
TokenTransferCreate, TransactionCreate)
from app.services.solana_client import SolanaClient
logger = logging.getLogger(__name__)
class BlockScanner:
"""
Scanner service for processing Solana blocks and detecting arbitrage
"""
def __init__(self, db: Session, solana_client: Optional[SolanaClient] = None):
"""
Initialize scanner with database session and Solana client
"""
self.db = db
self.solana_client = solana_client or SolanaClient()
self._scan_in_progress = False
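        # In-process guard only; it does not protect against concurrent scans
        # started from other worker processes.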
@property
def scan_in_progress(self) -> bool:
"""
Check if a scan is currently in progress
"""
return self._scan_in_progress
def get_last_scanned_block(self) -> Optional[int]:
"""
Get the latest block that was scanned
"""
latest_block = self.db.query(Block).order_by(Block.slot.desc()).first()
if latest_block:
return latest_block.slot
return None
def save_block(self, block_data: Dict) -> Block:
"""
Save block data to database
"""
block_create = BlockCreate(
slot=block_data["parentSlot"] + 1 if "parentSlot" in block_data else 0,
blockhash=block_data.get("blockhash", ""),
parent_blockhash=block_data.get("previousBlockhash", ""),
processed=False,
)
block = Block(**block_create.model_dump())
self.db.add(block)
self.db.commit()
self.db.refresh(block)
return block
def save_transaction(self, tx_data: Dict, block_id: int) -> Transaction:
"""
Save transaction data to database
"""
        signature = (tx_data.get("transaction", {}).get("signatures") or [""])[0]
tx_create = TransactionCreate(
signature=signature,
block_id=block_id,
fee=tx_data.get("meta", {}).get("fee", 0),
status="success" if tx_data.get("meta", {}).get("err") is None else "error",
raw_data=json.dumps(tx_data),
)
transaction = Transaction(**tx_create.model_dump())
self.db.add(transaction)
self.db.commit()
self.db.refresh(transaction)
return transaction
def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]:
"""
Extract and save token transfers from transaction data using instruction-level parsing
"""
from app.parsers.parser_registry import parser_registry
transfers = []
# Use the parser registry to parse all instructions in the transaction
parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)
# Extract token transfers from parsed instructions using each parser
for instruction in parsed_instructions:
program_id = instruction.get("program_id", "")
parser = parser_registry.get_parser_for_program(program_id)
if not parser:
continue
# Extract transfer info if the parser is a TransferParser
if hasattr(parser, "extract_transfer_info"):
try:
token_address, from_address, to_address, amount = parser.extract_transfer_info(instruction)
if token_address and from_address and to_address and amount is not None and amount > 0:
# Create token transfer record
transfer_create = TokenTransferCreate(
transaction_id=transaction_id,
token_address=token_address,
from_address=from_address,
to_address=to_address,
amount=amount,
program_id=program_id,
)
transfer = TokenTransfer(**transfer_create.model_dump())
self.db.add(transfer)
transfers.append(transfer)
except Exception as e:
logger.error(f"Error extracting transfer info: {str(e)}")
# Fallback to pre/post balance comparison if no transfers found with instruction parsing
        if (
            not transfers
            and "meta" in tx_data
            and "postTokenBalances" in tx_data["meta"]
            and "preTokenBalances" in tx_data["meta"]
        ):
logger.info("No transfers found with instruction parsing, falling back to balance comparison")
post_balances = tx_data["meta"]["postTokenBalances"]
pre_balances = tx_data["meta"]["preTokenBalances"]
# Compare pre and post balances to identify transfers
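            # A positive balance delta means the owner received tokens, a negative one
            # means they sent them; the counterparty cannot be recovered from balance
            # deltas alone, which is why one side of the transfer is left blank below.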
for post in post_balances:
mint = post.get("mint", "")
owner = post.get("owner", "")
# Find matching pre-balance
pre = next((p for p in pre_balances if p.get("mint") == mint and p.get("owner") == owner), None)
if pre:
# Calculate amount change
pre_amount = float(pre.get("uiTokenAmount", {}).get("uiAmount", 0) or 0)
post_amount = float(post.get("uiTokenAmount", {}).get("uiAmount", 0) or 0)
amount_change = post_amount - pre_amount
if amount_change != 0:
# Create token transfer record
transfer_create = TokenTransferCreate(
transaction_id=transaction_id,
token_address=mint,
from_address="" if amount_change > 0 else owner,
to_address=owner if amount_change > 0 else "",
amount=abs(amount_change),
program_id="", # We don't know the program from balance comparison
)
transfer = TokenTransfer(**transfer_create.model_dump())
self.db.add(transfer)
transfers.append(transfer)
if transfers:
self.db.commit()
for transfer in transfers:
self.db.refresh(transfer)
return transfers
def save_arbitrage_event(self, arbitrage_data: Dict, transaction_id: int) -> ArbitrageEvent:
"""
Save detected arbitrage event to database
"""
event_create = ArbitrageEventCreate(
transaction_id=transaction_id,
profit_token_address=arbitrage_data["profit_token_address"],
profit_amount=arbitrage_data["profit_amount"],
profit_usd=arbitrage_data.get("profit_usd"),
path=arbitrage_data["path"],
confidence_score=arbitrage_data["confidence_score"],
)
event = ArbitrageEvent(**event_create.model_dump())
self.db.add(event)
self.db.commit()
self.db.refresh(event)
return event
def save_instructions(self, tx_data: Dict, transaction_id: int) -> List[Instruction]:
"""
Extract and save instruction data from a transaction
"""
from app.parsers.parser_registry import parser_registry
instructions = []
# Extract message and accounts from transaction
if not tx_data or "transaction" not in tx_data or "message" not in tx_data["transaction"]:
return instructions
tx_message = tx_data["transaction"]["message"]
accounts = tx_message.get("accountKeys", [])
# Process each instruction
for idx, instruction in enumerate(tx_message.get("instructions", [])):
program_id = instruction.get("programId", "")
parser = parser_registry.get_parser_for_program(program_id)
# Parse instruction data
instruction_data = instruction.get("data", "")
            accounts_list = [accounts[acct_idx] for acct_idx in instruction.get("accounts", []) if acct_idx < len(accounts)]
# Create instruction record
instruction_type = "unknown"
parsed_data = None
if parser:
try:
# Decode instruction data from base64
try:
import base64
raw_data = base64.b64decode(instruction_data)
except Exception:
raw_data = b""
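                    # NOTE: Solana's "json"/"jsonParsed" encodings return base58-encoded
                    # instruction data, so the base64 decode above may need adjusting
                    # depending on how SolanaClient requests transactions.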
# Parse instruction with appropriate parser
parsed_instruction = parser.parse_instruction(instruction, accounts, raw_data)
instruction_type = parsed_instruction.get("type", "unknown")
parsed_data = parsed_instruction
except Exception as e:
logger.error(f"Error parsing instruction: {str(e)}")
# Create instruction record
from app.schemas.schemas import InstructionCreate
instruction_create = InstructionCreate(
transaction_id=transaction_id,
program_id=program_id,
instruction_type=instruction_type,
instruction_index=idx,
accounts=json.dumps(accounts_list),
data=instruction_data,
parsed_data=parsed_data,
)
instruction_obj = Instruction(**instruction_create.model_dump())
self.db.add(instruction_obj)
instructions.append(instruction_obj)
if instructions:
self.db.commit()
for instruction in instructions:
self.db.refresh(instruction)
return instructions
def save_swaps(self, tx_data: Dict, transaction_id: int, instructions: List[Instruction]) -> List[Swap]:
"""
Extract and save swap data from a transaction
"""
from app.parsers.parser_registry import parser_registry
swaps = []
# Get parsed instructions
parsed_instructions = parser_registry.parse_transaction_instructions(tx_data)
# Map instruction indices to IDs
instruction_id_map = {inst.instruction_index: inst.id for inst in instructions if inst.instruction_index is not None}
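        # Assumes parser_registry yields instructions in the same order as
        # save_instructions, so the enumerate() index below lines up with
        # Instruction.instruction_index.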
# Extract swap information from parsed instructions
for idx, parsed_instruction in enumerate(parsed_instructions):
program_id = parsed_instruction.get("program_id", "")
parser = parser_registry.get_parser_for_program(program_id)
if not parser or not hasattr(parser, "extract_swap_info"):
continue
try:
token_in, token_out, amount_in, amount_out = parser.extract_swap_info(parsed_instruction)
if token_in and token_out and amount_in is not None and amount_out is not None:
instruction_id = instruction_id_map.get(idx)
                    if instruction_id is None:
continue
# Create swap record
from app.schemas.schemas import SwapCreate
swap_create = SwapCreate(
transaction_id=transaction_id,
instruction_id=instruction_id,
program_id=program_id,
token_in_address=token_in,
token_out_address=token_out,
amount_in=amount_in,
amount_out=amount_out,
user_account=parsed_instruction.get("user_wallet", ""),
)
swap = Swap(**swap_create.model_dump())
self.db.add(swap)
swaps.append(swap)
except Exception as e:
logger.error(f"Error extracting swap info: {str(e)}")
if swaps:
self.db.commit()
for swap in swaps:
self.db.refresh(swap)
return swaps
def process_transaction(self, tx_data: Dict, block_id: int) -> Tuple[Transaction, Optional[ArbitrageEvent]]:
"""
Process a single transaction, looking for arbitrage
"""
# Save transaction record
transaction = self.save_transaction(tx_data, block_id)
# Extract and save instructions
instructions = self.save_instructions(tx_data, transaction.id)
# Extract and save swaps
swaps = self.save_swaps(tx_data, transaction.id, instructions)
# Extract and save token transfers
self.save_token_transfers(tx_data, transaction.id)
# Analyze for arbitrage
is_arbitrage, arbitrage_data = self.solana_client.analyze_transaction_for_arbitrage(tx_data)
arbitrage_event = None
if is_arbitrage and arbitrage_data:
# Add swap sequence if available
if swaps:
swap_ids = [swap.id for swap in swaps]
arbitrage_data["swap_sequence"] = json.dumps(swap_ids)
arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id)
logger.info(f"Detected arbitrage in transaction {transaction.signature}")
return transaction, arbitrage_event
def process_block(self, block_data: Dict) -> Tuple[Block, List[ArbitrageEvent]]:
"""
Process a single block, looking for arbitrage transactions
"""
# Save block record
block = self.save_block(block_data)
arbitrage_events = []
# Process each transaction in the block
if "transactions" in block_data:
for tx_data in block_data["transactions"]:
                _, arbitrage_event = self.process_transaction(tx_data, block.id)
if arbitrage_event:
arbitrage_events.append(arbitrage_event)
# Mark block as processed
block.processed = True
self.db.commit()
self.db.refresh(block)
return block, arbitrage_events
    def scan_blocks(self, num_blocks: Optional[int] = None, start_slot: Optional[int] = None) -> List[ArbitrageEvent]:
"""
Scan the specified number of latest blocks for arbitrage
"""
if self._scan_in_progress:
logger.warning("Scan already in progress, ignoring request")
return []
self._scan_in_progress = True
all_events = []
try:
num_blocks = num_blocks or settings.SOLANA_BLOCKS_TO_SCAN
# Get latest block if start_slot not provided
if start_slot is None:
latest_block = self.solana_client.get_latest_block()
# Get block at that slot
block_data = self.solana_client.get_block(latest_block["lastValidBlockHeight"])
start_slot = block_data["parentSlot"]
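                # NOTE: lastValidBlockHeight is a block height, not a slot, so the
                # starting slot derived here is approximate; the exact behaviour
                # depends on how SolanaClient.get_latest_block is implemented.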
# Get list of blocks to scan
end_slot = start_slot - num_blocks
if end_slot < 0:
end_slot = 0
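            # Despite the name, end_slot is the lower bound of the scan range;
            # get_blocks is assumed to take (lower_slot, upper_slot).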
blocks_to_scan = self.solana_client.get_blocks(end_slot, start_slot)
# Scan each block
for slot in blocks_to_scan:
try:
block_data = self.solana_client.get_block(slot)
_, events = self.process_block(block_data)
all_events.extend(events)
except Exception as e:
logger.error(f"Error processing block {slot}: {str(e)}")
continue
except Exception as e:
logger.error(f"Error scanning blocks: {str(e)}")
finally:
self._scan_in_progress = False
return all_events
def get_scan_status(self) -> Dict:
"""
        Get the current status of the block scanner
"""
last_block = self.db.query(Block).order_by(Block.slot.desc()).first()
last_scan_time = None
last_scanned_block = 0
if last_block:
last_scan_time = last_block.timestamp
last_scanned_block = last_block.slot
arbitrage_count = self.db.query(ArbitrageEvent).count()
return {
"last_scanned_block": last_scanned_block,
"last_scan_time": last_scan_time or datetime.utcnow(),
"arbitrage_events_count": arbitrage_count,
"scan_in_progress": self._scan_in_progress,
}
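

# Example usage (a minimal sketch; the SessionLocal import path is an assumption
# and may differ in this project):
#
#   from app.db.session import SessionLocal
#
#   db = SessionLocal()
#   scanner = BlockScanner(db)
#   events = scanner.scan_blocks(num_blocks=10)
#   print(f"Detected {len(events)} arbitrage events")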