
- Add GET endpoint for individual arbitrage opportunities - Add POST endpoint for creating arbitrage opportunities - Add GET endpoint for individual trades - Add POST endpoint for creating trades - Add endpoints for system events (GET list, GET by ID, POST) - Update API router to include the new events endpoints - Fix linting issues
231 lines
8.3 KiB
Python
231 lines
8.3 KiB
Python
import json
|
|
import logging
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
import base64
|
|
import base58
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Try to import Solana packages, otherwise use mock implementations
|
|
try:
|
|
from solana.rpc.api import Client
|
|
from solana.publickey import PublicKey
|
|
from solana.keypair import Keypair
|
|
from solana.transaction import Transaction
|
|
from solana.rpc.types import TxOpts
|
|
SOLANA_AVAILABLE = True
|
|
except ImportError:
|
|
logger.warning("Solana package not available. Using mock implementations.")
|
|
SOLANA_AVAILABLE = False
|
|
# Mock classes for when solana package is not available
|
|
class Client:
|
|
def __init__(self, endpoint):
|
|
self.endpoint = endpoint
|
|
|
|
def get_balance(self, *args, **kwargs):
|
|
return {"result": {"value": 0}}
|
|
|
|
def get_token_accounts_by_owner(self, *args, **kwargs):
|
|
return {"result": {"value": []}}
|
|
|
|
def get_token_supply(self, *args, **kwargs):
|
|
return {"result": {"value": {"decimals": 9}}}
|
|
|
|
def send_transaction(self, *args, **kwargs):
|
|
return {"error": {"message": "Solana package not available"}}
|
|
|
|
class PublicKey:
|
|
def __init__(self, key):
|
|
self.key = key
|
|
|
|
class Keypair:
|
|
@classmethod
|
|
def from_secret_key(cls, *args, **kwargs):
|
|
return cls()
|
|
|
|
@property
|
|
def public_key(self):
|
|
return PublicKey("mock")
|
|
|
|
class Transaction:
|
|
def sign(self, *args, **kwargs):
|
|
pass
|
|
|
|
class TxOpts:
|
|
pass
|
|
|
|
# Initialize Solana client
|
|
solana_client = Client(settings.SOLANA_RPC_URL)
|
|
|
|
# Token constants
|
|
USDC_TOKEN_ADDRESS = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # USDC on Solana
|
|
SOL_DECIMALS = 9
|
|
USDC_DECIMALS = 6
|
|
|
|
# Cache for token metadata
|
|
token_metadata_cache = {}
|
|
|
|
|
|
def get_solana_client() -> Client:
|
|
"""Get Solana RPC client"""
|
|
return solana_client
|
|
|
|
|
|
def load_wallet_keypair() -> Optional[Keypair]:
|
|
"""Load wallet keypair from file if configured"""
|
|
if not settings.WALLET_KEYPAIR_PATH:
|
|
logger.warning("No wallet keypair path configured")
|
|
return None
|
|
|
|
try:
|
|
with open(settings.WALLET_KEYPAIR_PATH, "r") as f:
|
|
keypair_data = json.load(f)
|
|
if isinstance(keypair_data, list):
|
|
# Array format [private_key_bytes]
|
|
secret_key = bytes(keypair_data)
|
|
return Keypair.from_secret_key(secret_key)
|
|
elif isinstance(keypair_data, dict) and "secretKey" in keypair_data:
|
|
# Phantom wallet export format {"publicKey": "...", "secretKey": "..."}
|
|
secret_key = base58.b58decode(keypair_data["secretKey"])
|
|
return Keypair.from_secret_key(secret_key)
|
|
else:
|
|
# Solflare and other wallets might use different formats
|
|
logger.error("Unsupported wallet keypair format")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to load wallet keypair: {str(e)}")
|
|
return None
|
|
|
|
|
|
def get_wallet_balance() -> Dict[str, float]:
|
|
"""Get SOL and USDC balance for the configured wallet"""
|
|
if not SOLANA_AVAILABLE:
|
|
logger.warning("Solana package not available. Returning mock balances.")
|
|
return {"SOL": 0.0, "USDC": 0.0}
|
|
|
|
keypair = load_wallet_keypair()
|
|
if not keypair:
|
|
return {"SOL": 0.0, "USDC": 0.0}
|
|
|
|
wallet_pubkey = keypair.public_key
|
|
|
|
# Get SOL balance
|
|
sol_balance_response = solana_client.get_balance(wallet_pubkey)
|
|
sol_balance = sol_balance_response["result"]["value"] / 10**SOL_DECIMALS if "result" in sol_balance_response else 0
|
|
|
|
# Get USDC balance
|
|
try:
|
|
token_accounts = solana_client.get_token_accounts_by_owner(
|
|
wallet_pubkey,
|
|
{"mint": USDC_TOKEN_ADDRESS}
|
|
)
|
|
|
|
usdc_balance = 0
|
|
if "result" in token_accounts and "value" in token_accounts["result"]:
|
|
for account in token_accounts["result"]["value"]:
|
|
account_data = account["account"]["data"]
|
|
if isinstance(account_data, list) and len(account_data) > 1:
|
|
decoded_data = base64.b64decode(account_data[0])
|
|
# Parse the token account data - this is a simplified approach
|
|
# In a real implementation, you'd use proper parsing
|
|
if len(decoded_data) >= 64: # Minimum length for token account data
|
|
amount_bytes = decoded_data[64:72]
|
|
amount = int.from_bytes(amount_bytes, byteorder="little")
|
|
usdc_balance += amount / 10**USDC_DECIMALS
|
|
except Exception as e:
|
|
logger.error(f"Error getting USDC balance: {str(e)}")
|
|
usdc_balance = 0
|
|
|
|
return {
|
|
"SOL": sol_balance,
|
|
"USDC": usdc_balance
|
|
}
|
|
|
|
|
|
def get_token_metadata(token_address: str) -> Dict[str, Any]:
|
|
"""Get token metadata including symbol and decimals"""
|
|
if token_address in token_metadata_cache:
|
|
return token_metadata_cache[token_address]
|
|
|
|
try:
|
|
# Simplification: In a real implementation, you'd query the token's metadata
|
|
# properly from the Solana token registry or on-chain data
|
|
|
|
# For now, we just use a placeholder implementation
|
|
if token_address == USDC_TOKEN_ADDRESS:
|
|
metadata = {
|
|
"address": token_address,
|
|
"symbol": "USDC",
|
|
"name": "USD Coin",
|
|
"decimals": 6,
|
|
"logo": "https://raw.githubusercontent.com/solana-labs/token-list/main/assets/mainnet/EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v/logo.png"
|
|
}
|
|
else:
|
|
# For other tokens, make an RPC call to get the decimals
|
|
token_info = solana_client.get_token_supply(token_address)
|
|
if "result" in token_info and "value" in token_info["result"]:
|
|
decimals = token_info["result"]["value"]["decimals"]
|
|
metadata = {
|
|
"address": token_address,
|
|
"symbol": f"TOKEN-{token_address[:4]}", # Placeholder symbol
|
|
"name": f"Unknown Token {token_address[:8]}",
|
|
"decimals": decimals,
|
|
"logo": None
|
|
}
|
|
else:
|
|
# Default fallback
|
|
metadata = {
|
|
"address": token_address,
|
|
"symbol": f"TOKEN-{token_address[:4]}",
|
|
"name": f"Unknown Token {token_address[:8]}",
|
|
"decimals": 9, # Default to 9 decimals
|
|
"logo": None
|
|
}
|
|
|
|
# Cache the result
|
|
token_metadata_cache[token_address] = metadata
|
|
return metadata
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting token metadata for {token_address}: {str(e)}")
|
|
# Return a default metadata object
|
|
default_metadata = {
|
|
"address": token_address,
|
|
"symbol": f"TOKEN-{token_address[:4]}",
|
|
"name": f"Unknown Token {token_address[:8]}",
|
|
"decimals": 9,
|
|
"logo": None
|
|
}
|
|
return default_metadata
|
|
|
|
|
|
def send_transaction(transaction: Transaction, signers: List[Keypair], opts: Optional[TxOpts] = None) -> Tuple[bool, str, Optional[str]]:
|
|
"""
|
|
Send a transaction to the Solana network
|
|
|
|
Returns:
|
|
Tuple of (success, signature, error_message)
|
|
"""
|
|
if not SOLANA_AVAILABLE:
|
|
logger.warning("Solana package not available. Cannot send transaction.")
|
|
return False, "", "Solana package not available"
|
|
|
|
try:
|
|
# Sign the transaction
|
|
transaction.sign(*signers)
|
|
|
|
# Send the transaction
|
|
result = solana_client.send_transaction(transaction, *signers, opts=opts)
|
|
|
|
if "result" in result:
|
|
signature = result["result"]
|
|
return True, signature, None
|
|
else:
|
|
error_msg = result.get("error", {}).get("message", "Unknown error")
|
|
return False, "", error_msg
|
|
|
|
except Exception as e:
|
|
error_message = str(e)
|
|
logger.error(f"Transaction failed: {error_message}")
|
|
return False, "", error_message |