129 lines
5.0 KiB
Python

from pathlib import Path

from sqlalchemy import create_engine, event, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from tenacity import retry, stop_after_attempt, wait_fixed

from app.core.config import settings
from app.core.logging import get_logger
# Module-level logger, obtained from the project's get_logger helper under the
# name "db.session"; used by every function and the import-time setup below.
logger = get_logger("db.session")
def _create_in_memory_engine():
    """Return an engine bound to one shared in-memory SQLite database.

    An in-memory SQLite database exists only inside the connection that
    opened it.  ``StaticPool`` makes every checkout reuse that single
    connection, so all threads (``check_same_thread`` is disabled) see the
    same tables instead of each one getting its own empty database.
    """
    return create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
        pool_pre_ping=True,
    )


def _ensure_writable_directory(db_dir):
    """Create *db_dir* if missing and prove it is writable via a probe file.

    Any exception from ``mkdir``/``touch``/``unlink`` propagates to the
    caller, which treats it as "directory not usable".
    """
    db_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Ensured database directory exists at: {db_dir}")
    # Verify write permissions by touching (then removing) a test file
    probe = db_dir / ".test_write_access"
    probe.touch()
    probe.unlink()
    logger.info(f"Verified write access to database directory: {db_dir}")


# Wrap in function for easier error handling and retries
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
def create_db_engine():
    """Create the SQLAlchemy engine, falling back to in-memory SQLite.

    Behaviour:
      * If ``settings.USE_IN_MEMORY_DB`` is set, an in-memory SQLite engine
        is created and verified with ``SELECT 1``.
      * Otherwise the parent directory of the configured database file is
        created and checked for write access; if that fails, an in-memory
        engine is returned immediately (without a connection test).
      * The chosen engine is verified with ``SELECT 1``.  If a
        ``SQLAlchemyError`` occurs for a file-based target, an in-memory
        engine is returned instead.

    The ``tenacity`` decorator retries the whole function (up to three
    attempts, two seconds apart, re-raising the last error) for any exception
    that escapes -- i.e. a failure of the in-memory target itself, or a
    non-SQLAlchemy error.

    Returns:
        A SQLAlchemy ``Engine``.

    Raises:
        SQLAlchemyError: if the in-memory database was the configured target
            and could not be created or connected to.
    """
    use_memory = settings.USE_IN_MEMORY_DB

    if use_memory:
        logger.info("Using in-memory SQLite database")
        db_url = "sqlite://"
    else:
        db_url = settings.SQLALCHEMY_DATABASE_URL
        # NOTE(review): assumes the configured URL is a "sqlite:///<path>"
        # file URL; other schemes would yield a meaningless path here.
        db_dir = Path(db_url.replace("sqlite:///", "")).parent
        try:
            _ensure_writable_directory(db_dir)
        except Exception as e:
            logger.error(f"Database directory access error: {e}")
            # Force in-memory database
            logger.info("Forcing in-memory SQLite database due to directory access error")
            return _create_in_memory_engine()
        logger.info(f"Using file-based SQLite database at: {db_url}")

    # Create engine with better error handling
    try:
        logger.info(f"Creating SQLAlchemy engine with URL: {db_url}")
        if use_memory:
            engine = _create_in_memory_engine()
        else:
            engine = create_engine(
                db_url,
                connect_args={"check_same_thread": False},
                pool_pre_ping=True,  # Ensure connections are still alive
            )

        # Add engine event listeners for debugging
        @event.listens_for(engine, "connect")
        def on_connect(dbapi_connection, connection_record):
            logger.info("Database connection established")

        # Test connection
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("Database connection test successful")
        return engine
    except SQLAlchemyError as e:
        logger.error(f"Database connection error: {e}", exc_info=True)
        # Fall back to in-memory SQLite if file access failed
        if not use_memory:
            logger.info("Falling back to in-memory SQLite database")
            return _create_in_memory_engine()
        # Re-raise if in-memory DB was already the target
        raise
# Initialize engine and session factory at import time.  Failures are logged
# instead of raised so the application can still start in a degraded mode;
# `engine` / `SessionLocal` stay None only if even the emergency fallback fails.
engine = None
SessionLocal = None
try:
    engine = create_db_engine()
    if engine is not None:
        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        logger.info("Database session factory created successfully")
    else:
        logger.error("Failed to create database engine")
except Exception as e:
    logger.error(f"Failed to initialize database engine: {e}", exc_info=True)
    # Set up in-memory database as last resort
    try:
        logger.info("Setting up emergency in-memory SQLite database")
        # StaticPool shares one connection between all threads; without it
        # each thread would open its own, separate, empty in-memory database.
        engine = create_engine(
            "sqlite://",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    except Exception as fallback_error:
        logger.critical(f"Critical database failure: {fallback_error}", exc_info=True)
        # Allow application to start but in degraded mode
# Dependency to get DB session
def get_db():
    """FastAPI dependency that yields a database session and always closes it.

    If the module-level ``SessionLocal`` factory was never initialised, a
    last-chance in-memory SQLite engine and session factory are created and
    stored in the module globals before a session is opened.

    Yields:
        A session produced by ``SessionLocal``.

    Raises:
        SQLAlchemyError: if the last-chance initialisation fails (the original
            error is attached as ``__cause__``), or re-raised unchanged when
            one occurs while the caller is using the session.
    """
    global engine, SessionLocal

    if SessionLocal is None:
        logger.error("Database session not initialized, attempting to initialize")
        # Last chance to initialize
        try:
            # StaticPool shares one connection between all threads; without
            # it each thread would see its own empty in-memory database.
            engine = create_engine(
                "sqlite://",
                connect_args={"check_same_thread": False},
                poolclass=StaticPool,
            )
            SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        except Exception as e:
            logger.critical(f"Could not initialize database even as fallback: {e}")
            # Chain the cause so the root failure is not lost in the traceback.
            raise SQLAlchemyError("Database connection critically failed") from e

    db = SessionLocal()
    try:
        logger.debug("DB session created")
        yield db
    except SQLAlchemyError as e:
        logger.error(f"Database error during session: {e}")
        raise
    finally:
        # Close first, then log, so the message is only emitted once true.
        db.close()
        logger.debug("DB session closed")