""" Initial database setup Revision ID: 001 Revises: Create Date: 2023-07-25 00:00:00.000000 """ from collections.abc import Sequence import sqlalchemy as sa from alembic import op from sqlalchemy import inspect, text from sqlalchemy.dialects import sqlite from sqlalchemy import reflection # revision identifiers, used by Alembic. revision: str = '001' down_revision: str | None = None branch_labels: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None def table_exists(table_name): """Check if a table already exists in the database.""" bind = op.get_bind() inspector = reflection.Inspector.from_engine(bind) tables = inspector.get_table_names() return table_name in tables def index_exists(table_name, index_name): """Check if an index already exists on a table.""" bind = op.get_bind() inspector = reflection.Inspector.from_engine(bind) if not table_exists(table_name): return False indexes = inspector.get_indexes(table_name) return any(idx['name'] == index_name for idx in indexes) def create_index_safely(index_name, table_name, columns, unique=False): """Create an index if it doesn't already exist.""" if not index_exists(table_name, index_name): try: op.create_index(index_name, table_name, columns, unique=unique) except Exception as e: # If the index creation fails but not because it already exists, re-raise if 'already exists' not in str(e): raise def upgrade() -> None: # Setup alembic_version table if it doesn't exist bind = op.get_bind() inspector = reflection.Inspector.from_engine(bind) tables = inspector.get_table_names() if 'alembic_version' not in tables: op.create_table( 'alembic_version', sa.Column('version_num', sa.String(32), nullable=False), sa.PrimaryKeyConstraint('version_num') ) bind.execute(text("INSERT INTO alembic_version (version_num) VALUES ('001')")) # Create enum types for SQLite user_role_type = sa.Enum('customer', 'seller', 'admin', name='userroletype') product_status_type = sa.Enum('draft', 'published', 'out_of_stock', 'discontinued', name='productstatustype') order_status_type = sa.Enum('pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded', name='orderstatustype') shipping_method_type = sa.Enum('standard', 'express', 'overnight', 'pickup', 'digital', name='shippingmethodtype') payment_status_type = sa.Enum('pending', 'processing', 'completed', 'failed', 'refunded', name='paymentstatustype') payment_method_type = sa.Enum('credit_card', 'paypal', 'bank_transfer', 'cash_on_delivery', 'stripe', 'apple_pay', 'google_pay', name='paymentmethodtype') # Users table if not table_exists('users'): op.create_table( 'users', sa.Column('id', sa.String(36), primary_key=True), sa.Column('email', sa.String(255), nullable=False, unique=True, index=True), sa.Column('hashed_password', sa.String(255), nullable=False), sa.Column('first_name', sa.String(100), nullable=True), sa.Column('last_name', sa.String(100), nullable=True), sa.Column('is_active', sa.Boolean(), default=True), sa.Column('role', user_role_type, default='customer'), sa.Column('phone_number', sa.String(20), nullable=True), sa.Column('profile_image', sa.String(255), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), sa.Column('address_line1', sa.String(255), nullable=True), sa.Column('address_line2', sa.String(255), nullable=True), sa.Column('city', sa.String(100), nullable=True), sa.Column('state', sa.String(100), nullable=True), sa.Column('postal_code', sa.String(20), nullable=True), sa.Column('country', sa.String(100), nullable=True), sa.Column('email_verified', sa.Boolean(), default=False), sa.Column('verification_token', sa.String(255), nullable=True), sa.Column('reset_password_token', sa.String(255), nullable=True), sa.Column('reset_token_expires_at', sa.DateTime(timezone=True), nullable=True), sa.Column('bio', sa.Text(), nullable=True), ) # Categories table if not table_exists('categories'): op.create_table( 'categories', sa.Column('id', sa.String(36), primary_key=True), sa.Column('name', sa.String(100), nullable=False, index=True), sa.Column('slug', sa.String(120), nullable=False, unique=True), sa.Column('description', sa.Text(), nullable=True), sa.Column('image', sa.String(255), nullable=True), sa.Column('parent_id', sa.String(36), sa.ForeignKey('categories.id'), nullable=True), sa.Column('is_active', sa.Boolean(), default=True), sa.Column('display_order', sa.Integer(), default=0), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Tags table if not table_exists('tags'): op.create_table( 'tags', sa.Column('id', sa.String(36), primary_key=True), sa.Column('name', sa.String(50), nullable=False, unique=True, index=True), sa.Column('slug', sa.String(60), nullable=False, unique=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Products table if not table_exists('products'): op.create_table( 'products', sa.Column('id', sa.String(36), primary_key=True), sa.Column('name', sa.String(255), nullable=False, index=True), sa.Column('description', sa.Text(), nullable=True), sa.Column('price', sa.Float(), nullable=False), sa.Column('sku', sa.String(100), unique=True, nullable=True), sa.Column('barcode', sa.String(100), unique=True, nullable=True), sa.Column('stock_quantity', sa.Integer(), default=0), sa.Column('weight', sa.Float(), nullable=True), sa.Column('dimensions', sa.String(100), nullable=True), sa.Column('status', product_status_type, default='draft'), sa.Column('is_featured', sa.Boolean(), default=False), sa.Column('is_digital', sa.Boolean(), default=False), sa.Column('digital_download_link', sa.String(512), nullable=True), sa.Column('slug', sa.String(255), nullable=False, unique=True), sa.Column('tax_rate', sa.Float(), default=0.0), sa.Column('discount_price', sa.Float(), nullable=True), sa.Column('discount_start_date', sa.DateTime(timezone=True), nullable=True), sa.Column('discount_end_date', sa.DateTime(timezone=True), nullable=True), sa.Column('category_id', sa.String(36), sa.ForeignKey('categories.id'), nullable=True), sa.Column('seller_id', sa.String(36), sa.ForeignKey('users.id'), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Product Images table if not table_exists('product_images'): op.create_table( 'product_images', sa.Column('id', sa.String(36), primary_key=True), sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False), sa.Column('image_url', sa.String(512), nullable=False), sa.Column('alt_text', sa.String(255), nullable=True), sa.Column('is_primary', sa.Boolean(), default=False), sa.Column('display_order', sa.Integer(), default=0), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), ) # Product-Tag association table if not table_exists('product_tags'): op.create_table( 'product_tags', sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), primary_key=True), sa.Column('tag_id', sa.String(36), sa.ForeignKey('tags.id', ondelete='CASCADE'), primary_key=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), ) # Cart Items table if not table_exists('cart_items'): op.create_table( 'cart_items', sa.Column('id', sa.String(36), primary_key=True), sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id', ondelete='CASCADE'), nullable=False), sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False), sa.Column('quantity', sa.Integer(), default=1, nullable=False), sa.Column('price_at_addition', sa.Float(), nullable=False), sa.Column('custom_properties', sa.Text(), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Orders table if not table_exists('orders'): op.create_table( 'orders', sa.Column('id', sa.String(36), primary_key=True), sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id'), nullable=False), sa.Column('order_number', sa.String(50), nullable=False, unique=True, index=True), sa.Column('status', order_status_type, default='pending'), sa.Column('total_amount', sa.Float(), nullable=False), sa.Column('subtotal', sa.Float(), nullable=False), sa.Column('tax_amount', sa.Float(), nullable=False), sa.Column('shipping_amount', sa.Float(), nullable=False), sa.Column('discount_amount', sa.Float(), default=0.0), sa.Column('shipping_method', shipping_method_type, nullable=True), sa.Column('tracking_number', sa.String(100), nullable=True), sa.Column('notes', sa.Text(), nullable=True), sa.Column('shipping_address', sqlite.JSON(), nullable=True), sa.Column('billing_address', sqlite.JSON(), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Order Items table if not table_exists('order_items'): op.create_table( 'order_items', sa.Column('id', sa.String(36), primary_key=True), sa.Column('order_id', sa.String(36), sa.ForeignKey('orders.id', ondelete='CASCADE'), nullable=False), sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id'), nullable=False), sa.Column('quantity', sa.Integer(), default=1, nullable=False), sa.Column('unit_price', sa.Float(), nullable=False), sa.Column('subtotal', sa.Float(), nullable=False), sa.Column('discount', sa.Float(), default=0.0), sa.Column('tax_amount', sa.Float(), default=0.0), sa.Column('product_name', sa.String(255), nullable=False), sa.Column('product_sku', sa.String(100), nullable=True), sa.Column('product_options', sqlite.JSON(), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), ) # Payments table if not table_exists('payments'): op.create_table( 'payments', sa.Column('id', sa.String(36), primary_key=True), sa.Column('order_id', sa.String(36), sa.ForeignKey('orders.id'), nullable=False), sa.Column('amount', sa.Float(), nullable=False), sa.Column('payment_method', payment_method_type, nullable=False), sa.Column('status', payment_status_type, default='pending'), sa.Column('transaction_id', sa.String(255), nullable=True, unique=True), sa.Column('payment_details', sqlite.JSON(), nullable=True), sa.Column('error_message', sa.String(512), nullable=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Reviews table if not table_exists('reviews'): op.create_table( 'reviews', sa.Column('id', sa.String(36), primary_key=True), sa.Column('product_id', sa.String(36), sa.ForeignKey('products.id', ondelete='CASCADE'), nullable=False), sa.Column('user_id', sa.String(36), sa.ForeignKey('users.id'), nullable=False), sa.Column('rating', sa.Integer(), nullable=False), sa.Column('title', sa.String(255), nullable=True), sa.Column('comment', sa.Text(), nullable=True), sa.Column('is_verified_purchase', sa.Boolean(), default=False), sa.Column('is_approved', sa.Boolean(), default=True), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()), sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()), ) # Create indexes safely (only if they don't already exist) create_index_safely('ix_users_email', 'users', ['email'], unique=True) create_index_safely('ix_categories_name', 'categories', ['name']) create_index_safely('ix_products_name', 'products', ['name']) create_index_safely('ix_tags_name', 'tags', ['name'], unique=True) create_index_safely('ix_orders_order_number', 'orders', ['order_number'], unique=True) def downgrade() -> None: # Drop tables in reverse order of creation # Use try/except to handle cases where tables don't exist for table in ['reviews', 'payments', 'order_items', 'orders', 'cart_items', 'product_tags', 'product_images', 'products', 'tags', 'categories', 'users']: try: if table_exists(table): op.drop_table(table) except Exception as e: # Log the error but continue with other tables print(f"Error dropping table {table}: {e}")