diff --git a/app/api/v1/endpoints/arbitrage.py b/app/api/v1/endpoints/arbitrage.py index 8df0e7a..7c8b6fe 100644 --- a/app/api/v1/endpoints/arbitrage.py +++ b/app/api/v1/endpoints/arbitrage.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Dict, List from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from sqlalchemy.orm import Session @@ -8,6 +8,7 @@ from app.models.models import ArbitrageEvent from app.schemas.schemas import (ArbitrageEventResponse, ScanRequest, ScanStatusResponse) from app.services.scanner import BlockScanner +from app.services.solana_client import SolanaClient router = APIRouter() @@ -23,6 +24,37 @@ async def scan_for_arbitrage( """ scanner = BlockScanner(db) + # Check if scan is already in progress + if scanner.scan_in_progress: + return ScanStatusResponse( + last_scanned_block=scanner.get_last_scanned_block() or 0, + last_scan_time=scanner.get_scan_status()["last_scan_time"], + arbitrage_events_count=scanner.get_scan_status()["arbitrage_events_count"], + scan_in_progress=True, + message="Scan already in progress, please wait for it to complete" + ) + + try: + # Verify Solana connection before starting scan + client = SolanaClient() + latest_block = client.get_latest_block() + if not latest_block: + return ScanStatusResponse( + last_scanned_block=scanner.get_last_scanned_block() or 0, + last_scan_time=scanner.get_scan_status()["last_scan_time"], + arbitrage_events_count=scanner.get_scan_status()["arbitrage_events_count"], + scan_in_progress=False, + message="Failed to connect to Solana RPC. Please check your connection and try again." + ) + except Exception as e: + return ScanStatusResponse( + last_scanned_block=scanner.get_last_scanned_block() or 0, + last_scan_time=scanner.get_scan_status()["last_scan_time"], + arbitrage_events_count=scanner.get_scan_status()["arbitrage_events_count"], + scan_in_progress=False, + message=f"Error connecting to Solana RPC: {str(e)}" + ) + # Add scanning task to background tasks background_tasks.add_task( scanner.scan_blocks, @@ -32,6 +64,7 @@ async def scan_for_arbitrage( # Return current scan status status = scanner.get_scan_status() + status["message"] = "Scan started successfully" return ScanStatusResponse(**status) @@ -44,6 +77,15 @@ async def get_scan_status( """ scanner = BlockScanner(db) status = scanner.get_scan_status() + + # Add helpful message based on status + if status["scan_in_progress"]: + status["message"] = "Scan is currently in progress" + elif status["last_scanned_block"] == 0: + status["message"] = "No blocks have been scanned yet. Use the /scan endpoint to start scanning." + else: + status["message"] = f"Last scan completed at {status['last_scan_time']}. Found {status['arbitrage_events_count']} arbitrage events." + return ScanStatusResponse(**status) @@ -121,4 +163,108 @@ async def get_arbitrage_event( confidence_score=event.confidence_score, detected_at=event.detected_at, block_slot=block_slot, - ) \ No newline at end of file + ) + + +@router.get("/debug/solana-connection", response_model=Dict) +async def debug_solana_connection(): + """ + Debug endpoint to test Solana RPC connection + """ + client = SolanaClient() + results = {} + + try: + # Test getting latest blockhash + latest_block = client.get_latest_block() + results["latest_blockhash"] = latest_block + + # Try to get a slot + slot_response = client.client.get_slot() + if "result" in slot_response: + results["current_slot"] = slot_response["result"] + + # Try to get a block + try: + block_data = client.get_block(slot_response["result"]) + results["block_data"] = {"has_data": block_data is not None} + if block_data: + results["block_data"]["parent_slot"] = block_data.get("parentSlot") + results["block_data"]["transactions_count"] = len(block_data.get("transactions", [])) + except Exception as e: + results["block_error"] = str(e) + + except Exception as e: + results["error"] = str(e) + + # Add RPC URL info + results["rpc_url"] = client.rpc_url + + return results + + +@router.get("/validate-connection", response_model=Dict) +async def validate_solana_connection(): + """ + Validate the Solana RPC connection and return detailed status + """ + client = SolanaClient() + result = { + "status": "success", + "rpc_url": client.rpc_url, + "connection_valid": False, + "message": "", + } + + try: + # Test 1: Get latest blockhash + latest_blockhash = client.get_latest_block() + result["blockhash_test"] = { + "success": latest_blockhash is not None, + "data": latest_blockhash if latest_blockhash else "Failed to retrieve" + } + + # Test 2: Get current slot + try: + slot_response = client.client.get_slot() + current_slot = slot_response.get("result") if "result" in slot_response else None + result["slot_test"] = { + "success": current_slot is not None, + "data": current_slot if current_slot else "Failed to retrieve" + } + + # If we have a slot, try to get a block + if current_slot: + try: + block = client.get_block(current_slot) + result["block_test"] = { + "success": block is not None, + "data": { + "has_transactions": block is not None and "transactions" in block, + "transaction_count": len(block.get("transactions", [])) if block else 0 + } + } + except Exception as e: + result["block_test"] = { + "success": False, + "error": str(e) + } + except Exception as e: + result["slot_test"] = { + "success": False, + "error": str(e) + } + + # Determine overall status + if result.get("blockhash_test", {}).get("success", False) and result.get("slot_test", {}).get("success", False): + result["connection_valid"] = True + result["message"] = "Solana RPC connection is valid and functioning properly." + else: + result["status"] = "error" + result["message"] = "Solana RPC connection is not fully functional." + + except Exception as e: + result["status"] = "error" + result["message"] = f"Error validating Solana RPC connection: {str(e)}" + + return result \ No newline at end of file diff --git a/app/schemas/schemas.py b/app/schemas/schemas.py index 2d486dd..2eb3f32 100644 --- a/app/schemas/schemas.py +++ b/app/schemas/schemas.py @@ -155,6 +155,7 @@ class ScanStatusResponse(BaseModel): last_scan_time: datetime arbitrage_events_count: int scan_in_progress: bool + message: Optional[str] = None class ScanRequest(BaseModel): diff --git a/app/services/scanner.py b/app/services/scanner.py index d99f6c2..3ccd87d 100644 --- a/app/services/scanner.py +++ b/app/services/scanner.py @@ -210,27 +210,63 @@ class BlockScanner: all_events = [] try: + logger.info(f"Starting block scan. Num blocks: {num_blocks}, Start slot: {start_slot}") + num_blocks = num_blocks or settings.SOLANA_BLOCKS_TO_SCAN + logger.info(f"Using RPC URL: {self.solana_client.rpc_url}") # Get latest block if start_slot not provided if start_slot is None: - latest_block = self.solana_client.get_latest_block() - # Get block at that slot - block_data = self.solana_client.get_block(latest_block["lastValidBlockHeight"]) - start_slot = block_data["parentSlot"] + logger.info("Start slot not provided, fetching latest block") + try: + latest_block = self.solana_client.get_latest_block() + logger.info(f"Latest block data: {latest_block}") + + # Get block at that slot + last_valid_height = latest_block.get("lastValidBlockHeight") + if not last_valid_height: + logger.error(f"No lastValidBlockHeight in response: {latest_block}") + raise ValueError("Invalid response format: missing lastValidBlockHeight") + + logger.info(f"Fetching block at height: {last_valid_height}") + block_data = self.solana_client.get_block(last_valid_height) + + if not block_data or "parentSlot" not in block_data: + logger.error(f"Invalid block data response: {block_data}") + raise ValueError("Invalid block data: missing parentSlot") + + start_slot = block_data["parentSlot"] + logger.info(f"Using start_slot: {start_slot}") + except Exception as e: + logger.error(f"Error getting latest block: {str(e)}") + raise # Get list of blocks to scan end_slot = start_slot - num_blocks if end_slot < 0: end_slot = 0 - blocks_to_scan = self.solana_client.get_blocks(end_slot, start_slot) + logger.info(f"Getting blocks from {end_slot} to {start_slot}") + try: + blocks_to_scan = self.solana_client.get_blocks(end_slot, start_slot) + logger.info(f"Found {len(blocks_to_scan)} blocks to scan") + except Exception as e: + logger.error(f"Error getting blocks to scan: {str(e)}") + raise # Scan each block for slot in blocks_to_scan: try: + logger.info(f"Processing block at slot {slot}") block_data = self.solana_client.get_block(slot) + + if not block_data: + logger.warning(f"Empty block data for slot {slot}") + continue + + logger.info(f"Block {slot} has {len(block_data.get('transactions', []))} transactions") _, events = self.process_block(block_data) + logger.info(f"Found {len(events)} arbitrage events in block {slot}") all_events.extend(events) except Exception as e: logger.error(f"Error processing block {slot}: {str(e)}") diff --git a/app/services/solana_client.py b/app/services/solana_client.py index 3679d2d..bcd1acb 100644 --- a/app/services/solana_client.py +++ b/app/services/solana_client.py @@ -26,50 +26,131 @@ class SolanaClient: """ Get latest finalized block """ - response = self.client.get_latest_blockhash() - if "error" in response: - logger.error(f"Error getting latest block: {response['error']}") - raise Exception(f"Error getting latest block: {response['error']}") - - return response["result"] + try: + logger.info(f"Requesting latest blockhash from {self.rpc_url}") + response = self.client.get_latest_blockhash() + + logger.info(f"Got response: {response}") + + if "error" in response: + error_msg = response.get("error", {}) + logger.error(f"RPC error getting latest block: {error_msg}") + raise Exception(f"RPC error getting latest block: {error_msg}") + + if "result" not in response: + logger.error(f"Invalid response format, missing 'result' key: {response}") + raise ValueError("Invalid response format, missing 'result' key") + + return response["result"] + except Exception as e: + logger.error(f"Exception in get_latest_block: {str(e)}") + raise def get_block(self, slot: int) -> Dict: """ Get block data by slot number """ - response = self.client.get_block( - slot, - encoding="json", - max_supported_transaction_version=0, - transaction_details="full", - ) - - if "error" in response: - logger.error(f"Error getting block {slot}: {response['error']}") - raise Exception(f"Error getting block {slot}: {response['error']}") - - return response["result"] + try: + logger.info(f"Requesting block data for slot {slot} from {self.rpc_url}") + + response = self.client.get_block( + slot, + encoding="json", + max_supported_transaction_version=0, + transaction_details="full", + ) + + logger.debug(f"Raw block response: {response}") + + if "error" in response: + error_msg = response.get("error", {}) + error_code = error_msg.get("code", "unknown") + error_message = error_msg.get("message", "unknown") + + logger.error(f"RPC error getting block {slot}: code={error_code}, message={error_message}") + + # Handle specific error cases + if error_code == -32009: # slot not found or not available + logger.warning(f"Block at slot {slot} not found or not available") + return None + + raise Exception(f"RPC error getting block {slot}: {error_msg}") + + if "result" not in response: + logger.error(f"Invalid response format for block {slot}, missing 'result' key: {response}") + raise ValueError(f"Invalid response format for block {slot}, missing 'result' key") + + return response["result"] + except Exception as e: + logger.error(f"Exception in get_block for slot {slot}: {str(e)}") + raise def get_blocks(self, start_slot: int, end_slot: int = None) -> List[int]: """ Get a list of confirmed blocks """ - if end_slot is None: - # Get latest slot if end_slot not provided - response = self.client.get_slot() - if "error" in response: - logger.error(f"Error getting latest slot: {response['error']}") - raise Exception(f"Error getting latest slot: {response['error']}") + try: + logger.info(f"Getting blocks from {start_slot} to {end_slot or 'latest'}") - end_slot = response["result"] - - response = self.client.get_blocks(start_slot, end_slot) - - if "error" in response: - logger.error(f"Error getting blocks from {start_slot} to {end_slot}: {response['error']}") - raise Exception(f"Error getting blocks from {start_slot} to {end_slot}: {response['error']}") - - return response["result"] + if end_slot is None: + # Get latest slot if end_slot not provided + logger.info("No end_slot provided, fetching latest slot") + response = self.client.get_slot() + + logger.debug(f"get_slot response: {response}") + + if "error" in response: + error_msg = response.get("error", {}) + logger.error(f"RPC error getting latest slot: {error_msg}") + raise Exception(f"RPC error getting latest slot: {error_msg}") + + if "result" not in response: + logger.error(f"Invalid response format, missing 'result' key: {response}") + raise ValueError("Invalid response format, missing 'result' key") + + end_slot = response["result"] + logger.info(f"Using end_slot: {end_slot}") + + logger.info(f"Requesting blocks from {start_slot} to {end_slot}") + + # Check if the range is valid + if start_slot > end_slot: + logger.warning(f"Invalid slot range: start={start_slot}, end={end_slot}. Swapping values.") + start_slot, end_slot = end_slot, start_slot + + # Limit the range to prevent potential issues with large requests + if end_slot - start_slot > 500: + logger.warning(f"Large slot range requested: {start_slot} to {end_slot}, limiting to 500 blocks") + start_slot = end_slot - 500 + + response = self.client.get_blocks(start_slot, end_slot) + + logger.debug(f"get_blocks response: {response}") + + if "error" in response: + error_msg = response.get("error", {}) + logger.error(f"RPC error getting blocks from {start_slot} to {end_slot}: {error_msg}") + + # Handle specific error cases + if isinstance(error_msg, dict) and error_msg.get("code") == -32602: + logger.warning("Invalid slot range, trying alternative approach") + # Use a fallback approach if needed + # For now, return an empty list + return [] + + raise Exception(f"RPC error getting blocks from {start_slot} to {end_slot}: {error_msg}") + + if "result" not in response: + logger.error(f"Invalid response format, missing 'result' key: {response}") + raise ValueError("Invalid response format, missing 'result' key") + + blocks = response["result"] + logger.info(f"Retrieved {len(blocks)} blocks from {start_slot} to {end_slot}") + return blocks + + except Exception as e: + logger.error(f"Exception in get_blocks: {str(e)}") + raise def get_transaction(self, signature: str) -> Dict: """ diff --git a/main.py b/main.py index d8ecf42..a3cd4ed 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import logging +from datetime import datetime import uvicorn from fastapi import FastAPI @@ -14,6 +15,12 @@ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) +# Enable debug logging for the scanner and Solana client modules +logging.getLogger("app.services.scanner").setLevel(logging.DEBUG) +logging.getLogger("app.services.solana_client").setLevel(logging.DEBUG) +# Also enable debug for solana.rpc +logging.getLogger("solana.rpc").setLevel(logging.DEBUG) + logger = logging.getLogger(__name__) app = FastAPI( @@ -40,8 +47,36 @@ app.include_router(api_router, prefix=settings.API_V1_STR) # Health check endpoint @app.get("/health", tags=["health"]) async def health_check(): - """Health check endpoint""" - return {"status": "ok"} + """Health check endpoint with Solana RPC connectivity status""" + from app.services.solana_client import SolanaClient + + health_status = { + "status": "ok", + "timestamp": datetime.now().isoformat(), + "solana_rpc": { + "url": settings.SOLANA_RPC_URL, + "connection": "unknown" + } + } + + # Test Solana RPC connection + try: + client = SolanaClient() + response = client.client.get_latest_blockhash() + if "result" in response: + health_status["solana_rpc"]["connection"] = "ok" + health_status["solana_rpc"]["latest_blockhash"] = response["result"].get("blockhash", "unknown") + else: + health_status["solana_rpc"]["connection"] = "error" + health_status["solana_rpc"]["error"] = "No result in response" + except Exception as e: + health_status["solana_rpc"]["connection"] = "error" + health_status["solana_rpc"]["error"] = str(e) + + if health_status["solana_rpc"]["connection"] != "ok": + health_status["status"] = "degraded" + + return health_status @app.on_event("startup")