Automated Action 85b1f2a581 Enhance Solana RPC client with better error handling and debugging
- Add detailed logging throughout the Solana client and scanner
- Improve error handling in RPC client methods
- Add debug endpoints to validate Solana connection
- Add message field to scan status responses
- Enhance health endpoint with RPC connectivity status
- Handle invalid block ranges and API rate limits
2025-05-28 23:08:35 +00:00

302 lines
12 KiB
Python

import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.models import ArbitrageEvent, Block, TokenTransfer, Transaction
from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate,
TokenTransferCreate, TransactionCreate)
from app.services.solana_client import SolanaClient
logger = logging.getLogger(__name__)
class BlockScanner:
    """
    Scanner service for processing Solana blocks and detecting arbitrage.

    The scanner fetches blocks through a ``SolanaClient``, persists blocks,
    transactions and token transfers through the SQLAlchemy session it is
    given, and stores every arbitrage event the client reports.
    """

    def __init__(self, db: Session, solana_client: Optional[SolanaClient] = None):
        """
        Initialize scanner with database session and Solana client.

        Args:
            db: Session used for every query and commit made by the scanner.
            solana_client: Client used for RPC calls; a default
                ``SolanaClient()`` is created when omitted.
        """
        self.db = db
        self.solana_client = solana_client or SolanaClient()
        self._scan_in_progress = False

    @property
    def scan_in_progress(self) -> bool:
        """
        Check if a scan is currently in progress.
        """
        return self._scan_in_progress

    def get_last_scanned_block(self) -> Optional[int]:
        """
        Get the slot of the stored block with the highest slot.

        Returns:
            The highest stored slot, or ``None`` when no block is stored.
        """
        latest_block = self.db.query(Block).order_by(Block.slot.desc()).first()
        if latest_block:
            return latest_block.slot
        return None

    # ------------------------------------------------------------------
    # Pure helpers (no database or RPC access)
    # ------------------------------------------------------------------

    @staticmethod
    def _tx_meta(tx_data: Dict) -> Dict:
        """Return the transaction's ``meta`` mapping, or ``{}`` when it is missing or null."""
        return tx_data.get("meta") or {}

    @staticmethod
    def _extract_signature(tx_data: Dict) -> str:
        """Return the first signature of a transaction, or ``""`` when there is none."""
        transaction = tx_data.get("transaction") or {}
        # An empty signature list falls back to the same default as a missing one.
        signatures = transaction.get("signatures") or [""]
        return signatures[0]

    @staticmethod
    def _resolve_slot(block_data: Dict, slot: Optional[int] = None) -> int:
        """Return the explicit ``slot`` if given, else ``parentSlot + 1``, else ``0``."""
        if slot is not None:
            return slot
        if "parentSlot" in block_data:
            # Fallback only: the parent is not necessarily the immediately
            # preceding slot, so callers that know the slot should pass it.
            return block_data["parentSlot"] + 1
        return 0

    @staticmethod
    def _token_balance_changes(meta: Dict) -> List[Dict]:
        """
        Compare pre/post token balances and describe every non-zero change.

        Returns:
            One dict per changed balance with the keys ``token_address``,
            ``from_address``, ``to_address``, ``amount`` and ``program_id``.
            Post-balances without a pre-balance for the same mint and owner
            are skipped.
        """
        if "postTokenBalances" not in meta or "preTokenBalances" not in meta:
            return []
        pre_balances = meta["preTokenBalances"] or []
        post_balances = meta["postTokenBalances"] or []

        changes = []
        for post in post_balances:
            mint = post.get("mint", "")
            owner = post.get("owner", "")
            # Find matching pre-balance (first entry with the same mint and owner)
            pre = next(
                (p for p in pre_balances if p.get("mint") == mint and p.get("owner") == owner),
                None,
            )
            if not pre:
                continue

            pre_amount = float((pre.get("uiTokenAmount") or {}).get("uiAmount") or 0)
            post_amount = float((post.get("uiTokenAmount") or {}).get("uiAmount") or 0)
            amount_change = post_amount - pre_amount
            if amount_change == 0:
                continue

            # A balance increase is recorded as an incoming transfer to the
            # owner, a decrease as an outgoing one; the counterparty is unknown.
            incoming = amount_change > 0
            changes.append(
                {
                    "token_address": mint,
                    "from_address": "" if incoming else owner,
                    "to_address": owner if incoming else "",
                    "amount": abs(amount_change),
                    "program_id": post.get("programId", ""),
                }
            )
        return changes

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_block(self, block_data: Dict, slot: Optional[int] = None) -> Block:
        """
        Save block data to database.

        Args:
            block_data: Block payload; ``parentSlot``, ``blockhash`` and
                ``previousBlockhash`` are read from it.
            slot: Slot the block was fetched for. When omitted the slot is
                derived as ``parentSlot + 1`` (``0`` if ``parentSlot`` is absent).

        Returns:
            The committed and refreshed ``Block`` row.
        """
        block_create = BlockCreate(
            slot=self._resolve_slot(block_data, slot),
            blockhash=block_data.get("blockhash", ""),
            parent_blockhash=block_data.get("previousBlockhash", ""),
            processed=False,
        )
        block = Block(**block_create.model_dump())
        self.db.add(block)
        self.db.commit()
        self.db.refresh(block)
        return block

    def save_transaction(self, tx_data: Dict, block_id: int) -> Transaction:
        """
        Save transaction data to database.

        Args:
            tx_data: Transaction payload; the first signature, ``meta.fee``
                and ``meta.err`` are read from it. A missing or null ``meta``
                is treated as empty.
            block_id: Value stored in the transaction's ``block_id`` field.

        Returns:
            The committed and refreshed ``Transaction`` row.
        """
        meta = self._tx_meta(tx_data)
        tx_create = TransactionCreate(
            signature=self._extract_signature(tx_data),
            block_id=block_id,
            fee=meta.get("fee", 0),
            status="success" if meta.get("err") is None else "error",
            raw_data=json.dumps(tx_data),
        )
        transaction = Transaction(**tx_create.model_dump())
        self.db.add(transaction)
        self.db.commit()
        self.db.refresh(transaction)
        return transaction

    def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]:
        """
        Extract and save token transfers from transaction data.

        This is a simplified approach: transfers are inferred by comparing
        ``preTokenBalances`` and ``postTokenBalances`` instead of parsing the
        transaction instructions and logs.

        Args:
            tx_data: Transaction payload whose ``meta`` holds the balances.
            transaction_id: Id stored on every created transfer.

        Returns:
            The saved ``TokenTransfer`` rows (empty when nothing changed).
        """
        transfers = []
        for change in self._token_balance_changes(self._tx_meta(tx_data)):
            transfer_create = TokenTransferCreate(transaction_id=transaction_id, **change)
            transfer = TokenTransfer(**transfer_create.model_dump())
            self.db.add(transfer)
            transfers.append(transfer)

        # Commit once for the whole transaction instead of once per transfer.
        if transfers:
            self.db.commit()
            for transfer in transfers:
                self.db.refresh(transfer)
        return transfers

    def save_arbitrage_event(self, arbitrage_data: Dict, transaction_id: int) -> ArbitrageEvent:
        """
        Save detected arbitrage event to database.

        Args:
            arbitrage_data: Must contain ``profit_token_address``,
                ``profit_amount``, ``path`` and ``confidence_score``;
                ``profit_usd`` is optional.
            transaction_id: Id of the transaction the event belongs to.

        Returns:
            The committed and refreshed ``ArbitrageEvent`` row.

        Raises:
            KeyError: If a required key is missing from ``arbitrage_data``.
        """
        event_create = ArbitrageEventCreate(
            transaction_id=transaction_id,
            profit_token_address=arbitrage_data["profit_token_address"],
            profit_amount=arbitrage_data["profit_amount"],
            profit_usd=arbitrage_data.get("profit_usd"),
            path=arbitrage_data["path"],
            confidence_score=arbitrage_data["confidence_score"],
        )
        event = ArbitrageEvent(**event_create.model_dump())
        self.db.add(event)
        self.db.commit()
        self.db.refresh(event)
        return event

    # ------------------------------------------------------------------
    # Processing
    # ------------------------------------------------------------------

    def process_transaction(
        self, tx_data: Dict, block_id: int
    ) -> Tuple[Transaction, Optional[ArbitrageEvent]]:
        """
        Process a single transaction, looking for arbitrage.

        The transaction and its token transfers are always saved; an
        arbitrage event is saved only when the client flags the transaction
        and returns data for it.

        Returns:
            The saved transaction and the saved event (or ``None``).
        """
        # Save transaction record
        transaction = self.save_transaction(tx_data, block_id)

        # Extract and save token transfers
        self.save_token_transfers(tx_data, transaction.id)

        # Analyze for arbitrage
        is_arbitrage, arbitrage_data = self.solana_client.analyze_transaction_for_arbitrage(tx_data)
        arbitrage_event = None
        if is_arbitrage and arbitrage_data:
            arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id)
            logger.info("Detected arbitrage in transaction %s", transaction.signature)
        return transaction, arbitrage_event

    def process_block(
        self, block_data: Dict, slot: Optional[int] = None
    ) -> Tuple[Block, List[ArbitrageEvent]]:
        """
        Process a single block, looking for arbitrage transactions.

        Args:
            block_data: Block payload; its ``transactions`` list is processed
                when present.
            slot: Slot the block was fetched for, forwarded to ``save_block``.

        Returns:
            The saved block (marked as processed) and the arbitrage events
            found in it.
        """
        # Save block record
        block = self.save_block(block_data, slot)

        arbitrage_events = []
        # Process each transaction in the block
        for tx_data in block_data.get("transactions") or []:
            # NOTE(review): block.slot (not block.id) is passed as block_id --
            # confirm that Transaction.block_id is meant to reference the slot.
            _, arbitrage_event = self.process_transaction(tx_data, block.slot)
            if arbitrage_event:
                arbitrage_events.append(arbitrage_event)

        # Mark block as processed
        block.processed = True
        self.db.commit()
        self.db.refresh(block)
        return block, arbitrage_events

    def _fetch_start_slot(self) -> int:
        """
        Determine the slot to start scanning from via the client's latest block.

        Raises:
            ValueError: If the latest-block response lacks
                ``lastValidBlockHeight`` or the fetched block lacks ``parentSlot``.
        """
        latest_block = self.solana_client.get_latest_block()
        logger.info("Latest block data: %s", latest_block)

        last_valid_height = latest_block.get("lastValidBlockHeight")
        if not last_valid_height:
            logger.error("No lastValidBlockHeight in response: %s", latest_block)
            raise ValueError("Invalid response format: missing lastValidBlockHeight")

        # NOTE(review): this passes a block *height* to get_block, which is
        # called with slots everywhere else in this class -- confirm the
        # client accepts it, heights and slots are not interchangeable.
        logger.info("Fetching block at height: %s", last_valid_height)
        block_data = self.solana_client.get_block(last_valid_height)
        if not block_data or "parentSlot" not in block_data:
            logger.error("Invalid block data response: %s", block_data)
            raise ValueError("Invalid block data: missing parentSlot")
        return block_data["parentSlot"]

    def scan_blocks(
        self, num_blocks: Optional[int] = None, start_slot: Optional[int] = None
    ) -> List[ArbitrageEvent]:
        """
        Scan the specified number of latest blocks for arbitrage.

        Args:
            num_blocks: Number of slots to look back from ``start_slot``;
                defaults to ``settings.SOLANA_BLOCKS_TO_SCAN``.
            start_slot: Highest slot to scan; resolved from the client's
                latest block when omitted.

        Returns:
            All arbitrage events found. Errors are logged rather than raised:
            a block that fails is skipped, and a failure before the block
            loop ends the scan with whatever was collected so far. An empty
            list is returned immediately when a scan is already running.
        """
        if self._scan_in_progress:
            logger.warning("Scan already in progress, ignoring request")
            return []

        self._scan_in_progress = True
        all_events = []
        try:
            logger.info(
                "Starting block scan. Num blocks: %s, Start slot: %s", num_blocks, start_slot
            )
            num_blocks = num_blocks or settings.SOLANA_BLOCKS_TO_SCAN
            logger.info("Using RPC URL: %s", self.solana_client.rpc_url)

            # Get latest block if start_slot not provided
            if start_slot is None:
                logger.info("Start slot not provided, fetching latest block")
                try:
                    start_slot = self._fetch_start_slot()
                    logger.info("Using start_slot: %s", start_slot)
                except Exception as e:
                    logger.error("Error getting latest block: %s", e)
                    raise

            # Get list of blocks to scan, never reaching below slot 0
            end_slot = max(start_slot - num_blocks, 0)
            logger.info("Getting blocks from %s to %s", end_slot, start_slot)
            try:
                blocks_to_scan = self.solana_client.get_blocks(end_slot, start_slot)
                logger.info("Found %s blocks to scan", len(blocks_to_scan))
            except Exception as e:
                logger.error("Error getting blocks to scan: %s", e)
                raise

            # Scan each block
            for slot in blocks_to_scan:
                try:
                    logger.info("Processing block at slot %s", slot)
                    block_data = self.solana_client.get_block(slot)
                    if not block_data:
                        logger.warning("Empty block data for slot %s", slot)
                        continue
                    logger.info(
                        "Block %s has %s transactions",
                        slot,
                        len(block_data.get("transactions") or []),
                    )
                    # Pass the real slot: deriving it from parentSlot is wrong
                    # whenever slots were skipped.
                    _, events = self.process_block(block_data, slot=slot)
                    logger.info("Found %s arbitrage events in block %s", len(events), slot)
                    all_events.extend(events)
                except Exception as e:
                    logger.error("Error processing block %s: %s", slot, e)
                    # A failed flush/commit leaves the session unusable until
                    # it is rolled back; without this every later block fails.
                    self.db.rollback()
                    continue
        except Exception as e:
            logger.error("Error scanning blocks: %s", e)
        finally:
            self._scan_in_progress = False
        return all_events

    def get_scan_status(self) -> Dict:
        """
        Get the current status of the block scanner.

        Returns:
            A dict with ``last_scanned_block`` (``0`` when nothing is stored),
            ``last_scan_time`` (the latest block's timestamp, or the current
            UTC time when unavailable), ``arbitrage_events_count`` and
            ``scan_in_progress``.
        """
        last_block = self.db.query(Block).order_by(Block.slot.desc()).first()
        last_scan_time = None
        last_scanned_block = 0
        if last_block:
            last_scan_time = last_block.timestamp
            last_scanned_block = last_block.slot

        arbitrage_count = self.db.query(ArbitrageEvent).count()
        return {
            "last_scanned_block": last_scanned_block,
            "last_scan_time": last_scan_time or datetime.utcnow(),
            "arbitrage_events_count": arbitrage_count,
            "scan_in_progress": self._scan_in_progress,
        }