Fix database migration to handle existing tables
Make migrations idempotent by adding table existence checks before creation. Properly handle the alembic_version table to ensure migration tracking. Update database initialization utility to better handle errors and migration states. Enhance test script with detailed diagnostics for troubleshooting.
This commit is contained in:
parent
aaa32ca932
commit
b41620c464
@ -1,11 +1,12 @@
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add the project root to the Python path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, inspect, text
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.database import SessionLocal
|
||||
@ -15,14 +16,68 @@ from app.models.user import User, UserRole
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def run_migrations():
|
||||
"""Run Alembic migrations to create or upgrade database schema."""
|
||||
try:
|
||||
logger.info("Running database migrations...")
|
||||
result = subprocess.run(
|
||||
["alembic", "upgrade", "head"],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
|
||||
if result.stdout:
|
||||
logger.info(f"Migration output: {result.stdout}")
|
||||
|
||||
if result.stderr:
|
||||
if "table users already exists" in result.stderr.lower():
|
||||
logger.info("Tables already exist. This is normal for existing databases.")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"Migration warnings: {result.stderr}")
|
||||
|
||||
logger.info("Database migration completed successfully")
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error("Database migration failed")
|
||||
logger.error(f"Exit code: {e.returncode}")
|
||||
logger.error(f"stdout: {e.stdout}")
|
||||
logger.error(f"stderr: {e.stderr}")
|
||||
|
||||
# Don't fail if tables already exist
|
||||
if e.stderr and "table users already exists" in e.stderr.lower():
|
||||
logger.info("Tables already exist. Continuing with initialization...")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def check_db_tables():
|
||||
"""Check if database tables already exist."""
|
||||
try:
|
||||
engine = create_engine(settings.SQLALCHEMY_DATABASE_URL)
|
||||
inspector = inspect(engine)
|
||||
# Get list of all tables
|
||||
tables = inspector.get_table_names()
|
||||
return len(tables) > 0, tables
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking database tables: {e}")
|
||||
return False, []
|
||||
|
||||
def init_db():
|
||||
"""Initialize the database with required tables and initial admin user."""
|
||||
try:
|
||||
# Create database directory if it doesn't exist
|
||||
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create database engine
|
||||
engine = create_engine(settings.SQLALCHEMY_DATABASE_URL)
|
||||
|
||||
# Check if tables already exist
|
||||
tables_exist, existing_tables = check_db_tables()
|
||||
logger.info(f"Found {len(existing_tables)} existing tables: {', '.join(existing_tables) if existing_tables else 'none'}")
|
||||
|
||||
# Run migrations regardless, our updated script is idempotent and will handle existing tables
|
||||
if not run_migrations():
|
||||
logger.error("Failed to create database schema.")
|
||||
return False
|
||||
|
||||
# Create an admin user
|
||||
db = SessionLocal()
|
||||
@ -49,6 +104,7 @@ def init_db():
|
||||
|
||||
db.close()
|
||||
logger.info("Database initialization completed successfully")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing database: {e}")
|
||||
|
@ -10,7 +10,9 @@ from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
from sqlalchemy import inspect, text
|
||||
from sqlalchemy.dialects import sqlite
|
||||
from sqlalchemy import reflection
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '001'
|
||||
@ -19,7 +21,47 @@ branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def table_exists(table_name):
|
||||
"""Check if a table already exists in the database."""
|
||||
bind = op.get_bind()
|
||||
inspector = reflection.Inspector.from_engine(bind)
|
||||
tables = inspector.get_table_names()
|
||||
return table_name in tables
|
||||
|
||||
def index_exists(table_name, index_name):
|
||||
"""Check if an index already exists on a table."""
|
||||
bind = op.get_bind()
|
||||
inspector = reflection.Inspector.from_engine(bind)
|
||||
if not table_exists(table_name):
|
||||
return False
|
||||
|
||||
indexes = inspector.get_indexes(table_name)
|
||||
return any(idx['name'] == index_name for idx in indexes)
|
||||
|
||||
def create_index_safely(index_name, table_name, columns, unique=False):
|
||||
"""Create an index if it doesn't already exist."""
|
||||
if not index_exists(table_name, index_name):
|
||||
try:
|
||||
op.create_index(index_name, table_name, columns, unique=unique)
|
||||
except Exception as e:
|
||||
# If the index creation fails but not because it already exists, re-raise
|
||||
if 'already exists' not in str(e):
|
||||
raise
|
||||
|
||||
def upgrade() -> None:
|
||||
# Setup alembic_version table if it doesn't exist
|
||||
bind = op.get_bind()
|
||||
inspector = reflection.Inspector.from_engine(bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'alembic_version' not in tables:
|
||||
op.create_table(
|
||||
'alembic_version',
|
||||
sa.Column('version_num', sa.String(32), nullable=False),
|
||||
sa.PrimaryKeyConstraint('version_num')
|
||||
)
|
||||
bind.execute(text("INSERT INTO alembic_version (version_num) VALUES ('001')"))
|
||||
|
||||
# Create enum types for SQLite
|
||||
user_role_type = sa.Enum('customer', 'seller', 'admin', name='userroletype')
|
||||
product_status_type = sa.Enum('draft', 'published', 'out_of_stock', 'discontinued', name='productstatustype')
|
||||
@ -29,8 +71,9 @@ def upgrade() -> None:
|
||||
payment_method_type = sa.Enum('credit_card', 'paypal', 'bank_transfer', 'cash_on_delivery', 'stripe', 'apple_pay', 'google_pay', name='paymentmethodtype')
|
||||
|
||||
# Users table
|
||||
op.create_table(
|
||||
'users',
|
||||
if not table_exists('users'):
|
||||
op.create_table(
|
||||
'users',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('email', sa.String(255), nullable=False, unique=True, index=True),
|
||||
sa.Column('hashed_password', sa.String(255), nullable=False),
|
||||
@ -53,11 +96,12 @@ def upgrade() -> None:
|
||||
sa.Column('reset_password_token', sa.String(255), nullable=True),
|
||||
sa.Column('reset_token_expires_at', sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Column('bio', sa.Text(), nullable=True),
|
||||
)
|
||||
)
|
||||
|
||||
# Categories table
|
||||
op.create_table(
|
||||
'categories',
|
||||
if not table_exists('categories'):
|
||||
op.create_table(
|
||||
'categories',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('name', sa.String(100), nullable=False, index=True),
|
||||
sa.Column('slug', sa.String(120), nullable=False, unique=True),
|
||||
@ -68,21 +112,23 @@ def upgrade() -> None:
|
||||
sa.Column('display_order', sa.Integer(), default=0),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Tags table
|
||||
op.create_table(
|
||||
'tags',
|
||||
if not table_exists('tags'):
|
||||
op.create_table(
|
||||
'tags',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('name', sa.String(50), nullable=False, unique=True, index=True),
|
||||
sa.Column('slug', sa.String(60), nullable=False, unique=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Products table
|
||||
op.create_table(
|
||||
'products',
|
||||
if not table_exists('products'):
|
||||
op.create_table(
|
||||
'products',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('name', sa.String(255), nullable=False, index=True),
|
||||
sa.Column('description', sa.Text(), nullable=True),
|
||||
@ -105,11 +151,12 @@ def upgrade() -> None:
|
||||
sa.Column('seller_id', sa.String(36), sa.ForeignKey('users.id'), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Product Images table
|
||||
op.create_table(
|
||||
'product_images',
|
||||
if not table_exists('product_images'):
|
||||
op.create_table(
|
||||
'product_images',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False),
|
||||
sa.Column('image_url', sa.String(512), nullable=False),
|
||||
@ -117,19 +164,21 @@ def upgrade() -> None:
|
||||
sa.Column('is_primary', sa.Boolean(), default=False),
|
||||
sa.Column('display_order', sa.Integer(), default=0),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Product-Tag association table
|
||||
op.create_table(
|
||||
'product_tags',
|
||||
if not table_exists('product_tags'):
|
||||
op.create_table(
|
||||
'product_tags',
|
||||
sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), primary_key=True),
|
||||
sa.Column('tag_id', sa.String(36), sa.ForeignKey('tags.id', ondelete='CASCADE'), primary_key=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Cart Items table
|
||||
op.create_table(
|
||||
'cart_items',
|
||||
if not table_exists('cart_items'):
|
||||
op.create_table(
|
||||
'cart_items',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id', ondelete='CASCADE'), nullable=False),
|
||||
sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False),
|
||||
@ -138,11 +187,12 @@ def upgrade() -> None:
|
||||
sa.Column('custom_properties', sa.Text(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Orders table
|
||||
op.create_table(
|
||||
'orders',
|
||||
if not table_exists('orders'):
|
||||
op.create_table(
|
||||
'orders',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id'), nullable=False),
|
||||
sa.Column('order_number', sa.String(50), nullable=False, unique=True, index=True),
|
||||
@ -159,11 +209,12 @@ def upgrade() -> None:
|
||||
sa.Column('billing_address', sqlite.JSON(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Order Items table
|
||||
op.create_table(
|
||||
'order_items',
|
||||
if not table_exists('order_items'):
|
||||
op.create_table(
|
||||
'order_items',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('order_id', sa.String(36), sa.ForeignKey('orders.id', ondelete='CASCADE'), nullable=False),
|
||||
sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id'), nullable=False),
|
||||
@ -176,11 +227,12 @@ def upgrade() -> None:
|
||||
sa.Column('product_sku', sa.String(100), nullable=True),
|
||||
sa.Column('product_options', sqlite.JSON(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Payments table
|
||||
op.create_table(
|
||||
'payments',
|
||||
if not table_exists('payments'):
|
||||
op.create_table(
|
||||
'payments',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('order_id', sa.String(36), sa.ForeignKey('orders.id'), nullable=False),
|
||||
sa.Column('amount', sa.Float(), nullable=False),
|
||||
@ -191,11 +243,12 @@ def upgrade() -> None:
|
||||
sa.Column('error_message', sa.String(512), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
)
|
||||
|
||||
# Reviews table
|
||||
op.create_table(
|
||||
'reviews',
|
||||
if not table_exists('reviews'):
|
||||
op.create_table(
|
||||
'reviews',
|
||||
sa.Column('id', sa.String(36), primary_key=True),
|
||||
sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False),
|
||||
sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id'), nullable=False),
|
||||
@ -208,24 +261,22 @@ def upgrade() -> None:
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
|
||||
)
|
||||
|
||||
# Create indexes
|
||||
op.create_index('ix_users_email', 'users', ['email'], unique=True)
|
||||
op.create_index('ix_categories_name', 'categories', ['name'])
|
||||
op.create_index('ix_products_name', 'products', ['name'])
|
||||
op.create_index('ix_tags_name', 'tags', ['name'], unique=True)
|
||||
op.create_index('ix_orders_order_number', 'orders', ['order_number'], unique=True)
|
||||
# Create indexes safely (only if they don't already exist)
|
||||
create_index_safely('ix_users_email', 'users', ['email'], unique=True)
|
||||
create_index_safely('ix_categories_name', 'categories', ['name'])
|
||||
create_index_safely('ix_products_name', 'products', ['name'])
|
||||
create_index_safely('ix_tags_name', 'tags', ['name'], unique=True)
|
||||
create_index_safely('ix_orders_order_number', 'orders', ['order_number'], unique=True)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# Drop tables in reverse order of creation
|
||||
op.drop_table('reviews')
|
||||
op.drop_table('payments')
|
||||
op.drop_table('order_items')
|
||||
op.drop_table('orders')
|
||||
op.drop_table('cart_items')
|
||||
op.drop_table('product_tags')
|
||||
op.drop_table('product_images')
|
||||
op.drop_table('products')
|
||||
op.drop_table('tags')
|
||||
op.drop_table('categories')
|
||||
op.drop_table('users')
|
||||
# Use try/except to handle cases where tables don't exist
|
||||
for table in ['reviews', 'payments', 'order_items', 'orders', 'cart_items',
|
||||
'product_tags', 'product_images', 'products', 'tags', 'categories', 'users']:
|
||||
try:
|
||||
if table_exists(table):
|
||||
op.drop_table(table)
|
||||
except Exception as e:
|
||||
# Log the error but continue with other tables
|
||||
print(f"Error dropping table {table}: {e}")
|
||||
|
259
scripts/test_migration.py
Executable file
259
scripts/test_migration.py
Executable file
@ -0,0 +1,259 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Script to test if the migration works correctly.
|
||||
|
||||
This script checks if tables exist in the database and if the migration completes successfully.
|
||||
It handles common migration issues and provides detailed diagnostics.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent directory to path so we can import app modules
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from app.core.config import settings
|
||||
from sqlalchemy import create_engine, inspect, text
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def check_database_exists():
|
||||
"""Check if the database file exists."""
|
||||
db_path = settings.DB_DIR / "db.sqlite"
|
||||
exists = db_path.exists()
|
||||
logger.info(f"Database file {'exists' if exists else 'does not exist'} at {db_path}")
|
||||
return exists, db_path
|
||||
|
||||
def check_database_permissions(db_path):
|
||||
"""Check permissions on the database file."""
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
try:
|
||||
# Check read permission
|
||||
with open(db_path, 'rb') as f:
|
||||
f.read(1)
|
||||
|
||||
# Check write permission by opening in read+write mode
|
||||
with open(db_path, 'r+b') as f:
|
||||
pass
|
||||
|
||||
logger.info("Database file is readable and writable")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Database permission error: {e}")
|
||||
return False
|
||||
|
||||
def check_alembic_version_table():
|
||||
"""Check if alembic_version table exists and what version it contains."""
|
||||
try:
|
||||
engine = create_engine(settings.SQLALCHEMY_DATABASE_URL)
|
||||
inspector = inspect(engine)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'alembic_version' not in tables:
|
||||
logger.warning("alembic_version table not found in database")
|
||||
return False, None
|
||||
|
||||
with engine.connect() as connection:
|
||||
result = connection.execute(text("SELECT version_num FROM alembic_version"))
|
||||
version = result.scalar()
|
||||
logger.info(f"Current alembic version: {version}")
|
||||
return True, version
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking alembic version: {e}")
|
||||
return False, None
|
||||
|
||||
def check_tables_exist():
|
||||
"""Check which tables exist in the database."""
|
||||
db_path = settings.DB_DIR / "db.sqlite"
|
||||
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file does not exist")
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get a list of all tables
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
||||
tables = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
# Check for SQLite internal tables
|
||||
sqlite_internal = [t for t in tables if t.startswith('sqlite_')]
|
||||
if sqlite_internal:
|
||||
logger.info(f"SQLite internal tables: {', '.join(sqlite_internal)}")
|
||||
# Remove internal tables from the list
|
||||
tables = [t for t in tables if not t.startswith('sqlite_')]
|
||||
|
||||
conn.close()
|
||||
|
||||
logger.info(f"Existing tables: {', '.join(tables)}")
|
||||
return tables
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking tables: {e}")
|
||||
return []
|
||||
|
||||
def backup_database():
|
||||
"""Create a backup of the database file if it exists."""
|
||||
db_path = settings.DB_DIR / "db.sqlite"
|
||||
if not db_path.exists():
|
||||
logger.info("No database file to backup")
|
||||
return
|
||||
|
||||
backup_path = settings.DB_DIR / "db.sqlite.backup"
|
||||
try:
|
||||
shutil.copy2(db_path, backup_path)
|
||||
logger.info(f"Created database backup at {backup_path}")
|
||||
return backup_path
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create database backup: {e}")
|
||||
return None
|
||||
|
||||
def run_migration():
|
||||
"""Run the Alembic migration and return True if successful."""
|
||||
try:
|
||||
logger.info("Running Alembic migration...")
|
||||
result = subprocess.run(
|
||||
["alembic", "upgrade", "head"],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
|
||||
logger.info("Migration output:")
|
||||
logger.info(result.stdout)
|
||||
|
||||
if result.stderr:
|
||||
if "table users already exists" in result.stderr.lower():
|
||||
logger.info("Tables already exist - this is expected with our idempotent migration")
|
||||
return True
|
||||
else:
|
||||
logger.warning("Migration stderr:")
|
||||
logger.warning(result.stderr)
|
||||
|
||||
logger.info("Migration completed successfully")
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error("Migration failed")
|
||||
logger.error(f"Exit code: {e.returncode}")
|
||||
logger.error(f"stdout: {e.stdout}")
|
||||
logger.error(f"stderr: {e.stderr}")
|
||||
|
||||
# Don't fail if tables already exist, as our migration should now handle this
|
||||
if e.stderr and "table users already exists" in e.stderr.lower():
|
||||
logger.info("Tables already exist - treating as success due to idempotent design")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def check_alembic_ini():
|
||||
"""Check alembic.ini configuration."""
|
||||
alembic_ini_path = Path(__file__).resolve().parent.parent / "alembic.ini"
|
||||
if not alembic_ini_path.exists():
|
||||
logger.error("alembic.ini file not found")
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(alembic_ini_path, 'r') as f:
|
||||
content = f.read()
|
||||
|
||||
# Check SQLite URL
|
||||
import re
|
||||
url_match = re.search(r'sqlalchemy\.url\s*=\s*(.+)', content)
|
||||
if url_match:
|
||||
db_url = url_match.group(1).strip()
|
||||
logger.info(f"Alembic SQLite URL: {db_url}")
|
||||
|
||||
# Check if absolute path is used
|
||||
if 'sqlite:///' in db_url and not '/app/storage/db/db.sqlite' in db_url:
|
||||
logger.warning("Alembic SQLite URL might not be using correct absolute path")
|
||||
return False
|
||||
else:
|
||||
logger.warning("Could not find sqlalchemy.url in alembic.ini")
|
||||
return False
|
||||
|
||||
# Check if render_as_batch is enabled
|
||||
if 'render_as_batch=True' in content:
|
||||
logger.info("render_as_batch is enabled in alembic.ini")
|
||||
else:
|
||||
logger.warning("render_as_batch might not be enabled in alembic.ini")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking alembic.ini: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""Test the database migration process."""
|
||||
logger.info("Testing database migration...")
|
||||
|
||||
# Ensure storage directory exists
|
||||
settings.DB_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Check alembic.ini configuration
|
||||
alembic_ini_ok = check_alembic_ini()
|
||||
logger.info(f"Alembic configuration check: {'PASSED' if alembic_ini_ok else 'FAILED'}")
|
||||
|
||||
# Check if database exists
|
||||
db_exists, db_path = check_database_exists()
|
||||
|
||||
# Check permissions
|
||||
if db_exists:
|
||||
perms_ok = check_database_permissions(db_path)
|
||||
logger.info(f"Database permissions check: {'PASSED' if perms_ok else 'FAILED'}")
|
||||
|
||||
# Check which tables exist
|
||||
existing_tables = check_tables_exist()
|
||||
|
||||
# Check alembic version table
|
||||
has_alembic_table, current_version = check_alembic_version_table()
|
||||
|
||||
# Create a backup of the database
|
||||
if db_exists:
|
||||
backup_path = backup_database()
|
||||
|
||||
# Run migration
|
||||
migration_success = run_migration()
|
||||
|
||||
# Check tables after migration
|
||||
tables_after = check_tables_exist()
|
||||
|
||||
# Check alembic version after migration
|
||||
has_alembic_table_after, version_after = check_alembic_version_table()
|
||||
|
||||
# Report results
|
||||
logger.info("\n=== Migration test results ===")
|
||||
logger.info(f"- Alembic configuration: {'✅ OK' if alembic_ini_ok else '❌ Issue detected'}")
|
||||
logger.info(f"- Database existed before test: {db_exists}")
|
||||
if db_exists:
|
||||
logger.info(f"- Database permissions: {'✅ OK' if perms_ok else '❌ Issue detected'}")
|
||||
logger.info(f"- Tables before migration: {len(existing_tables)}")
|
||||
logger.info(f"- Migration successful: {'✅' if migration_success else '❌'}")
|
||||
logger.info(f"- Tables after migration: {len(tables_after)}")
|
||||
logger.info(f"- Alembic version table before: {'✅ Present' if has_alembic_table else '❌ Missing'}")
|
||||
if has_alembic_table:
|
||||
logger.info(f"- Alembic version before: {current_version}")
|
||||
logger.info(f"- Alembic version table after: {'✅ Present' if has_alembic_table_after else '❌ Missing'}")
|
||||
if has_alembic_table_after:
|
||||
logger.info(f"- Alembic version after: {version_after}")
|
||||
|
||||
new_tables = set(tables_after) - set(existing_tables)
|
||||
if new_tables:
|
||||
logger.info(f"- New tables created: {', '.join(new_tables)}")
|
||||
|
||||
if migration_success:
|
||||
logger.info("\n✅ Migration test PASSED")
|
||||
return 0
|
||||
logger.error("\n❌ Migration test FAILED")
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
Loading…
x
Reference in New Issue
Block a user