114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
import logging
|
|
import asyncio
|
|
from typing import List, Dict, Any, Callable, Awaitable, Optional
|
|
|
|
from app.core.config import settings
|
|
from app.db.session import SessionLocal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BackgroundTaskManager:
|
|
def __init__(self):
|
|
self.tasks: List[Dict[str, Any]] = []
|
|
self.is_running = False
|
|
self._main_task: Optional[asyncio.Task] = None
|
|
|
|
def add_task(
|
|
self,
|
|
name: str,
|
|
func: Callable[..., Awaitable[Any]],
|
|
interval_seconds: int,
|
|
**kwargs
|
|
) -> None:
|
|
"""Add a task to be executed periodically."""
|
|
self.tasks.append({
|
|
"name": name,
|
|
"func": func,
|
|
"interval_seconds": interval_seconds,
|
|
"kwargs": kwargs,
|
|
"last_run": None,
|
|
})
|
|
logger.info(f"Added background task: {name}")
|
|
|
|
async def start(self) -> None:
|
|
"""Start all background tasks."""
|
|
if self.is_running:
|
|
return
|
|
|
|
self.is_running = True
|
|
logger.info("Starting background tasks")
|
|
|
|
# Start tasks in a separate task to avoid blocking startup
|
|
self._main_task = asyncio.create_task(self._run_all_tasks())
|
|
|
|
async def _run_all_tasks(self) -> None:
|
|
"""Run all tasks in parallel."""
|
|
if not self.tasks:
|
|
logger.warning("No background tasks to run")
|
|
return
|
|
|
|
tasks = []
|
|
for task_info in self.tasks:
|
|
tasks.append(self._run_task_periodically(task_info))
|
|
|
|
try:
|
|
await asyncio.gather(*tasks)
|
|
except Exception as e:
|
|
logger.error(f"Error in background tasks: {str(e)}")
|
|
|
|
async def _run_task_periodically(self, task_info: Dict[str, Any]) -> None:
|
|
"""Run a task periodically at the specified interval."""
|
|
name = task_info["name"]
|
|
func = task_info["func"]
|
|
interval_seconds = task_info["interval_seconds"]
|
|
kwargs = task_info["kwargs"]
|
|
|
|
logger.info(f"Starting periodic task: {name}")
|
|
|
|
while self.is_running:
|
|
try:
|
|
logger.debug(f"Running task: {name}")
|
|
await func(**kwargs)
|
|
logger.debug(f"Task completed: {name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in task {name}: {str(e)}")
|
|
|
|
# Sleep until next interval
|
|
await asyncio.sleep(interval_seconds)
|
|
|
|
def stop(self) -> None:
|
|
"""Stop all background tasks."""
|
|
self.is_running = False
|
|
if self._main_task:
|
|
self._main_task.cancel()
|
|
logger.info("Stopping background tasks")
|
|
|
|
|
|
# Define our background tasks
|
|
async def process_bot_purchases() -> None:
|
|
"""Process completed bot purchases."""
|
|
# Lazy import to avoid circular imports
|
|
from app.services.bot_simulation import process_completed_bot_purchases
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
count = process_completed_bot_purchases(db)
|
|
if count > 0:
|
|
logger.info(f"Processed {count} completed bot purchases")
|
|
except Exception as e:
|
|
logger.error(f"Error processing bot purchases: {str(e)}")
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# Create the task manager instance
|
|
task_manager = BackgroundTaskManager()
|
|
|
|
# Add the bot simulation task (this will run when the application starts)
|
|
if settings.BOT_SIMULATION_INTERVAL > 0:
|
|
task_manager.add_task(
|
|
"process_bot_purchases",
|
|
process_bot_purchases,
|
|
interval_seconds=settings.BOT_SIMULATION_INTERVAL,
|
|
) |