import json
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple, Union

import httpx
from loguru import logger
from solana.rpc.api import Client as SolanaClient

from app.core.config import settings


class SolanaRpcClient:
    """Client for interacting with Solana blockchain via RPC.

    Wraps a ``solana.rpc.api.Client`` for the standard RPC methods and keeps a
    separate ``httpx.Client`` for hand-built JSON-RPC requests
    (see :meth:`custom_rpc_call`).

    Responses from the wrapped client are treated as dict-like JSON-RPC
    envelopes: the payload is read from their ``"result"`` key.

    The instance owns an HTTP connection pool; release it with :meth:`close`
    or by using the client as a context manager::

        with SolanaRpcClient() as rpc:
            height = rpc.get_latest_block_height()
    """

    # Program id passed as the ``programId`` filter when listing token accounts.
    SPL_TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"

    def __init__(self, rpc_url: Optional[str] = None):
        """Initialize Solana RPC client.

        Args:
            rpc_url: RPC endpoint URL. Falls back to
                ``settings.SOLANA_RPC_URL`` when omitted or empty.
        """
        self.rpc_url = rpc_url or settings.SOLANA_RPC_URL
        self.client = SolanaClient(self.rpc_url)
        self.http_client = httpx.Client(timeout=30.0)  # For custom RPC calls

    def __enter__(self) -> "SolanaRpcClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP client on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def get_latest_block_height(self) -> int:
        """Get the current block height of the Solana blockchain.

        Returns:
            The ``"result"`` value of the RPC response.

        Raises:
            RuntimeError: If the response carries no ``"result"``.
            Exception: Any error raised by the underlying RPC call is logged
                and re-raised unchanged.
        """
        try:
            resp = self.client.get_block_height()
            height = resp.get("result")
        except Exception as e:
            logger.error(f"Error getting latest block height: {str(e)}")
            raise

        # Checked outside the ``try`` so a missing result is logged once,
        # not a second time by the handler above.
        if height is None:
            message = f"Failed to get latest block height: {resp}"
            logger.error(message)
            raise RuntimeError(message)
        return height

    def get_block(self, slot_or_block: Union[int, str]) -> Optional[Dict[str, Any]]:
        """Get block data by slot number or block hash.

        Requests JSON encoding with full transaction details and no rewards.

        Args:
            slot_or_block: Identifier forwarded as-is to the RPC ``get_block``
                call.
                NOTE(review): confirm that the node accepts a block hash
                string here and not only a slot number.

        Returns:
            The block payload, or ``None`` if the response has no result or
            the call raised (both cases are logged, never propagated).
        """
        try:
            resp = self.client.get_block(
                slot_or_block,
                encoding="json",
                max_supported_transaction_version=0,
                transaction_details="full",
                rewards=False,
            )
            block = resp.get("result")
            if block is None:
                logger.warning(
                    f"Block not found or error for slot/hash {slot_or_block}: {resp}"
                )
                return None
            return block
        except Exception as e:
            logger.error(f"Error fetching block {slot_or_block}: {str(e)}")
            return None

    def get_blocks_in_range(self, start_slot: int, end_slot: int) -> List[int]:
        """Get a list of confirmed blocks in the given slot range.

        Args:
            start_slot: First slot, forwarded to the RPC ``get_blocks`` call.
            end_slot: Last slot, forwarded to the RPC ``get_blocks`` call.

        Returns:
            The response's result list, or ``[]`` if the response has no
            result or the call raised (both cases are logged).
        """
        try:
            resp = self.client.get_blocks(start_slot, end_slot)
            slots = resp.get("result")
            if slots is None:
                logger.error(
                    f"Failed to get blocks in range {start_slot}-{end_slot}: {resp}"
                )
                return []
            return slots
        except Exception as e:
            logger.error(
                f"Error getting blocks in range {start_slot}-{end_slot}: {str(e)}"
            )
            return []

    def get_transaction(self, signature: str) -> Optional[Dict[str, Any]]:
        """Get transaction details by signature.

        Args:
            signature: Transaction signature forwarded to the RPC call.

        Returns:
            The transaction payload, or ``None`` if the response has no result
            or the call raised (both cases are logged).
        """
        try:
            resp = self.client.get_transaction(
                signature,
                encoding="json",
                max_supported_transaction_version=0,
            )
            transaction = resp.get("result")
            if transaction is None:
                logger.warning(
                    f"Transaction not found or error for signature {signature}: {resp}"
                )
                return None
            return transaction
        except Exception as e:
            logger.error(f"Error fetching transaction {signature}: {str(e)}")
            return None

    def get_token_accounts_by_owner(self, owner_address: str) -> List[Dict[str, Any]]:
        """Get all token accounts owned by the given address.

        Only accounts under ``SPL_TOKEN_PROGRAM_ID`` are requested, using
        ``jsonParsed`` encoding.

        Args:
            owner_address: Owner address forwarded to the RPC call.

        Returns:
            The non-empty ``result["value"]`` list, otherwise ``[]`` (no
            accounts, no result in the response, or the call raised).
        """
        try:
            resp = self.client.get_token_accounts_by_owner(
                owner_address,
                {"programId": self.SPL_TOKEN_PROGRAM_ID},  # SPL Token program
                encoding="jsonParsed",
            )
            result = resp.get("result")
            accounts = result.get("value") if result else None
            if accounts:
                return accounts

            if result is None:
                # No result at all: surface the raw response instead of
                # reporting it as "no accounts", which would hide an error.
                logger.warning(
                    f"Error getting token accounts for owner {owner_address}: {resp}"
                )
            else:
                logger.warning(f"No token accounts found for owner {owner_address}")
            return []
        except Exception as e:
            logger.error(
                f"Error fetching token accounts for {owner_address}: {str(e)}"
            )
            return []

    def get_token_supply(self, token_mint: str) -> Optional[Dict[str, Any]]:
        """Get the total supply of a token.

        Args:
            token_mint: Mint address forwarded to the RPC call.

        Returns:
            The response's result, or ``None`` if the response has no result
            or the call raised (both cases are logged).
        """
        try:
            resp = self.client.get_token_supply(token_mint)
            supply = resp.get("result")
            if supply is None:
                logger.warning(f"Error getting token supply for {token_mint}: {resp}")
                return None
            return supply
        except Exception as e:
            logger.error(f"Error fetching token supply for {token_mint}: {str(e)}")
            return None

    def get_multiple_accounts(self, pubkeys: List[str]) -> List[Optional[Dict[str, Any]]]:
        """Get data for multiple accounts at once.

        Args:
            pubkeys: Account addresses forwarded to the RPC call, which is made
                with ``jsonParsed`` encoding.

        Returns:
            The response's ``result["value"]`` list. On a missing/empty result
            or a raised error, a list of ``None`` with one entry per requested
            key. An empty ``pubkeys`` yields ``[]`` without contacting the node.
        """
        if not pubkeys:
            # Nothing to look up: skip the round trip and the warning an
            # empty response would otherwise trigger.
            return []

        try:
            resp = self.client.get_multiple_accounts(
                pubkeys,
                encoding="jsonParsed",
            )
            result = resp.get("result")
            accounts = result.get("value") if result else None
            if accounts:
                return accounts

            logger.warning(f"Error getting multiple accounts: {resp}")
            return [None] * len(pubkeys)
        except Exception as e:
            logger.error(f"Error fetching multiple accounts: {str(e)}")
            return [None] * len(pubkeys)

    def get_program_accounts(
        self,
        program_id: str,
        filters: Optional[List[Dict[str, Any]]] = None
    ) -> List[Dict[str, Any]]:
        """Get all accounts owned by the given program.

        Args:
            program_id: Program address forwarded to the RPC call.
            filters: Optional filter objects forwarded to the RPC call; an
                empty list is sent when omitted.

        Returns:
            The response's result list, or ``[]`` if the response has no
            result or the call raised (both cases are logged).
        """
        try:
            resp = self.client.get_program_accounts(
                program_id,
                encoding="jsonParsed",
                filters=filters or [],
            )
            accounts = resp.get("result")
            if accounts is None:
                logger.warning(
                    f"Error getting program accounts for {program_id}: {resp}"
                )
                return []
            return accounts
        except Exception as e:
            logger.error(
                f"Error fetching program accounts for {program_id}: {str(e)}"
            )
            return []

    def custom_rpc_call(self, method: str, params: List[Any]) -> Optional[Dict[str, Any]]:
        """Make a custom RPC call to the Solana node.

        Posts a JSON-RPC 2.0 request (fixed ``id`` of 1) to ``self.rpc_url``
        through the instance's ``httpx`` client.

        Args:
            method: RPC method name.
            params: Positional parameters for the method.

        Returns:
            The ``"result"`` member of the decoded response, or ``None`` when
            the response has no ``"result"`` key, the HTTP status is an error,
            or anything else raises (all cases are logged).
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        try:
            response = self.http_client.post(
                self.rpc_url,
                json=payload,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            body = response.json()

            if "result" not in body:
                logger.warning(f"Error in custom RPC call {method}: {body}")
                return None
            return body["result"]
        except Exception as e:
            logger.error(f"Error in custom RPC call {method}: {str(e)}")
            return None

    def close(self):
        """Close the HTTP client connection."""
        # Guarded with hasattr so close() is safe on an instance whose
        # __init__ failed before http_client was assigned.
        if hasattr(self, 'http_client'):
            self.http_client.close()