
- Created Alembic migrations for SQLite database - Set up database initialization on app startup - Fixed linting issues with Ruff - Updated README with comprehensive documentation - Configured startup tasks and health checks
47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
from typing import Dict, Optional
|
|
import logging
|
|
|
|
from app.core.config import settings
|
|
from app.services.dex.base import BaseDexService
|
|
from app.services.dex.jupiter import JupiterDexService
|
|
from app.services.dex.raydium import RaydiumDexService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# DEX service registry
|
|
_dex_services: Dict[str, BaseDexService] = {}
|
|
|
|
|
|
def get_dex_service(dex_name: str) -> Optional[BaseDexService]:
|
|
"""Get a DEX service by name"""
|
|
return _dex_services.get(dex_name.lower())
|
|
|
|
|
|
def get_all_dex_services() -> Dict[str, BaseDexService]:
|
|
"""Get all available DEX services"""
|
|
return _dex_services
|
|
|
|
|
|
def initialize_dex_services():
|
|
"""Initialize all enabled DEX services"""
|
|
global _dex_services
|
|
|
|
# Clear existing services
|
|
_dex_services = {}
|
|
|
|
# Initialize services based on configuration
|
|
enabled_dexes = settings.ENABLED_DEXES
|
|
|
|
if "jupiter" in enabled_dexes:
|
|
_dex_services["jupiter"] = JupiterDexService()
|
|
logger.info("Jupiter DEX service initialized")
|
|
|
|
if "raydium" in enabled_dexes:
|
|
_dex_services["raydium"] = RaydiumDexService()
|
|
logger.info("Raydium DEX service initialized")
|
|
|
|
logger.info(f"Initialized {len(_dex_services)} DEX services: {', '.join(_dex_services.keys())}")
|
|
|
|
|
|
# Initialize services on module import
|
|
initialize_dex_services() |