Automated Action 5bb78bd9be Implement Solana arbitrage analytics backend
- Create project structure with FastAPI
- Add database models for blocks, transactions, arbitrages, pools, and DEXes
- Implement Solana RPC client for fetching blockchain data
- Create arbitrage detection algorithm
- Implement comprehensive API endpoints for analytics
- Set up database migrations with Alembic
- Add detailed project documentation

generated with BackendIM... (backend.im)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-05-12 14:13:06 +00:00

204 lines
8.3 KiB
Python

import time
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple, Set
from loguru import logger
from sqlalchemy.orm import Session
from app.core.config import settings
from app.services.solana_client import SolanaRpcClient
from app.models.block import Block
from app.models.transaction import Transaction
from app.schemas.block import BlockCreate
from app.schemas.transaction import TransactionCreate
class BlockchainFetcher:
"""Service for fetching and processing blockchain data."""
def __init__(self, db: Session):
"""Initialize blockchain fetcher service."""
self.db = db
self.solana_client = SolanaRpcClient()
def fetch_latest_blocks(self, num_blocks: Optional[int] = None) -> List[Block]:
"""Fetch the latest blocks from the Solana blockchain."""
num_blocks = num_blocks or settings.BLOCKS_TO_FETCH
try:
latest_height = self.solana_client.get_latest_block_height()
logger.info(f"Latest block height: {latest_height}")
# Get the last processed block from the database
last_processed_block = self.db.query(Block).order_by(Block.block_height.desc()).first()
if last_processed_block:
start_slot = last_processed_block.slot + 1
else:
# If no blocks in DB, start from a recent block (latest - num_blocks)
start_slot = max(0, latest_height - num_blocks)
end_slot = start_slot + num_blocks
logger.info(f"Fetching blocks from slot {start_slot} to {end_slot}")
# Get confirmed slots in the range
slots = self.solana_client.get_blocks_in_range(start_slot, end_slot)
logger.info(f"Found {len(slots)} confirmed blocks in range")
# Fetch and process each block
fetched_blocks = []
for slot in slots:
block_data = self.solana_client.get_block(slot)
if block_data:
block = self._process_block(block_data)
if block:
fetched_blocks.append(block)
logger.info(f"Successfully processed {len(fetched_blocks)} blocks")
return fetched_blocks
except Exception as e:
logger.error(f"Error fetching latest blocks: {str(e)}")
return []
def _process_block(self, block_data: Dict[str, Any]) -> Optional[Block]:
"""Process raw block data and store in database."""
try:
# Extract block information
block_height = block_data.get("blockHeight")
block_hash = block_data.get("blockhash")
parent_hash = block_data.get("previousBlockhash")
slot = block_data.get("parentSlot", 0) + 1 # parentSlot is the previous slot
# Convert block timestamp to datetime if available
block_time = None
if "blockTime" in block_data:
block_time = datetime.fromtimestamp(block_data["blockTime"])
# Get transactions
transactions = block_data.get("transactions", [])
successful_txs = sum(1 for tx in transactions if tx.get("meta", {}).get("err") is None)
# Create block record
block_in = BlockCreate(
block_height=block_height,
block_hash=block_hash,
parent_block_hash=parent_hash,
slot=slot,
block_time=block_time,
transactions_count=len(transactions),
successful_transactions_count=successful_txs
)
# Check if block already exists
existing_block = self.db.query(Block).filter(Block.block_hash == block_hash).first()
if existing_block:
logger.debug(f"Block {block_hash} already exists in database")
return existing_block
# Create new block
db_block = Block(**block_in.model_dump())
self.db.add(db_block)
self.db.commit()
self.db.refresh(db_block)
# Process transactions
self._process_transactions(db_block.id, transactions)
return db_block
except Exception as e:
logger.error(f"Error processing block: {str(e)}")
self.db.rollback()
return None
def _process_transactions(self, block_id: int, transactions: List[Dict[str, Any]]) -> None:
"""Process and store transactions from a block."""
for tx_data in transactions:
try:
# Extract transaction data
tx_meta = tx_data.get("meta", {})
tx = tx_data.get("transaction", {})
if not tx:
continue
# Get transaction hash
tx_hash = tx.get("signatures", [""])[0]
# Check if transaction already exists
existing_tx = self.db.query(Transaction).filter(Transaction.transaction_hash == tx_hash).first()
if existing_tx:
logger.debug(f"Transaction {tx_hash} already exists in database")
continue
# Extract accounts involved
accounts = []
if "message" in tx and "accountKeys" in tx["message"]:
accounts = tx["message"]["accountKeys"]
# Extract program IDs
program_ids = set()
if "message" in tx and "instructions" in tx["message"]:
for ix in tx["message"]["instructions"]:
if "programIdIndex" in ix:
program_idx = ix["programIdIndex"]
if program_idx < len(accounts):
program_ids.add(accounts[program_idx])
# Create transaction record
tx_in = TransactionCreate(
block_id=block_id,
transaction_hash=tx_hash,
signature=tx_hash,
slot=tx_data.get("slot", 0),
success=tx_meta.get("err") is None,
fee=tx_meta.get("fee", 0),
fee_payer=accounts[0] if accounts else None,
program_ids=list(program_ids) if program_ids else None,
accounts=accounts,
raw_data=None # We don't store full raw data to save space
)
db_tx = Transaction(**tx_in.model_dump())
self.db.add(db_tx)
except Exception as e:
logger.error(f"Error processing transaction: {str(e)}")
continue
self.db.commit()
def run_continuous_fetcher(self, polling_interval: Optional[int] = None):
"""Run a continuous fetching process in a blocking loop."""
polling_interval = polling_interval or settings.POLLING_INTERVAL
logger.info(f"Starting continuous blockchain fetcher with {polling_interval}s interval")
try:
while True:
start_time = time.time()
# Fetch latest blocks
blocks = self.fetch_latest_blocks()
logger.info(f"Fetched {len(blocks)} new blocks")
# Calculate time to sleep
elapsed = time.time() - start_time
sleep_time = max(0, polling_interval - elapsed)
if sleep_time > 0:
logger.debug(f"Sleeping for {sleep_time:.2f} seconds")
time.sleep(sleep_time)
except KeyboardInterrupt:
logger.info("Blockchain fetcher stopped by user")
except Exception as e:
logger.error(f"Error in continuous blockchain fetcher: {str(e)}")
raise
finally:
if hasattr(self, 'solana_client'):
self.solana_client.close()
logger.info("Solana client connection closed")