
- Create project structure with FastAPI
- Add database models for blocks, transactions, arbitrages, pools, and DEXes
- Implement Solana RPC client for fetching blockchain data
- Create arbitrage detection algorithm
- Implement comprehensive API endpoints for analytics
- Set up database migrations with Alembic
- Add detailed project documentation

Generated with BackendIM... (backend.im)

Co-Authored-By: Claude <noreply@anthropic.com>
204 lines · 8.3 KiB · Python
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from loguru import logger
from sqlalchemy.orm import Session

from app.core.config import settings
from app.services.solana_client import SolanaRpcClient
from app.models.block import Block
from app.models.transaction import Transaction
from app.schemas.block import BlockCreate
from app.schemas.transaction import TransactionCreate


class BlockchainFetcher:
    """Service for fetching and processing blockchain data."""

    def __init__(self, db: Session):
        """Initialize blockchain fetcher service."""
        self.db = db
        self.solana_client = SolanaRpcClient()

    def fetch_latest_blocks(self, num_blocks: Optional[int] = None) -> List[Block]:
        """Fetch the latest blocks from the Solana blockchain."""
        num_blocks = num_blocks or settings.BLOCKS_TO_FETCH

        try:
            latest_height = self.solana_client.get_latest_block_height()
            logger.info(f"Latest block height: {latest_height}")

            # Get the last processed block from the database, keyed by slot
            # since the fetch range below is expressed in slots
            last_processed_block = self.db.query(Block).order_by(Block.slot.desc()).first()

            if last_processed_block:
                start_slot = last_processed_block.slot + 1
            else:
                # If no blocks in DB, start from a recent block. latest_height
                # is a block height used as a proxy for the latest slot; on
                # Solana slot >= height, so this fallback may start somewhat
                # earlier than num_blocks behind the tip.
                start_slot = max(0, latest_height - num_blocks)

            end_slot = start_slot + num_blocks
            logger.info(f"Fetching blocks from slot {start_slot} to {end_slot}")

            # Get confirmed slots in the range
            slots = self.solana_client.get_blocks_in_range(start_slot, end_slot)
            logger.info(f"Found {len(slots)} confirmed blocks in range")
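            # Slots skipped by the leader have no block and are simply absent
            # from this list, so consecutive entries need not be contiguous.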

            # Fetch and process each block
            fetched_blocks = []
            for slot in slots:
                block_data = self.solana_client.get_block(slot)
                if block_data:
                    # Pass the actual slot through; the block payload itself
                    # does not reliably encode it
                    block = self._process_block(slot, block_data)
                    if block:
                        fetched_blocks.append(block)

            logger.info(f"Successfully processed {len(fetched_blocks)} blocks")
            return fetched_blocks

        except Exception as e:
            logger.error(f"Error fetching latest blocks: {str(e)}")
            return []

    def _process_block(self, slot: int, block_data: Dict[str, Any]) -> Optional[Block]:
        """Process raw block data for the given slot and store it in the database."""
        try:
            # Extract block information. The slot is passed in by the caller;
            # deriving it as parentSlot + 1 is wrong when leaders skip slots.
            block_height = block_data.get("blockHeight")
            block_hash = block_data.get("blockhash")
            parent_hash = block_data.get("previousBlockhash")

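            # blockTime is optional in getBlock responses and may be null when
            # the node has no timestamp for the slot.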
            # Convert block timestamp to datetime if available; an explicit
            # UTC zone is used since blockTime is a Unix timestamp
            block_time = None
            if block_data.get("blockTime") is not None:
                block_time = datetime.fromtimestamp(block_data["blockTime"], tz=timezone.utc)

            # Get transactions
            transactions = block_data.get("transactions", [])
            successful_txs = sum(1 for tx in transactions if tx.get("meta", {}).get("err") is None)

            # Create block record
            block_in = BlockCreate(
                block_height=block_height,
                block_hash=block_hash,
                parent_block_hash=parent_hash,
                slot=slot,
                block_time=block_time,
                transactions_count=len(transactions),
                successful_transactions_count=successful_txs
            )

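            # Re-fetching an overlapping slot range is safe: blocks already
            # stored are returned as-is rather than inserted twice.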
            # Check if block already exists
            existing_block = self.db.query(Block).filter(Block.block_hash == block_hash).first()
            if existing_block:
                logger.debug(f"Block {block_hash} already exists in database")
                return existing_block

            # Create new block
            db_block = Block(**block_in.model_dump())
            self.db.add(db_block)
            self.db.commit()
            self.db.refresh(db_block)

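            # The commit/refresh above assigns db_block.id, which the
            # transaction rows below reference as their foreign key.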
            # Process transactions
            self._process_transactions(db_block.id, slot, transactions)

            return db_block

        except Exception as e:
            logger.error(f"Error processing block: {str(e)}")
            self.db.rollback()
            return None

    def _process_transactions(self, block_id: int, slot: int, transactions: List[Dict[str, Any]]) -> None:
        """Process and store transactions from a block, tagging each with the block's slot."""
        for tx_data in transactions:
            try:
                # Extract transaction data
                tx_meta = tx_data.get("meta", {})
                tx = tx_data.get("transaction", {})

                if not tx:
                    continue

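                # The first signature in a Solana transaction doubles as its
                # canonical identifier.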
                # Get transaction hash; skip malformed entries without signatures
                signatures = tx.get("signatures") or []
                if not signatures:
                    continue
                tx_hash = signatures[0]

                # Check if transaction already exists
                existing_tx = self.db.query(Transaction).filter(Transaction.transaction_hash == tx_hash).first()
                if existing_tx:
                    logger.debug(f"Transaction {tx_hash} already exists in database")
                    continue

                # Extract accounts involved
                accounts = []
                if "message" in tx and "accountKeys" in tx["message"]:
                    accounts = tx["message"]["accountKeys"]

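                # accountKeys lists the fee payer first, followed by the other
                # signer and non-signer accounts referenced by the message.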
                # Extract program IDs: each instruction names its program by
                # index into accountKeys
                program_ids = set()
                if "message" in tx and "instructions" in tx["message"]:
                    for ix in tx["message"]["instructions"]:
                        if "programIdIndex" in ix:
                            program_idx = ix["programIdIndex"]
                            if program_idx < len(accounts):
                                program_ids.add(accounts[program_idx])

                # Create transaction record
                tx_in = TransactionCreate(
                    block_id=block_id,
                    transaction_hash=tx_hash,
                    signature=tx_hash,
                    slot=slot,  # transactions in a getBlock payload carry no slot of their own
                    success=tx_meta.get("err") is None,
                    fee=tx_meta.get("fee", 0),
                    fee_payer=accounts[0] if accounts else None,
                    program_ids=list(program_ids) if program_ids else None,
                    accounts=accounts,
                    raw_data=None  # We don't store full raw data to save space
                )

                db_tx = Transaction(**tx_in.model_dump())
                self.db.add(db_tx)

            except Exception as e:
                logger.error(f"Error processing transaction: {str(e)}")
                continue

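        # One commit per block batches every inserted transaction row,
        # avoiding a database round-trip per transaction.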
        self.db.commit()

    def run_continuous_fetcher(self, polling_interval: Optional[int] = None):
        """Run a continuous fetching process in a blocking loop."""
        polling_interval = polling_interval or settings.POLLING_INTERVAL

        logger.info(f"Starting continuous blockchain fetcher with {polling_interval}s interval")

        try:
            while True:
                start_time = time.time()

                # Fetch latest blocks
                blocks = self.fetch_latest_blocks()
                logger.info(f"Fetched {len(blocks)} new blocks")

                # Sleep away the remainder of the interval so each iteration
                # starts on a roughly fixed cadence instead of drifting
                elapsed = time.time() - start_time
                sleep_time = max(0, polling_interval - elapsed)

                if sleep_time > 0:
                    logger.debug(f"Sleeping for {sleep_time:.2f} seconds")
                    time.sleep(sleep_time)

        except KeyboardInterrupt:
            logger.info("Blockchain fetcher stopped by user")

        except Exception as e:
            logger.error(f"Error in continuous blockchain fetcher: {str(e)}")
            raise

        finally:
            if hasattr(self, 'solana_client'):
                self.solana_client.close()
                logger.info("Solana client connection closed")