2025-06-13 12:22:11 +00:00

114 lines
3.5 KiB
Python

import logging
import asyncio
from typing import List, Dict, Any, Callable, Awaitable, Optional
from app.core.config import settings
from app.db.session import SessionLocal
logger = logging.getLogger(__name__)


class BackgroundTaskManager:
    """Run registered coroutine functions repeatedly on the asyncio event loop.

    Tasks are registered with :meth:`add_task` and launched together by
    :meth:`start`. Each task loops on its own: call the function, sleep for
    its interval, repeat, until :meth:`stop` is called.

    The task list is read once, when the supervising coroutine begins, so a
    task added while the manager is running is only picked up after a
    ``stop()`` / ``start()`` cycle.
    """

    def __init__(self):
        # Registered task descriptors, in registration order.
        self.tasks: List[Dict[str, Any]] = []
        # True between start() and stop(); the periodic loops poll this flag.
        self.is_running = False
        # Supervising asyncio task created by start(); None before first start.
        self._main_task: Optional[asyncio.Task] = None

    def add_task(
        self,
        name: str,
        func: Callable[..., Awaitable[Any]],
        interval_seconds: int,
        **kwargs
    ) -> None:
        """Add a task to be executed periodically.

        Args:
            name: Label used in log messages.
            func: Coroutine function awaited on every run.
            interval_seconds: Delay between the end of one run and the start
                of the next.
            **kwargs: Keyword arguments forwarded to ``func`` on every run.
        """
        self.tasks.append({
            "name": name,
            "func": func,
            "interval_seconds": interval_seconds,
            "kwargs": kwargs,
            # Placeholder only: nothing in this class ever writes to it.
            "last_run": None,
        })
        logger.info(f"Added background task: {name}")

    async def start(self) -> None:
        """Start all background tasks.

        Does nothing if the manager is already running. Returns as soon as
        the supervising task has been scheduled; it does not wait for any
        task to execute.
        """
        if self.is_running:
            return
        self.is_running = True
        logger.info("Starting background tasks")
        # Start tasks in a separate task to avoid blocking startup
        self._main_task = asyncio.create_task(self._run_all_tasks())

    async def _run_all_tasks(self) -> None:
        """Run every registered task concurrently until stopped or cancelled."""
        if not self.tasks:
            logger.warning("No background tasks to run")
            # Nothing was launched, so drop the running flag again; otherwise
            # a later start() (after add_task) would be silently ignored.
            self.is_running = False
            return
        periodic_loops = [
            self._run_task_periodically(task_info) for task_info in self.tasks
        ]
        try:
            await asyncio.gather(*periodic_loops)
        except Exception as e:
            # exc_info keeps the traceback, which the message alone loses.
            logger.error(f"Error in background tasks: {str(e)}", exc_info=True)

    async def _run_task_periodically(self, task_info: Dict[str, Any]) -> None:
        """Run a task periodically at the specified interval.

        An exception raised by the task function is logged and the loop
        carries on, so one failing run does not end the schedule.

        Args:
            task_info: Descriptor dict as built by :meth:`add_task`.
        """
        name = task_info["name"]
        func = task_info["func"]
        interval_seconds = task_info["interval_seconds"]
        kwargs = task_info["kwargs"]
        logger.info(f"Starting periodic task: {name}")
        while self.is_running:
            try:
                logger.debug(f"Running task: {name}")
                await func(**kwargs)
                logger.debug(f"Task completed: {name}")
            except Exception as e:
                logger.error(f"Error in task {name}: {str(e)}", exc_info=True)
            # Sleep until next interval (also after a failed run)
            await asyncio.sleep(interval_seconds)

    def stop(self) -> None:
        """Stop all background tasks.

        Clears the running flag and requests cancellation of the supervising
        task. Cancellation is not awaited here.
        """
        self.is_running = False
        if self._main_task:
            self._main_task.cancel()
        logger.info("Stopping background tasks")
# Define our background tasks
async def process_bot_purchases() -> None:
    """Run one pass of completed-bot-purchase processing.

    Opens a session from ``SessionLocal``, hands it to
    ``process_completed_bot_purchases`` and logs the returned count when it
    is positive. Exceptions raised during the pass are logged and
    suppressed; the session is closed in every case.
    """
    # Imported at call time rather than module load to avoid a circular import.
    from app.services.bot_simulation import process_completed_bot_purchases

    session = SessionLocal()
    try:
        try:
            count = process_completed_bot_purchases(session)
            if count > 0:
                logger.info(f"Processed {count} completed bot purchases")
        except Exception as e:
            logger.error(f"Error processing bot purchases: {str(e)}")
    finally:
        # Always release the session, whether the pass succeeded or failed.
        session.close()
# Module-level manager instance.
task_manager = BackgroundTaskManager()

# Register the bot simulation job only when a positive interval is configured.
# Registration alone does not run it; that happens once task_manager.start()
# is awaited.
if settings.BOT_SIMULATION_INTERVAL > 0:
    task_manager.add_task(
        name="process_bot_purchases",
        func=process_bot_purchases,
        interval_seconds=settings.BOT_SIMULATION_INTERVAL,
    )