from logging.config import fileConfig
import logging
import os
import sqlite3
import stat
import sys

from sqlalchemy import engine_from_config, event
from sqlalchemy import pool

from alembic import context

# Add the parent directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically. It is skipped when no .ini file is
# in use (config_file_name is None), because fileConfig(None) would raise.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Setup logger
logger = logging.getLogger("alembic.env")

# Prefix of a file-based SQLite URL. Three slashes introduce a relative
# path, four an absolute one; stripping three slashes is right for both.
_SQLITE_FILE_PREFIX = "sqlite:///"

# 755 = rwxr-xr-x
_DB_DIR_MODE = (
    stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
)

# 666 = rw-rw-rw-
# NOTE(review): this makes the database file world-writable; confirm that
# is really required by the deployment.
_DB_FILE_MODE = (
    stat.S_IRUSR
    | stat.S_IWUSR
    | stat.S_IRGRP
    | stat.S_IWGRP
    | stat.S_IROTH
    | stat.S_IWOTH
)


def _split_sqlite_url(url):
    """Split a file-based SQLite URL into ``(path, query)``.

    Returns ``None`` when *url* is missing, is not a ``sqlite:///`` URL, or
    does not name a file on disk (empty path or ``:memory:``), so callers
    can skip all filesystem preparation in those cases.
    """
    if not url or not url.startswith(_SQLITE_FILE_PREFIX):
        return None
    # Anything after "?" is URL query arguments, not part of the file name.
    path, _, query = url[len(_SQLITE_FILE_PREFIX):].partition("?")
    if not path or path == ":memory:":
        return None
    return path, query


def _prepare_sqlite_file(path):
    """Make sure *path* is a writable SQLite database file.

    Creates the parent directory and an empty file when they are missing,
    then opens the file with ``sqlite3`` as a final check.

    Returns ``True`` when the file is usable and ``False`` otherwise; every
    failure is logged and swallowed so the caller can try the next path.
    """
    try:
        # dirname() is "" for a bare file name; that means the current
        # directory, and os.makedirs("") would raise.
        directory = os.path.dirname(path) or "."
        existed_before = os.path.isdir(directory)

        os.makedirs(directory, exist_ok=True)
        logger.info(f"Ensured directory exists: {directory}")

        # Only set permissions on directories created here. Changing the
        # mode of a pre-existing directory (the home directory is one of
        # the fallbacks) could expose or restrict it unexpectedly.
        if not existed_before:
            try:
                os.chmod(directory, _DB_DIR_MODE)
                logger.info(f"Set directory permissions on {directory}")
            except OSError as chmod_err:
                logger.warning(f"Could not set directory permissions: {chmod_err}")

        if not os.access(directory, os.W_OK):
            logger.warning(
                f"Directory {directory} exists but is not writable, trying next path"
            )
            return False

        # Create an empty file if it doesn't exist
        if not os.path.exists(path):
            with open(path, "wb"):
                pass
            logger.info(f"Created database file: {path}")

        # Best effort: failing to change the mode is not fatal by itself.
        try:
            os.chmod(path, _DB_FILE_MODE)
            logger.info(f"Set file permissions on {path}")
        except OSError as chmod_err:
            logger.warning(f"Could not set file permissions: {chmod_err}")

        if not os.access(path, os.W_OK):
            logger.warning(
                f"File {path} exists but is not writable, trying next path"
            )
            return False

        # Test direct SQLite connection; always close it, even on failure.
        try:
            conn = sqlite3.connect(path)
            try:
                conn.execute("SELECT 1")
            finally:
                conn.close()
        except sqlite3.Error as e:
            logger.error(f"Direct SQLite connection failed: {e}")
            return False

        logger.info(f"Successfully connected to SQLite database at {path}")
        return True
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and any failure
        # simply means "try the next candidate path".
        logger.error(f"Could not create/access database at {path}: {e}")
        return False


def _ensure_sqlite_database(url):
    """Ensure the SQLite file named by *url* exists, falling back if needed.

    The configured path is tried first, then a fixed list of alternate
    locations. When an alternate is used, ``sqlalchemy.url`` in the Alembic
    config is rewritten to point at it. URLs that are not file-based SQLite
    URLs are left untouched.
    """
    parsed = _split_sqlite_url(url)
    if parsed is None:
        return
    db_path, query = parsed

    logger.info(f"Database URL: {url}")
    logger.info(f"Database path: {db_path}")
    logger.info(f"Database directory: {os.path.dirname(db_path) or '.'}")

    # Alternate paths to try if the primary path isn't writable
    alternate_paths = [
        "/app/storage/db/db.sqlite",  # Primary alternate
        os.path.join(os.path.expanduser("~"), "db.sqlite"),  # Home directory
        "/tmp/taskmanager/db.sqlite",  # Fallback to tmp
    ]

    for candidate in [db_path, *alternate_paths]:
        if not _prepare_sqlite_file(candidate):
            continue
        # If this isn't the original path, update the config
        if candidate != db_path:
            new_url = f"sqlite:///{candidate}"
            if query:
                new_url = f"{new_url}?{query}"
            logger.info(f"Updating database URL to: {new_url}")
            config.set_main_option("sqlalchemy.url", new_url)
        return

    # Not fatal here: the migration run will fail later and log more
    # diagnostic information.
    logger.error("Failed to create database in any of the attempted locations")


# Ensure database directory exists and is writable
db_url = config.get_main_option("sqlalchemy.url")
_ensure_sqlite_database(db_url)

# add your
# model's MetaData object here
# for 'autogenerate' support
from app.db.base import Base  # noqa

target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    Any exception is logged and re-raised unchanged.
    """
    try:
        url = config.get_main_option("sqlalchemy.url")
        # NOTE(review): the URL is logged verbatim; for non-SQLite
        # databases it may contain credentials.
        logger.info(f"Running offline migrations using URL: {url}")

        # A missing URL must not crash the SQLite check itself.
        is_sqlite = bool(url) and url.startswith("sqlite")
        logger.info(f"Is SQLite URL: {is_sqlite}")

        context.configure(
            url=url,
            target_metadata=target_metadata,
            literal_binds=True,
            dialect_opts={"paramstyle": "named"},
            render_as_batch=is_sqlite,  # Enable batch mode for SQLite
        )

        with context.begin_transaction():
            logger.info("Running offline migrations...")
            context.run_migrations()
            logger.info("Offline migrations completed successfully")
    except Exception as e:
        logger.error(f"Offline migration error: {e}")
        # Re-raise the error
        raise


def _enable_sqlite_pragmas(dbapi_connection, connection_record):
    """Apply WAL journaling and NORMAL synchronous mode to a new connection.

    Registered as a "connect" event listener for SQLite engines only.
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA synchronous=NORMAL")
    finally:
        cursor.close()


def _log_sqlite_diagnostics(url):
    """Log existence and permission details for a file-based SQLite URL.

    Does nothing for other URLs. Never raises for filesystem errors, so it
    cannot mask the error that triggered the diagnostics.
    """
    prefix = "sqlite:///"
    if not url or not url.startswith(prefix):
        return

    # Three slashes (relative) and four slashes (absolute) are both handled
    # by stripping the three-slash prefix; drop any "?query" suffix.
    db_path = url[len(prefix):].partition("?")[0]
    # dirname() is "" for a bare file name, which means the current directory.
    db_dir = os.path.dirname(db_path) or "."

    try:
        # Directory permissions
        if os.path.exists(db_dir):
            stats = os.stat(db_dir)
            logger.error(f"DB directory permissions: {oct(stats.st_mode)}")
            logger.error("DB directory exists: Yes")
            logger.error(f"DB directory is writable: {os.access(db_dir, os.W_OK)}")
        else:
            logger.error("DB directory exists: No")

        # File permissions if file exists
        if os.path.exists(db_path):
            stats = os.stat(db_path)
            logger.error(f"DB file permissions: {oct(stats.st_mode)}")
            logger.error("DB file exists: Yes")
            logger.error(f"DB file is writable: {os.access(db_path, os.W_OK)}")
        else:
            logger.error("DB file exists: No")
    except OSError as diag_err:
        logger.error(f"Could not collect database diagnostics: {diag_err}")


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    The whole connect-and-migrate step is attempted up to three times with
    a linearly growing pause between attempts.

    Raises:
        RuntimeError: when every attempt failed; the last underlying error
            is attached as ``__cause__``.
    """
    import time

    # Bound before the try block so the error handler can always read it.
    url = None
    try:
        # Get config and URL
        cfg = config.get_section(config.config_ini_section) or {}
        url = cfg.get("sqlalchemy.url")
        logger.info(f"Running online migrations using URL: {url}")

        # Create engine with retry logic
        max_retries = 3
        last_error = None

        for retry in range(max_retries):
            try:
                logger.info(f"Connection attempt {retry + 1}/{max_retries}")
                connectable = engine_from_config(
                    cfg,
                    prefix="sqlalchemy.",
                    poolclass=pool.NullPool,
                )

                # Configure SQLite for better reliability. The PRAGMAs are
                # SQLite-specific, so other dialects get no listener.
                if connectable.dialect.name == "sqlite":
                    event.listen(connectable, "connect", _enable_sqlite_pragmas)

                # Connect and run migrations
                with connectable.connect() as connection:
                    logger.info("Connection successful")

                    # Check if this is SQLite and enable batch mode if needed
                    is_sqlite = connection.dialect.name == "sqlite"
                    logger.info(f"Detected dialect: {connection.dialect.name}")

                    if is_sqlite:
                        logger.info(
                            "SQLite detected, enabling batch mode for migrations"
                        )

                    context.configure(
                        connection=connection,
                        target_metadata=target_metadata,
                        render_as_batch=is_sqlite,  # This is essential for SQLite migrations
                    )

                    with context.begin_transaction():
                        logger.info("Running migrations...")
                        context.run_migrations()
                        logger.info("Migrations completed successfully")

                return  # Success, exit the function
            except Exception as e:
                last_error = e
                logger.error(f"Migration attempt {retry + 1} failed: {e}")
                if retry < max_retries - 1:
                    wait_time = (retry + 1) * 2  # Linear backoff: 2s, then 4s
                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)

        # If we get here, all retries failed
        raise RuntimeError(
            f"Failed to run migrations after {max_retries} attempts: {last_error}"
        ) from last_error
    except Exception as e:
        logger.error(f"Migration error: {e}")

        # Print diagnostic information
        from sqlalchemy import __version__ as sa_version

        logger.error(f"SQLAlchemy version: {sa_version}")

        # Get directory info
        _log_sqlite_diagnostics(url)

        # Re-raise the error
        raise


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()