
- Update SQLite foreign key constraint implementation to use batch operations - Add helper function to detect existing constraints - Use batch_alter_table context manager for SQLite compatibility - Improve error handling in downgrade operations for SQLite compatibility - Add better error messages for debugging
153 lines
5.6 KiB
Python
153 lines
5.6 KiB
Python
"""User authentication and Todo-User relationship
|
|
|
|
Revision ID: 54ab38f52c12
|
|
Revises: 1a39b3495612
|
|
Create Date: 2023-10-28
|
|
|
|
"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.engine.reflection import Inspector
|
|
from sqlalchemy.exc import OperationalError
|
|
|
|
|
|
# revision identifiers, used by Alembic.
|
|
revision = '54ab38f52c12'
|
|
down_revision = '1a39b3495612'
|
|
branch_labels = None
|
|
depends_on = None
|
|
|
|
|
|
def table_exists(table_name):
    """Return whether ``table_name`` exists in the database being migrated.

    Args:
        table_name: Name of the table to look for.

    Returns:
        bool: True if the name is listed by the inspector for the current
        migration bind, False otherwise.
    """
    # sa.inspect() is the supported way to obtain an Inspector;
    # Inspector.from_engine() has been deprecated since SQLAlchemy 1.4.
    inspector = sa.inspect(op.get_bind())
    return table_name in inspector.get_table_names()
|
|
|
|
|
|
def column_exists(table_name, column_name):
    """Return whether ``column_name`` exists on ``table_name``.

    Args:
        table_name: Name of the table to inspect.
        column_name: Name of the column to look for.

    Returns:
        bool: False if the table itself is missing or the column is not
        among its reflected columns; True otherwise.
    """
    # sa.inspect() replaces the deprecated Inspector.from_engine().
    inspector = sa.inspect(op.get_bind())
    # Guard first: reflecting columns of a missing table would raise.
    if table_name not in inspector.get_table_names():
        return False
    return any(
        col['name'] == column_name
        for col in inspector.get_columns(table_name)
    )
|
|
|
|
|
|
def index_exists(table_name, index_name):
    """Return whether an index named ``index_name`` exists on ``table_name``.

    Args:
        table_name: Name of the table to inspect.
        index_name: Name of the index to look for.

    Returns:
        bool: False if the table itself is missing or no reflected index
        carries that name; True otherwise.
    """
    # sa.inspect() replaces the deprecated Inspector.from_engine().
    inspector = sa.inspect(op.get_bind())
    # Guard first: reflecting indexes of a missing table would raise.
    if table_name not in inspector.get_table_names():
        return False
    return any(
        idx['name'] == index_name
        for idx in inspector.get_indexes(table_name)
    )
|
|
|
|
|
|
def get_constraint_names(table_name):
    """Return the names of the foreign key constraints on ``table_name``.

    Despite the generic function name, only foreign keys are reflected;
    primary key, unique and check constraints are not included.

    Args:
        table_name: Name of the table to inspect.

    Returns:
        list[str]: Names of the table's named foreign key constraints.
        Empty if the table does not exist. Unnamed foreign keys (reflected
        with a ``None`` or empty name) are skipped so the result only ever
        contains usable names.
    """
    # One inspector serves both the existence check and the reflection;
    # sa.inspect() replaces the deprecated Inspector.from_engine().
    inspector = sa.inspect(op.get_bind())
    if table_name not in inspector.get_table_names():
        return []
    return [
        fk['name']
        for fk in inspector.get_foreign_keys(table_name)
        if fk.get('name')
    ]
|
|
|
|
|
|
def upgrade():
    """Create the ``users`` table and link ``todos`` to it via ``owner_id``.

    Every step checks the current schema first and is skipped when the
    object is already present. Index and foreign key creation are
    best-effort: an ``OperationalError`` or ``InternalError`` is reported as
    a warning on stdout and the migration continues.
    """
    # Create users table if it doesn't exist
    if not table_exists('users'):
        op.create_table(
            'users',
            sa.Column('id', sa.Integer(), nullable=False),
            sa.Column('email', sa.String(), nullable=False),
            sa.Column('username', sa.String(), nullable=False),
            sa.Column('hashed_password', sa.String(), nullable=False),
            # NOTE(review): ``default=True`` is a client-side default and is
            # not emitted in the CREATE TABLE DDL; a database-level default
            # would need ``server_default``. Left unchanged so the schema
            # produced by this revision stays the same.
            sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
            sa.Column(
                'created_at',
                sa.DateTime(timezone=True),
                server_default=sa.text('(CURRENT_TIMESTAMP)'),
            ),
            sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
            sa.PrimaryKeyConstraint('id'),
        )

    # Create indexes on users table if they don't exist.
    # (index name, indexed columns, unique) in creation order.
    user_indexes = (
        ('ix_users_email', ['email'], True),
        ('ix_users_id', ['id'], False),
        ('ix_users_username', ['username'], True),
    )
    for index_name, columns, unique in user_indexes:
        try:
            if not index_exists('users', index_name):
                op.create_index(op.f(index_name), 'users', columns, unique=unique)
        except (OperationalError, sa.exc.InternalError) as e:
            # Still best-effort, but report the failure instead of hiding it,
            # matching the foreign key handling below.
            print(f"Warning: Could not create index {index_name}: {e}")

    # Add owner_id column to todos table if it doesn't exist
    if not column_exists('todos', 'owner_id'):
        op.add_column('todos', sa.Column('owner_id', sa.Integer(), nullable=True))

    # SQLite doesn't support ALTER TABLE ADD CONSTRAINT, so the foreign key
    # is added through batch operations, which recreate the table.
    existing_constraints = get_constraint_names('todos')
    if 'fk_todos_owner_id_users' not in existing_constraints:
        try:
            with op.batch_alter_table('todos') as batch_op:
                batch_op.create_foreign_key(
                    'fk_todos_owner_id_users',
                    'users',
                    ['owner_id'],
                    ['id'],
                )
        except (OperationalError, sa.exc.InternalError) as e:
            # Log error but allow migration to continue
            print(f"Warning: Could not create foreign key constraint: {e}")
|
|
|
|
|
|
def downgrade():
    """Reverse ``upgrade``: unlink ``todos`` from ``users`` and drop ``users``.

    Steps run in reverse dependency order (foreign key, column, indexes,
    table). Each step checks that its target exists first. All steps except
    the final table drop are best-effort: an ``OperationalError`` or
    ``InternalError`` is reported as a warning on stdout and the downgrade
    continues.
    """
    # Drop the foreign key through batch operations for SQLite compatibility
    existing_constraints = get_constraint_names('todos')
    if 'fk_todos_owner_id_users' in existing_constraints:
        try:
            with op.batch_alter_table('todos') as batch_op:
                batch_op.drop_constraint(
                    'fk_todos_owner_id_users', type_='foreignkey'
                )
        except (OperationalError, sa.exc.InternalError) as e:
            print(f"Warning: Could not drop foreign key constraint: {e}")

    # Drop owner_id column from todos table (batch mode for SQLite)
    if column_exists('todos', 'owner_id'):
        try:
            with op.batch_alter_table('todos') as batch_op:
                batch_op.drop_column('owner_id')
        except (OperationalError, sa.exc.InternalError) as e:
            print(f"Warning: Could not drop column: {e}")

    # Drop indexes on users table, in the reverse of their creation order
    for index_name in ('ix_users_username', 'ix_users_id', 'ix_users_email'):
        try:
            if index_exists('users', index_name):
                op.drop_index(op.f(index_name), table_name='users')
        except (OperationalError, sa.exc.InternalError) as e:
            # Still best-effort, but report the failure instead of hiding it,
            # matching the handling of the steps above.
            print(f"Warning: Could not drop index {index_name}: {e}")

    # Drop users table
    if table_exists('users'):
        op.drop_table('users')