diff --git a/app/services/scanner.py b/app/services/scanner.py index 65a8ff9..d99f6c2 100644 --- a/app/services/scanner.py +++ b/app/services/scanner.py @@ -6,8 +6,7 @@ from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session from app.core.config import settings -from app.models.models import (ArbitrageEvent, Block, Instruction, Swap, - TokenTransfer, Transaction) +from app.models.models import ArbitrageEvent, Block, TokenTransfer, Transaction from app.schemas.schemas import (ArbitrageEventCreate, BlockCreate, TokenTransferCreate, TransactionCreate) from app.services.solana_client import SolanaClient @@ -85,52 +84,22 @@ class BlockScanner: def save_token_transfers(self, tx_data: Dict, transaction_id: int) -> List[TokenTransfer]: """ - Extract and save token transfers from transaction data using instruction-level parsing + Extract and save token transfers from transaction data """ - from app.parsers.parser_registry import parser_registry + # This would require parsing the transaction logs and instruction data + # For simplicity, we'll just create a placeholder function + + # In a real implementation, we would extract token transfer info + # from the transaction instructions and logs transfers = [] - - # Use the parser registry to parse all instructions in the transaction - parsed_instructions = parser_registry.parse_transaction_instructions(tx_data) - - # Extract token transfers from parsed instructions using each parser - for instruction in parsed_instructions: - program_id = instruction.get("program_id", "") - parser = parser_registry.get_parser_for_program(program_id) - - if not parser: - continue - - # Extract transfer info if the parser is a TransferParser - if hasattr(parser, "extract_transfer_info"): - try: - token_address, from_address, to_address, amount = parser.extract_transfer_info(instruction) - - if token_address and from_address and to_address and amount is not None and amount > 0: - # Create token transfer record - transfer_create = TokenTransferCreate( - transaction_id=transaction_id, - token_address=token_address, - from_address=from_address, - to_address=to_address, - amount=amount, - program_id=program_id, - ) - - transfer = TokenTransfer(**transfer_create.model_dump()) - self.db.add(transfer) - transfers.append(transfer) - except Exception as e: - logger.error(f"Error extracting transfer info: {str(e)}") - - # Fallback to pre/post balance comparison if no transfers found with instruction parsing - if not transfers and "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]: - logger.info("No transfers found with instruction parsing, falling back to balance comparison") + # Placeholder for demonstration purposes + if "meta" in tx_data and "postTokenBalances" in tx_data["meta"] and "preTokenBalances" in tx_data["meta"]: post_balances = tx_data["meta"]["postTokenBalances"] pre_balances = tx_data["meta"]["preTokenBalances"] # Compare pre and post balances to identify transfers + # This is a simplified approach for post in post_balances: mint = post.get("mint", "") owner = post.get("owner", "") @@ -152,7 +121,7 @@ class BlockScanner: from_address="" if amount_change > 0 else owner, to_address=owner if amount_change > 0 else "", amount=abs(amount_change), - program_id="", # We don't know the program from balance comparison + program_id=post.get("programId", ""), ) transfer = TokenTransfer(**transfer_create.model_dump()) @@ -186,129 +155,6 @@ class BlockScanner: return event - def save_instructions(self, tx_data: Dict, transaction_id: int) -> List[Instruction]: - """ - Extract and save instruction data from a transaction - """ - from app.parsers.parser_registry import parser_registry - - instructions = [] - - # Extract message and accounts from transaction - if not tx_data or "transaction" not in tx_data or "message" not in tx_data["transaction"]: - return instructions - - tx_message = tx_data["transaction"]["message"] - accounts = tx_message.get("accountKeys", []) - - # Process each instruction - for idx, instruction in enumerate(tx_message.get("instructions", [])): - program_id = instruction.get("programId", "") - parser = parser_registry.get_parser_for_program(program_id) - - # Parse instruction data - instruction_data = instruction.get("data", "") - accounts_list = [accounts[idx] for idx in instruction.get("accounts", []) if idx < len(accounts)] - - # Create instruction record - instruction_type = "unknown" - parsed_data = None - - if parser: - try: - # Decode instruction data from base64 - try: - import base64 - raw_data = base64.b64decode(instruction_data) - except Exception: - raw_data = b"" - - # Parse instruction with appropriate parser - parsed_instruction = parser.parse_instruction(instruction, accounts, raw_data) - instruction_type = parsed_instruction.get("type", "unknown") - parsed_data = parsed_instruction - except Exception as e: - logger.error(f"Error parsing instruction: {str(e)}") - - # Create instruction record - from app.schemas.schemas import InstructionCreate - instruction_create = InstructionCreate( - transaction_id=transaction_id, - program_id=program_id, - instruction_type=instruction_type, - instruction_index=idx, - accounts=json.dumps(accounts_list), - data=instruction_data, - parsed_data=parsed_data, - ) - - instruction_obj = Instruction(**instruction_create.model_dump()) - self.db.add(instruction_obj) - instructions.append(instruction_obj) - - if instructions: - self.db.commit() - for instruction in instructions: - self.db.refresh(instruction) - - return instructions - - def save_swaps(self, tx_data: Dict, transaction_id: int, instructions: List[Instruction]) -> List[Swap]: - """ - Extract and save swap data from a transaction - """ - from app.parsers.parser_registry import parser_registry - - swaps = [] - - # Get parsed instructions - parsed_instructions = parser_registry.parse_transaction_instructions(tx_data) - - # Map instruction indices to IDs - instruction_id_map = {inst.instruction_index: inst.id for inst in instructions if inst.instruction_index is not None} - - # Extract swap information from parsed instructions - for idx, parsed_instruction in enumerate(parsed_instructions): - program_id = parsed_instruction.get("program_id", "") - parser = parser_registry.get_parser_for_program(program_id) - - if not parser or not hasattr(parser, "extract_swap_info"): - continue - - try: - token_in, token_out, amount_in, amount_out = parser.extract_swap_info(parsed_instruction) - - if token_in and token_out and amount_in is not None and amount_out is not None: - instruction_id = instruction_id_map.get(idx) - if not instruction_id: - continue - - # Create swap record - from app.schemas.schemas import SwapCreate - swap_create = SwapCreate( - transaction_id=transaction_id, - instruction_id=instruction_id, - program_id=program_id, - token_in_address=token_in, - token_out_address=token_out, - amount_in=amount_in, - amount_out=amount_out, - user_account=parsed_instruction.get("user_wallet", ""), - ) - - swap = Swap(**swap_create.model_dump()) - self.db.add(swap) - swaps.append(swap) - except Exception as e: - logger.error(f"Error extracting swap info: {str(e)}") - - if swaps: - self.db.commit() - for swap in swaps: - self.db.refresh(swap) - - return swaps - def process_transaction(self, tx_data: Dict, block_id: int) -> Tuple[Transaction, Optional[ArbitrageEvent]]: """ Process a single transaction, looking for arbitrage @@ -316,12 +162,6 @@ class BlockScanner: # Save transaction record transaction = self.save_transaction(tx_data, block_id) - # Extract and save instructions - instructions = self.save_instructions(tx_data, transaction.id) - - # Extract and save swaps - swaps = self.save_swaps(tx_data, transaction.id, instructions) - # Extract and save token transfers self.save_token_transfers(tx_data, transaction.id) @@ -330,11 +170,6 @@ class BlockScanner: arbitrage_event = None if is_arbitrage and arbitrage_data: - # Add swap sequence if available - if swaps: - swap_ids = [swap.id for swap in swaps] - arbitrage_data["swap_sequence"] = json.dumps(swap_ids) - arbitrage_event = self.save_arbitrage_event(arbitrage_data, transaction.id) logger.info(f"Detected arbitrage in transaction {transaction.signature}")