
Make migrations idempotent by adding table existence checks before creation. Properly handle the alembic_version table to ensure migration tracking. Update database initialization utility to better handle errors and migration states. Enhance test script with detailed diagnostics for troubleshooting.
260 lines
9.0 KiB
Python
Executable File
260 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script to test if the migration works correctly.
|
|
|
|
This script checks if tables exist in the database and if the migration completes successfully.
|
|
It handles common migration issues and provides detailed diagnostics.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path so we can import app modules
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from app.core.config import settings
|
|
from sqlalchemy import create_engine, inspect, text
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def check_database_exists():
|
|
"""Check if the database file exists."""
|
|
db_path = settings.DB_DIR / "db.sqlite"
|
|
exists = db_path.exists()
|
|
logger.info(f"Database file {'exists' if exists else 'does not exist'} at {db_path}")
|
|
return exists, db_path
|
|
|
|
def check_database_permissions(db_path):
|
|
"""Check permissions on the database file."""
|
|
if not db_path.exists():
|
|
return False
|
|
|
|
try:
|
|
# Check read permission
|
|
with open(db_path, 'rb') as f:
|
|
f.read(1)
|
|
|
|
# Check write permission by opening in read+write mode
|
|
with open(db_path, 'r+b') as f:
|
|
pass
|
|
|
|
logger.info("Database file is readable and writable")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Database permission error: {e}")
|
|
return False
|
|
|
|
def check_alembic_version_table():
|
|
"""Check if alembic_version table exists and what version it contains."""
|
|
try:
|
|
engine = create_engine(settings.SQLALCHEMY_DATABASE_URL)
|
|
inspector = inspect(engine)
|
|
tables = inspector.get_table_names()
|
|
|
|
if 'alembic_version' not in tables:
|
|
logger.warning("alembic_version table not found in database")
|
|
return False, None
|
|
|
|
with engine.connect() as connection:
|
|
result = connection.execute(text("SELECT version_num FROM alembic_version"))
|
|
version = result.scalar()
|
|
logger.info(f"Current alembic version: {version}")
|
|
return True, version
|
|
except Exception as e:
|
|
logger.error(f"Error checking alembic version: {e}")
|
|
return False, None
|
|
|
|
def check_tables_exist():
|
|
"""Check which tables exist in the database."""
|
|
db_path = settings.DB_DIR / "db.sqlite"
|
|
|
|
if not db_path.exists():
|
|
logger.warning("Database file does not exist")
|
|
return []
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
# Get a list of all tables
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
|
tables = [row[0] for row in cursor.fetchall()]
|
|
|
|
# Check for SQLite internal tables
|
|
sqlite_internal = [t for t in tables if t.startswith('sqlite_')]
|
|
if sqlite_internal:
|
|
logger.info(f"SQLite internal tables: {', '.join(sqlite_internal)}")
|
|
# Remove internal tables from the list
|
|
tables = [t for t in tables if not t.startswith('sqlite_')]
|
|
|
|
conn.close()
|
|
|
|
logger.info(f"Existing tables: {', '.join(tables)}")
|
|
return tables
|
|
except Exception as e:
|
|
logger.error(f"Error checking tables: {e}")
|
|
return []
|
|
|
|
def backup_database():
|
|
"""Create a backup of the database file if it exists."""
|
|
db_path = settings.DB_DIR / "db.sqlite"
|
|
if not db_path.exists():
|
|
logger.info("No database file to backup")
|
|
return
|
|
|
|
backup_path = settings.DB_DIR / "db.sqlite.backup"
|
|
try:
|
|
shutil.copy2(db_path, backup_path)
|
|
logger.info(f"Created database backup at {backup_path}")
|
|
return backup_path
|
|
except Exception as e:
|
|
logger.error(f"Failed to create database backup: {e}")
|
|
return None
|
|
|
|
def run_migration():
|
|
"""Run the Alembic migration and return True if successful."""
|
|
try:
|
|
logger.info("Running Alembic migration...")
|
|
result = subprocess.run(
|
|
["alembic", "upgrade", "head"],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
logger.info("Migration output:")
|
|
logger.info(result.stdout)
|
|
|
|
if result.stderr:
|
|
if "table users already exists" in result.stderr.lower():
|
|
logger.info("Tables already exist - this is expected with our idempotent migration")
|
|
return True
|
|
else:
|
|
logger.warning("Migration stderr:")
|
|
logger.warning(result.stderr)
|
|
|
|
logger.info("Migration completed successfully")
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error("Migration failed")
|
|
logger.error(f"Exit code: {e.returncode}")
|
|
logger.error(f"stdout: {e.stdout}")
|
|
logger.error(f"stderr: {e.stderr}")
|
|
|
|
# Don't fail if tables already exist, as our migration should now handle this
|
|
if e.stderr and "table users already exists" in e.stderr.lower():
|
|
logger.info("Tables already exist - treating as success due to idempotent design")
|
|
return True
|
|
|
|
return False
|
|
|
|
def check_alembic_ini():
|
|
"""Check alembic.ini configuration."""
|
|
alembic_ini_path = Path(__file__).resolve().parent.parent / "alembic.ini"
|
|
if not alembic_ini_path.exists():
|
|
logger.error("alembic.ini file not found")
|
|
return False
|
|
|
|
try:
|
|
with open(alembic_ini_path, 'r') as f:
|
|
content = f.read()
|
|
|
|
# Check SQLite URL
|
|
import re
|
|
url_match = re.search(r'sqlalchemy\.url\s*=\s*(.+)', content)
|
|
if url_match:
|
|
db_url = url_match.group(1).strip()
|
|
logger.info(f"Alembic SQLite URL: {db_url}")
|
|
|
|
# Check if absolute path is used
|
|
if 'sqlite:///' in db_url and not '/app/storage/db/db.sqlite' in db_url:
|
|
logger.warning("Alembic SQLite URL might not be using correct absolute path")
|
|
return False
|
|
else:
|
|
logger.warning("Could not find sqlalchemy.url in alembic.ini")
|
|
return False
|
|
|
|
# Check if render_as_batch is enabled
|
|
if 'render_as_batch=True' in content:
|
|
logger.info("render_as_batch is enabled in alembic.ini")
|
|
else:
|
|
logger.warning("render_as_batch might not be enabled in alembic.ini")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error checking alembic.ini: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Test the database migration process."""
|
|
logger.info("Testing database migration...")
|
|
|
|
# Ensure storage directory exists
|
|
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Check alembic.ini configuration
|
|
alembic_ini_ok = check_alembic_ini()
|
|
logger.info(f"Alembic configuration check: {'PASSED' if alembic_ini_ok else 'FAILED'}")
|
|
|
|
# Check if database exists
|
|
db_exists, db_path = check_database_exists()
|
|
|
|
# Check permissions
|
|
if db_exists:
|
|
perms_ok = check_database_permissions(db_path)
|
|
logger.info(f"Database permissions check: {'PASSED' if perms_ok else 'FAILED'}")
|
|
|
|
# Check which tables exist
|
|
existing_tables = check_tables_exist()
|
|
|
|
# Check alembic version table
|
|
has_alembic_table, current_version = check_alembic_version_table()
|
|
|
|
# Create a backup of the database
|
|
if db_exists:
|
|
backup_path = backup_database()
|
|
|
|
# Run migration
|
|
migration_success = run_migration()
|
|
|
|
# Check tables after migration
|
|
tables_after = check_tables_exist()
|
|
|
|
# Check alembic version after migration
|
|
has_alembic_table_after, version_after = check_alembic_version_table()
|
|
|
|
# Report results
|
|
logger.info("\n=== Migration test results ===")
|
|
logger.info(f"- Alembic configuration: {'✅ OK' if alembic_ini_ok else '❌ Issue detected'}")
|
|
logger.info(f"- Database existed before test: {db_exists}")
|
|
if db_exists:
|
|
logger.info(f"- Database permissions: {'✅ OK' if perms_ok else '❌ Issue detected'}")
|
|
logger.info(f"- Tables before migration: {len(existing_tables)}")
|
|
logger.info(f"- Migration successful: {'✅' if migration_success else '❌'}")
|
|
logger.info(f"- Tables after migration: {len(tables_after)}")
|
|
logger.info(f"- Alembic version table before: {'✅ Present' if has_alembic_table else '❌ Missing'}")
|
|
if has_alembic_table:
|
|
logger.info(f"- Alembic version before: {current_version}")
|
|
logger.info(f"- Alembic version table after: {'✅ Present' if has_alembic_table_after else '❌ Missing'}")
|
|
if has_alembic_table_after:
|
|
logger.info(f"- Alembic version after: {version_after}")
|
|
|
|
new_tables = set(tables_after) - set(existing_tables)
|
|
if new_tables:
|
|
logger.info(f"- New tables created: {', '.join(new_tables)}")
|
|
|
|
if migration_success:
|
|
logger.info("\n✅ Migration test PASSED")
|
|
return 0
|
|
logger.error("\n❌ Migration test FAILED")
|
|
return 1
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|