from logging.config import fileConfig
import os
import sys
import logging
from pathlib import Path
import sqlite3

from sqlalchemy import engine_from_config, event
from sqlalchemy import pool

from alembic import context

# Add the parent directory to the Python path so the app package is importable
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This sets up the loggers defined in the .ini file.
fileConfig(config.config_file_name)

# Set up the module logger
logger = logging.getLogger("alembic.env")

# Ensure the database directory exists before Alembic tries to connect
db_url = config.get_main_option("sqlalchemy.url")
if db_url.startswith("sqlite:///"):
    # Strip the "sqlite:///" prefix; this yields a relative path for
    # three-slash URLs and an absolute path for four-slash URLs.
    db_path = db_url[len("sqlite:///"):]

    # Get the directory path
    db_dir = os.path.dirname(db_path)

    logger.info(f"Database URL: {db_url}")
    logger.info(f"Database path: {db_path}")
    logger.info(f"Database directory: {db_dir}")

    # Create the directory if it doesn't exist
    try:
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
            logger.info(f"Ensured database directory exists: {db_dir}")

        # Test whether we can create the database file
        try:
            # Try to touch the database file
            Path(db_path).touch(exist_ok=True)
            logger.info(f"Database file is accessible: {db_path}")

            # Test a direct SQLite connection
            try:
                conn = sqlite3.connect(db_path)
                conn.execute("SELECT 1")
                conn.close()
                logger.info("Successfully connected to SQLite database")
            except Exception as e:
                logger.error(f"Direct SQLite connection failed: {e}")
        except Exception as e:
            logger.error(f"Could not access database file: {e}")
    except Exception as e:
        logger.error(f"Could not create database directory: {e}")

# add your model's MetaData object here
# for 'autogenerate' support
from app.db.base import Base  # noqa

target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine,
    though an Engine is acceptable here as well. By skipping the Engine
    creation we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    try:
        url = config.get_main_option("sqlalchemy.url")
        logger.info(f"Running offline migrations using URL: {url}")

        context.configure(
            url=url,
            target_metadata=target_metadata,
            literal_binds=True,
            dialect_opts={"paramstyle": "named"},
        )

        with context.begin_transaction():
            logger.info("Running offline migrations...")
            context.run_migrations()
            logger.info("Offline migrations completed successfully")
    except Exception as e:
        logger.error(f"Offline migration error: {e}")
        # Re-raise so Alembic reports the failure
        raise


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
""" try: # Get config and URL cfg = config.get_section(config.config_ini_section) url = cfg.get("sqlalchemy.url") logger.info(f"Running online migrations using URL: {url}") # Create engine with retry logic max_retries = 3 last_error = None for retry in range(max_retries): try: logger.info(f"Connection attempt {retry + 1}/{max_retries}") connectable = engine_from_config( cfg, prefix="sqlalchemy.", poolclass=pool.NullPool, ) # Configure SQLite for better reliability @event.listens_for(connectable, "connect") def setup_sqlite_connection(dbapi_connection, connection_record): dbapi_connection.execute("PRAGMA journal_mode=WAL") dbapi_connection.execute("PRAGMA synchronous=NORMAL") # Connect and run migrations with connectable.connect() as connection: logger.info("Connection successful") context.configure( connection=connection, target_metadata=target_metadata ) with context.begin_transaction(): logger.info("Running migrations...") context.run_migrations() logger.info("Migrations completed successfully") return # Success, exit the function except Exception as e: last_error = e logger.error(f"Migration attempt {retry + 1} failed: {e}") if retry < max_retries - 1: import time wait_time = (retry + 1) * 2 # Exponential backoff logger.info(f"Retrying in {wait_time} seconds...") time.sleep(wait_time) # If we get here, all retries failed raise Exception(f"Failed to run migrations after {max_retries} attempts: {last_error}") except Exception as e: logger.error(f"Migration error: {e}") # Print diagnostic information from sqlalchemy import __version__ as sa_version logger.error(f"SQLAlchemy version: {sa_version}") # Get directory info if url and url.startswith("sqlite:///"): if url.startswith("sqlite:////"): # Absolute path db_path = url[len("sqlite:///"):] else: # Relative path db_path = url[len("sqlite:///"):] db_dir = os.path.dirname(db_path) # Directory permissions if os.path.exists(db_dir): stats = os.stat(db_dir) logger.error(f"DB directory permissions: {oct(stats.st_mode)}") logger.error(f"DB directory exists: Yes") logger.error(f"DB directory is writable: {os.access(db_dir, os.W_OK)}") else: logger.error(f"DB directory exists: No") # File permissions if file exists if os.path.exists(db_path): stats = os.stat(db_path) logger.error(f"DB file permissions: {oct(stats.st_mode)}") logger.error(f"DB file exists: Yes") logger.error(f"DB file is writable: {os.access(db_path, os.W_OK)}") else: logger.error(f"DB file exists: No") # Re-raise the error raise if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()