
Make migrations idempotent by adding table existence checks before creation. Properly handle the alembic_version table to ensure migration tracking. Update database initialization utility to better handle errors and migration states. Enhance test script with detailed diagnostics for troubleshooting.
117 lines
4.1 KiB
Python
117 lines
4.1 KiB
Python
import logging
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add the project root to the Python path
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
|
|
|
|
from sqlalchemy import create_engine, inspect, text
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import SessionLocal
|
|
from app.core.security import get_password_hash
|
|
from app.models.user import User, UserRole
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def run_migrations():
    """Apply Alembic migrations by running ``alembic upgrade head``.

    The command is executed as a subprocess with its output captured.

    Returns:
        True when the command exits successfully, or when its stderr
        contains "table users already exists" (case-insensitive), which
        is treated as "schema already present". False when the command
        fails for any other reason.
    """
    # Marker searched for in the (lower-cased) stderr of the alembic process.
    already_exists_marker = "table users already exists"
    command = ["alembic", "upgrade", "head"]

    try:
        logger.info("Running database migrations...")
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        logger.error("Database migration failed")
        logger.error(f"Exit code: {exc.returncode}")
        logger.error(f"stdout: {exc.stdout}")
        logger.error(f"stderr: {exc.stderr}")

        # Don't fail if tables already exist
        failure_output = exc.stderr
        if not failure_output or already_exists_marker not in failure_output.lower():
            return False
        logger.info("Tables already exist. Continuing with initialization...")
        return True

    if completed.stdout:
        logger.info(f"Migration output: {completed.stdout}")

    diagnostics = completed.stderr
    if diagnostics:
        if already_exists_marker in diagnostics.lower():
            logger.info("Tables already exist. This is normal for existing databases.")
            return True
        logger.warning(f"Migration warnings: {diagnostics}")

    logger.info("Database migration completed successfully")
    return True
|
|
|
|
def check_db_tables():
    """Report which tables currently exist in the configured database.

    A temporary engine is created from ``settings.SQLALCHEMY_DATABASE_URL``
    and used only for schema inspection.

    Returns:
        A ``(tables_exist, table_names)`` tuple: ``tables_exist`` is True
        when at least one table was found and ``table_names`` is the list
        returned by the inspector. If anything goes wrong the error is
        logged and ``(False, [])`` is returned instead of raising.
    """
    engine = None
    try:
        engine = create_engine(settings.SQLALCHEMY_DATABASE_URL)
        # Get list of all tables
        tables = inspect(engine).get_table_names()
        return bool(tables), tables
    except Exception as e:
        logger.error(f"Error checking database tables: {e}")
        return False, []
    finally:
        # The engine exists only for this check; dispose of it so its
        # connection pool does not outlive the function.
        if engine is not None:
            engine.dispose()
|
|
|
|
def init_db():
    """Initialize the database schema and make sure an admin user exists.

    Steps, in order:
      1. Create ``settings.DB_DIR`` (including parents) if it is missing.
      2. Log the tables currently reported by ``check_db_tables()``.
      3. Run the migrations via ``run_migrations()``.
      4. Insert a user with ``settings.FIRST_SUPERUSER_EMAIL`` unless a
         user with that email already exists.

    Returns:
        True when initialization completed, False when
        ``run_migrations()`` reported failure.

    Raises:
        Exception: any error raised by the steps above is logged and
            re-raised unchanged.
    """
    try:
        # Create database directory if it doesn't exist
        settings.DB_DIR.mkdir(parents=True, exist_ok=True)

        # Only the table names are needed here (for the diagnostic log line);
        # the boolean flag is intentionally ignored.
        _tables_exist, existing_tables = check_db_tables()
        table_list = ", ".join(existing_tables) if existing_tables else "none"
        logger.info(f"Found {len(existing_tables)} existing tables: {table_list}")

        # Run migrations regardless of what was found: the migration scripts
        # are expected to be idempotent and cope with existing tables.
        if not run_migrations():
            logger.error("Failed to create database schema.")
            return False

        db = SessionLocal()
        try:
            # Check if admin user already exists
            existing_admin = (
                db.query(User)
                .filter(User.email == settings.FIRST_SUPERUSER_EMAIL)
                .first()
            )

            if existing_admin:
                logger.info("Admin user already exists")
            else:
                logger.info("Creating initial admin user...")
                admin_user = User(
                    email=settings.FIRST_SUPERUSER_EMAIL,
                    hashed_password=get_password_hash(settings.FIRST_SUPERUSER_PASSWORD),
                    is_active=True,
                    role=UserRole.ADMIN,
                    first_name="Admin",
                    last_name="User",
                    email_verified=True,
                )
                db.add(admin_user)
                db.commit()
                logger.info(f"Admin user created with email: {settings.FIRST_SUPERUSER_EMAIL}")
        finally:
            # Close the session even when the query or commit fails, so the
            # session is not leaked on the error path.
            db.close()

        logger.info("Database initialization completed successfully")
        return True

    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        raise
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the schema and seed the admin user.
    logger.info("Creating initial database tables and admin user")
    if not init_db():
        # init_db() returns False when the migrations fail; report that and
        # exit non-zero instead of claiming success.
        logger.error("Database initialization failed")
        sys.exit(1)
    logger.info("Initial data created")
|