import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple

from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.models import ArbitrageEvent, Block, TokenTransfer, Transaction
from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate,
                                 TokenTransferCreate, TransactionCreate)
from app.services.solana_client import SolanaClient

logger = logging.getLogger(__name__)


class BlockScanner:
    """
    Scanner service for processing Solana blocks and detecting arbitrage.

    Every ``save_*`` method commits its own rows through the injected
    SQLAlchemy session, so a block is persisted incrementally rather than in
    a single transaction.
    """

    def __init__(self, db: Session, solana_client: Optional[SolanaClient] = None):
        """
        Initialize scanner with database session and Solana client.

        Args:
            db: Session used for every query and commit issued by the scanner.
            solana_client: Client used to fetch and analyze blocks; a default
                ``SolanaClient()`` is created when omitted.
        """
        self.db = db
        self.solana_client = solana_client or SolanaClient()
        # Re-entrancy guard toggled by scan_blocks().
        self._scan_in_progress = False

    @property
    def scan_in_progress(self) -> bool:
        """
        Check if a scan is currently in progress.
        """
        return self._scan_in_progress

    def get_last_scanned_block(self) -> Optional[int]:
        """
        Get the highest slot stored in the blocks table.

        Returns:
            The slot of the stored block with the greatest ``slot`` value, or
            ``None`` when no block has been stored yet.
        """
        latest_block = self.db.query(Block).order_by(Block.slot.desc()).first()
        return latest_block.slot if latest_block else None

    def save_block(self, block_data: Dict, slot: Optional[int] = None) -> Block:
        """
        Save block data to database.

        Args:
            block_data: Block payload; ``blockhash``, ``previousBlockhash`` and
                ``parentSlot`` are read from it.
            slot: Slot the block was fetched at. When omitted, the slot is
                derived as ``parentSlot + 1`` (or 0 if ``parentSlot`` is
                missing), which is only correct when no slot was skipped
                between the parent and this block.

        Returns:
            The committed and refreshed ``Block`` row (``processed=False``).
        """
        if slot is None:
            slot = block_data["parentSlot"] + 1 if "parentSlot" in block_data else 0

        block_create = BlockCreate(
            slot=slot,
            blockhash=block_data.get("blockhash", ""),
            parent_blockhash=block_data.get("previousBlockhash", ""),
            processed=False,
        )
        block = Block(**block_create.model_dump())
        self.db.add(block)
        self.db.commit()
        self.db.refresh(block)
        return block

    def save_transaction(self, tx_data: Dict, block_id: int) -> Transaction:
        """
        Save transaction data to database.

        Args:
            tx_data: Transaction payload; the first entry of
                ``transaction.signatures`` and ``meta.fee`` / ``meta.err`` are
                read from it. The whole payload is stored as JSON.
            block_id: Value stored in the transaction's ``block_id`` column.

        Returns:
            The committed and refreshed ``Transaction`` row.
        """
        # `meta` / `transaction` may be present but null, and `signatures` may
        # be an empty list; fall back to neutral defaults instead of raising.
        meta = tx_data.get("meta") or {}
        signatures = (tx_data.get("transaction") or {}).get("signatures") or [""]

        tx_create = TransactionCreate(
            signature=signatures[0],
            block_id=block_id,
            fee=meta.get("fee", 0),
            status="success" if meta.get("err") is None else "error",
            raw_data=json.dumps(tx_data),
        )
        transaction = Transaction(**tx_create.model_dump())
        self.db.add(transaction)
        self.db.commit()
        self.db.refresh(transaction)
        return transaction

    @staticmethod
    def _ui_amount(balance: Dict) -> float:
        """Return ``uiTokenAmount.uiAmount`` of a balance entry as a float (0 if absent/null)."""
        return float((balance.get("uiTokenAmount") or {}).get("uiAmount") or 0)

    def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]:
        """
        Extract and save token transfers from transaction data.

        This is a simplified placeholder: instead of parsing instructions and
        logs it compares ``meta.preTokenBalances`` with
        ``meta.postTokenBalances`` and records one transfer for every
        (mint, owner) pair whose balance changed. Post-balances without a
        matching pre-balance are ignored. Only one side of each transfer is
        known, so either ``from_address`` or ``to_address`` is left empty.

        Args:
            tx_data: Transaction payload.
            transaction_id: Value stored in each transfer's ``transaction_id``.

        Returns:
            The saved ``TokenTransfer`` rows (empty list when nothing changed).
        """
        meta = tx_data.get("meta") or {}
        post_balances = meta.get("postTokenBalances") or []
        pre_balances = meta.get("preTokenBalances") or []

        # Index pre-balances once; setdefault keeps the first entry per
        # (mint, owner), matching a first-match linear search.
        pre_by_account: Dict[Tuple, Dict] = {}
        for pre_entry in pre_balances:
            key = (pre_entry.get("mint"), pre_entry.get("owner"))
            pre_by_account.setdefault(key, pre_entry)

        transfers: List[TokenTransfer] = []
        for post in post_balances:
            mint = post.get("mint", "")
            owner = post.get("owner", "")

            pre = pre_by_account.get((mint, owner))
            if not pre:
                continue

            amount_change = self._ui_amount(post) - self._ui_amount(pre)
            if amount_change == 0:
                continue

            received = amount_change > 0
            transfer_create = TokenTransferCreate(
                transaction_id=transaction_id,
                token_address=mint,
                from_address="" if received else owner,
                to_address=owner if received else "",
                amount=abs(amount_change),
                program_id=post.get("programId", ""),
            )
            transfer = TokenTransfer(**transfer_create.model_dump())
            self.db.add(transfer)
            transfers.append(transfer)

        if transfers:
            self.db.commit()
            for transfer in transfers:
                self.db.refresh(transfer)

        return transfers

    def save_arbitrage_event(self, arbitrage_data: Dict, transaction_id: int) -> ArbitrageEvent:
        """
        Save detected arbitrage event to database.

        Args:
            arbitrage_data: Must contain ``profit_token_address``,
                ``profit_amount``, ``path`` and ``confidence_score``;
                ``profit_usd`` is optional.
            transaction_id: Value stored in the event's ``transaction_id``.

        Returns:
            The committed and refreshed ``ArbitrageEvent`` row.

        Raises:
            KeyError: If a required key is missing from ``arbitrage_data``.
        """
        event_create = ArbitrageEventCreate(
            transaction_id=transaction_id,
            profit_token_address=arbitrage_data["profit_token_address"],
            profit_amount=arbitrage_data["profit_amount"],
            profit_usd=arbitrage_data.get("profit_usd"),
            path=arbitrage_data["path"],
            confidence_score=arbitrage_data["confidence_score"],
        )
        event = ArbitrageEvent(**event_create.model_dump())
        self.db.add(event)
        self.db.commit()
        self.db.refresh(event)
        return event

    def process_transaction(
        self, tx_data: Dict, block_id: int
    ) -> Tuple[Transaction, Optional[ArbitrageEvent]]:
        """
        Process a single transaction, looking for arbitrage.

        Saves the transaction and its token transfers, then asks the Solana
        client to analyze the payload and stores an arbitrage event when one
        is reported.

        Returns:
            ``(transaction, arbitrage_event)``; the event is ``None`` when no
            arbitrage was detected.
        """
        transaction = self.save_transaction(tx_data, block_id)
        self.save_token_transfers(tx_data, transaction.id)

        is_arbitrage, arbitrage_data = self.solana_client.analyze_transaction_for_arbitrage(tx_data)

        arbitrage_event = None
        if is_arbitrage and arbitrage_data:
            arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id)
            logger.info("Detected arbitrage in transaction %s", transaction.signature)

        return transaction, arbitrage_event

    def process_block(
        self, block_data: Dict, slot: Optional[int] = None
    ) -> Tuple[Block, List[ArbitrageEvent]]:
        """
        Process a single block, looking for arbitrage transactions.

        Args:
            block_data: Block payload; its ``transactions`` list (if any) is
                processed one entry at a time.
            slot: Slot the block was fetched at, forwarded to ``save_block``.

        Returns:
            ``(block, arbitrage_events)`` where the block has been marked
            ``processed=True``.
        """
        block = self.save_block(block_data, slot)

        arbitrage_events: List[ArbitrageEvent] = []
        for tx_data in block_data.get("transactions", []):
            # NOTE(review): block.slot is passed as the transaction's block_id;
            # if Transaction.block_id references Block.id this should be
            # block.id — confirm against the models.
            _, arbitrage_event = self.process_transaction(tx_data, block.slot)
            if arbitrage_event:
                arbitrage_events.append(arbitrage_event)

        block.processed = True
        self.db.commit()
        self.db.refresh(block)

        return block, arbitrage_events

    def scan_blocks(
        self, num_blocks: Optional[int] = None, start_slot: Optional[int] = None
    ) -> List[ArbitrageEvent]:
        """
        Scan the specified number of latest blocks for arbitrage.

        Args:
            num_blocks: How many slots to look back from ``start_slot``;
                falls back to ``settings.SOLANA_BLOCKS_TO_SCAN`` when omitted
                (or 0).
            start_slot: Upper bound of the slot range; resolved through the
                Solana client when omitted.

        Returns:
            All arbitrage events found. Returns an empty list immediately if
            another scan is already running. Errors are logged, never raised:
            a failing block is skipped, a failure outside the per-block loop
            ends the scan with whatever was collected so far.
        """
        if self._scan_in_progress:
            logger.warning("Scan already in progress, ignoring request")
            return []

        self._scan_in_progress = True
        all_events: List[ArbitrageEvent] = []

        try:
            num_blocks = num_blocks or settings.SOLANA_BLOCKS_TO_SCAN

            if start_slot is None:
                latest_block = self.solana_client.get_latest_block()
                # NOTE(review): a block *height* is passed where get_block is
                # otherwise called with a *slot* — confirm this is intended.
                block_data = self.solana_client.get_block(latest_block["lastValidBlockHeight"])
                start_slot = block_data["parentSlot"]

            end_slot = max(0, start_slot - num_blocks)
            blocks_to_scan = self.solana_client.get_blocks(end_slot, start_slot)

            for slot in blocks_to_scan:
                try:
                    block_data = self.solana_client.get_block(slot)
                    _, events = self.process_block(block_data, slot=slot)
                    all_events.extend(events)
                except Exception as exc:
                    # A failed flush/commit leaves the session unusable until
                    # it is rolled back; without this every later block in
                    # the scan would fail as well.
                    self.db.rollback()
                    logger.exception("Error processing block %s: %s", slot, exc)
        except Exception as exc:
            logger.exception("Error scanning blocks: %s", exc)
        finally:
            self._scan_in_progress = False

        return all_events

    def get_scan_status(self) -> Dict:
        """
        Get the current status of the block scanner.

        Returns:
            Dict with ``last_scanned_block`` (highest stored slot, 0 if none),
            ``last_scan_time`` (that block's ``timestamp``, or the current
            naive UTC time when unavailable), ``arbitrage_events_count`` and
            ``scan_in_progress``.
        """
        last_block = self.db.query(Block).order_by(Block.slot.desc()).first()

        last_scan_time = None
        last_scanned_block = 0
        if last_block:
            last_scan_time = last_block.timestamp
            last_scanned_block = last_block.slot

        arbitrage_count = self.db.query(ArbitrageEvent).count()

        return {
            "last_scanned_block": last_scanned_block,
            "last_scan_time": last_scan_time or datetime.utcnow(),
            "arbitrage_events_count": arbitrage_count,
            "scan_in_progress": self._scan_in_progress,
        }