import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional

from app.core.config import settings
from app.db.session import SessionLocal

logger = logging.getLogger(__name__)


class BackgroundTaskManager:
    """Run registered coroutine functions repeatedly at fixed intervals.

    Tasks are registered with :meth:`add_task` and begin executing once
    :meth:`start` is awaited.  Each task runs in its own loop: call the
    function, then sleep ``interval_seconds``, until :meth:`stop` is called.

    Attributes:
        tasks: One dict per registered task with the keys ``name``, ``func``,
            ``interval_seconds``, ``kwargs`` and ``last_run``.
        is_running: Flag checked by every periodic loop before each run.
    """

    def __init__(self):
        self.tasks: List[Dict[str, Any]] = []
        self.is_running = False
        # Strong reference to the supervising task created by start(), so it
        # can be cancelled from stop().
        self._main_task: Optional[asyncio.Task] = None

    def add_task(
        self,
        name: str,
        func: Callable[..., Awaitable[Any]],
        interval_seconds: int,
        **kwargs
    ) -> None:
        """Add a task to be executed periodically.

        Args:
            name: Label used in log messages.
            func: Coroutine function awaited on every run.
            interval_seconds: Delay between the end of one run and the start
                of the next.
            **kwargs: Keyword arguments forwarded to ``func`` on every run.
        """
        self.tasks.append({
            "name": name,
            "func": func,
            "interval_seconds": interval_seconds,
            "kwargs": kwargs,
            # Filled in with a UTC datetime each time the task is started.
            "last_run": None,
        })
        logger.info("Added background task: %s", name)

    async def start(self) -> None:
        """Start all background tasks; does nothing if already running."""
        if self.is_running:
            return

        self.is_running = True
        logger.info("Starting background tasks")

        # Start tasks in a separate task to avoid blocking startup
        self._main_task = asyncio.create_task(self._run_all_tasks())

    async def _run_all_tasks(self) -> None:
        """Run the periodic loops of all registered tasks concurrently."""
        if not self.tasks:
            logger.warning("No background tasks to run")
            return

        runners = [
            self._run_task_periodically(task_info) for task_info in self.tasks
        ]

        try:
            await asyncio.gather(*runners)
        except asyncio.CancelledError:
            # Cancellation comes from stop(); it must propagate.  On
            # Python 3.7 CancelledError is an Exception subclass, so the
            # broad handler below would otherwise swallow it.
            raise
        except Exception as e:
            logger.exception("Error in background tasks: %s", e)

    async def _run_task_periodically(self, task_info: Dict[str, Any]) -> None:
        """Run a task periodically at the specified interval.

        Errors raised by the task are logged and do not end the loop; the
        next run is attempted after the usual sleep.

        Args:
            task_info: A task record as created by :meth:`add_task`.
        """
        name = task_info["name"]
        func = task_info["func"]
        interval_seconds = task_info["interval_seconds"]
        kwargs = task_info["kwargs"]

        logger.info("Starting periodic task: %s", name)

        while self.is_running:
            # Record when this run began; previously this key stayed None
            # forever.
            task_info["last_run"] = datetime.now(timezone.utc)
            try:
                logger.debug("Running task: %s", name)
                await func(**kwargs)
                logger.debug("Task completed: %s", name)
            except asyncio.CancelledError:
                # Never treat cancellation as a task failure (matters on
                # Python 3.7, where CancelledError derives from Exception).
                raise
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception("Error in task %s: %s", name, e)

            # Sleep until next interval
            await asyncio.sleep(interval_seconds)

    def stop(self) -> None:
        """Stop all background tasks.

        Clears the running flag and cancels the supervising task, which in
        turn cancels the periodic loops gathered inside it.
        """
        self.is_running = False
        if self._main_task:
            self._main_task.cancel()
        logger.info("Stopping background tasks")


# Define our background tasks
async def process_bot_purchases() -> None:
    """Process completed bot purchases.

    Opens a database session, delegates to
    ``process_completed_bot_purchases`` and logs the returned count when it
    is positive.  Errors are logged, not raised; the session is always
    closed.
    """
    # Lazy import to avoid circular imports
    from app.services.bot_simulation import process_completed_bot_purchases

    db = SessionLocal()
    try:
        # NOTE(review): this is a synchronous call made on the event loop; if
        # it performs blocking database I/O it stalls other coroutines while
        # it runs.  Consider offloading to a worker thread -- confirm session
        # thread-safety first.
        count = process_completed_bot_purchases(db)
        if count > 0:
            logger.info("Processed %s completed bot purchases", count)
    except Exception as e:
        logger.exception("Error processing bot purchases: %s", e)
    finally:
        db.close()


# Create the task manager instance
task_manager = BackgroundTaskManager()

# Add the bot simulation task (this will run when the application starts)
if settings.BOT_SIMULATION_INTERVAL > 0:
    task_manager.add_task(
        "process_bot_purchases",
        process_bot_purchases,
        interval_seconds=settings.BOT_SIMULATION_INTERVAL,
    )