import time
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple, Set

from loguru import logger
from sqlalchemy.orm import Session

from app.core.config import settings
from app.services.solana_client import SolanaRpcClient
from app.models.block import Block
from app.models.transaction import Transaction
from app.schemas.block import BlockCreate
from app.schemas.transaction import TransactionCreate


class BlockchainFetcher:
    """Service for fetching and processing blockchain data.

    Pulls confirmed blocks from a Solana RPC endpoint through
    ``SolanaRpcClient`` and persists them, together with their
    transactions, via the SQLAlchemy session supplied by the caller.
    """

    def __init__(self, db: Session):
        """Initialize blockchain fetcher service.

        Args:
            db: SQLAlchemy session used for every query and write.
        """
        self.db = db
        self.solana_client = SolanaRpcClient()

    def fetch_latest_blocks(self, num_blocks: Optional[int] = None) -> List[Block]:
        """Fetch the latest blocks from the Solana blockchain.

        Resumes right after the stored block with the greatest
        ``block_height``; when the table is empty it starts ``num_blocks``
        below the latest reported height.

        Args:
            num_blocks: Size of the slot window to scan. A falsy value
                (``None`` or ``0``) falls back to
                ``settings.BLOCKS_TO_FETCH``.

        Returns:
            The blocks stored (or already present) for the scanned window.
            If an error interrupts the run, the error is logged and the
            blocks processed before the failure are returned.
        """
        num_blocks = num_blocks or settings.BLOCKS_TO_FETCH
        # Defined outside the try so a mid-batch failure can still report
        # the blocks that were already persisted.
        fetched_blocks: List[Block] = []

        try:
            latest_height = self.solana_client.get_latest_block_height()
            logger.info(f"Latest block height: {latest_height}")

            # Get the last processed block from the database
            last_processed_block = (
                self.db.query(Block).order_by(Block.block_height.desc()).first()
            )

            if last_processed_block:
                start_slot = last_processed_block.slot + 1
            else:
                # If no blocks in DB, start from a recent block
                # (latest - num_blocks).
                # NOTE(review): latest_height is a block *height* but is used
                # as a *slot* here; on Solana the two diverge once slots are
                # skipped - confirm whether the client can report the
                # current slot instead.
                start_slot = max(0, latest_height - num_blocks)

            end_slot = start_slot + num_blocks
            logger.info(f"Fetching blocks from slot {start_slot} to {end_slot}")

            # Get confirmed slots in the range; treat a falsy reply as "none".
            slots = self.solana_client.get_blocks_in_range(start_slot, end_slot) or []
            logger.info(f"Found {len(slots)} confirmed blocks in range")

            # Fetch and process each block
            for slot in slots:
                block_data = self.solana_client.get_block(slot)
                if not block_data:
                    continue
                # Pass the slot we actually requested rather than letting
                # _process_block guess it from parentSlot.
                block = self._process_block(block_data, slot=slot)
                if block:
                    fetched_blocks.append(block)

            logger.info(f"Successfully processed {len(fetched_blocks)} blocks")
        except Exception as e:
            logger.error(f"Error fetching latest blocks: {str(e)}")

        return fetched_blocks

    def _process_block(
        self, block_data: Dict[str, Any], slot: Optional[int] = None
    ) -> Optional[Block]:
        """Process raw block data and store in database.

        Args:
            block_data: Block payload as returned by the RPC client.
            slot: Slot the block was fetched from. When omitted, the slot
                is approximated as ``parentSlot + 1``, which is only right
                if no slot was skipped between parent and child.

        Returns:
            The stored ``Block`` (the existing row if the hash is already
            known), or ``None`` if processing failed; failures are logged
            and the session is rolled back.
        """
        try:
            # Extract block information
            block_height = block_data.get("blockHeight")
            block_hash = block_data.get("blockhash")
            parent_hash = block_data.get("previousBlockhash")
            if slot is None:
                # Legacy fallback: parentSlot is the previous slot.
                slot = block_data.get("parentSlot", 0) + 1

            # Convert block timestamp to datetime if available. The key can
            # be present with a null value, so test the value, not the key.
            raw_block_time = block_data.get("blockTime")
            block_time = None
            if raw_block_time is not None:
                block_time = datetime.fromtimestamp(raw_block_time)

            # Get transactions ("meta" may be null, hence the `or {}`)
            transactions = block_data.get("transactions") or []
            successful_txs = sum(
                1
                for tx in transactions
                if (tx.get("meta") or {}).get("err") is None
            )

            # Create block record
            block_in = BlockCreate(
                block_height=block_height,
                block_hash=block_hash,
                parent_block_hash=parent_hash,
                slot=slot,
                block_time=block_time,
                transactions_count=len(transactions),
                successful_transactions_count=successful_txs,
            )

            # Check if block already exists
            existing_block = (
                self.db.query(Block).filter(Block.block_hash == block_hash).first()
            )
            if existing_block:
                logger.debug(f"Block {block_hash} already exists in database")
                return existing_block

            # Create new block. Flush (not commit) so the primary key is
            # assigned while the block stays in the same DB transaction as
            # its transactions: if storing them fails, the rollback below
            # removes the block too instead of leaving it without
            # transactions.
            db_block = Block(**block_in.model_dump())
            self.db.add(db_block)
            self.db.flush()

            # Process transactions; this commits block and transactions
            # together.
            self._process_transactions(db_block.id, transactions, slot=slot)
            self.db.refresh(db_block)

            return db_block

        except Exception as e:
            logger.error(f"Error processing block: {str(e)}")
            self.db.rollback()
            return None

    def _process_transactions(
        self,
        block_id: int,
        transactions: List[Dict[str, Any]],
        slot: Optional[int] = None,
    ) -> None:
        """Process and store transactions from a block.

        Entries without a ``transaction`` payload and transactions whose
        hash is already stored are skipped. A failure on one entry is
        logged and does not stop the remaining entries. The session is
        committed once after the loop.

        Args:
            block_id: Primary key of the owning ``Block`` row.
            transactions: Transaction entries taken from the block payload.
            slot: Slot of the owning block, used when an entry carries no
                ``slot`` key of its own. Defaults to ``0`` when omitted.
        """
        default_slot = 0 if slot is None else slot

        for tx_data in transactions:
            try:
                # Extract transaction data; either field may be null.
                tx_meta = tx_data.get("meta") or {}
                tx = tx_data.get("transaction") or {}

                if not tx:
                    continue

                # Get transaction hash (first signature). An absent or
                # empty signature list yields the empty string.
                signatures = tx.get("signatures") or [""]
                tx_hash = signatures[0]

                # Check if transaction already exists
                existing_tx = (
                    self.db.query(Transaction)
                    .filter(Transaction.transaction_hash == tx_hash)
                    .first()
                )
                if existing_tx:
                    logger.debug(f"Transaction {tx_hash} already exists in database")
                    continue

                message = tx.get("message") or {}

                # Extract accounts involved
                accounts = message.get("accountKeys") or []

                # Extract program IDs. dict keys de-duplicate while keeping
                # first-seen order, so the stored list is deterministic.
                program_ids: Dict[Any, None] = {}
                for ix in message.get("instructions") or []:
                    if "programIdIndex" in ix:
                        program_idx = ix["programIdIndex"]
                        if program_idx < len(accounts):
                            program_ids[accounts[program_idx]] = None

                # Create transaction record
                tx_in = TransactionCreate(
                    block_id=block_id,
                    transaction_hash=tx_hash,
                    signature=tx_hash,
                    slot=tx_data.get("slot", default_slot),
                    success=tx_meta.get("err") is None,
                    fee=tx_meta.get("fee", 0),
                    fee_payer=accounts[0] if accounts else None,
                    program_ids=list(program_ids) if program_ids else None,
                    accounts=accounts,
                    raw_data=None,  # We don't store full raw data to save space
                )

                db_tx = Transaction(**tx_in.model_dump())
                self.db.add(db_tx)

            except Exception as e:
                # Best effort: one malformed transaction must not prevent
                # the rest of the block from being stored.
                logger.error(f"Error processing transaction: {str(e)}")
                continue

        self.db.commit()

    def run_continuous_fetcher(self, polling_interval: Optional[int] = None):
        """Run a continuous fetching process in a blocking loop.

        Each iteration calls ``fetch_latest_blocks`` and then sleeps for
        whatever is left of the polling interval. The loop ends on
        ``KeyboardInterrupt``; any other exception is logged and re-raised.
        The Solana client is closed on the way out in every case.

        Args:
            polling_interval: Seconds between the start of consecutive
                fetches. A falsy value falls back to
                ``settings.POLLING_INTERVAL``.
        """
        polling_interval = polling_interval or settings.POLLING_INTERVAL
        logger.info(
            f"Starting continuous blockchain fetcher with {polling_interval}s interval"
        )

        try:
            while True:
                start_time = time.time()

                # Fetch latest blocks
                blocks = self.fetch_latest_blocks()
                logger.info(f"Fetched {len(blocks)} new blocks")

                # Sleep only for the remainder of the interval so the fetch
                # duration does not stretch the polling period.
                elapsed = time.time() - start_time
                sleep_time = max(0, polling_interval - elapsed)

                if sleep_time > 0:
                    logger.debug(f"Sleeping for {sleep_time:.2f} seconds")
                    time.sleep(sleep_time)

        except KeyboardInterrupt:
            logger.info("Blockchain fetcher stopped by user")
        except Exception as e:
            logger.error(f"Error in continuous blockchain fetcher: {str(e)}")
            raise
        finally:
            if hasattr(self, 'solana_client'):
                self.solana_client.close()
                logger.info("Solana client connection closed")