Automated Action efba5e489a Fix database permissions issues by using more accessible paths
- Updated database configuration to use /app/storage/db directory
- Added robust directory and file permission handling
- Implemented fallback paths for database location
- Enhanced Alembic migration script with retry logic and clear error diagnostics
- Updated the README with the new database path information
2025-05-16 12:47:17 +00:00

338 lines
12 KiB
Python

import os
import sqlite3
from datetime import datetime, timedelta
from sqlalchemy import text
from app.db.base import Base # Import all models
from app.db.session import engine, db_file
from app.core.config import settings, DB_DIR
def _create_sqlite_schema(conn: sqlite3.Connection) -> None:
    """Create the ``task`` and ``user`` tables and their indexes if missing.

    Every statement uses ``IF NOT EXISTS``, so running this against an
    already-initialized database is a no-op.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS task (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            description TEXT,
            priority TEXT DEFAULT 'medium',
            status TEXT DEFAULT 'todo',
            due_date TEXT,
            completed INTEGER DEFAULT 0,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            user_id INTEGER
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS user (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT NOT NULL UNIQUE,
            username TEXT NOT NULL UNIQUE,
            hashed_password TEXT NOT NULL,
            is_active INTEGER DEFAULT 1,
            is_superuser INTEGER DEFAULT 0,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_task_id ON task(id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_task_user_id ON task(user_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON user(id)")
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_user_email ON user(email)")
    conn.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_user_username ON user(username)"
    )


def _seed_sqlite_defaults(conn: sqlite3.Connection) -> None:
    """Insert a default admin user and an example task into empty tables.

    Nothing is inserted into a table that already has rows. The caller is
    responsible for committing the transaction.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT COUNT(*) FROM user")
        user_count = cursor.fetchone()[0]
        if user_count == 0:
            now = datetime.utcnow().isoformat()
            # Imported locally, as in the original (which notes this avoids
            # a flake8 F823 error).
            from app.core.security import get_password_hash

            # NOTE(review): the default admin password is hard-coded and
            # echoed to stdout below; it should be rotated after first login.
            admin_password_hash = get_password_hash("adminpassword")
            conn.execute(
                """
                INSERT INTO user (email, username, hashed_password, is_active, is_superuser, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    "admin@example.com",
                    "admin",
                    admin_password_hash,
                    1,  # is_active
                    1,  # is_superuser
                    now,
                    now,
                ),
            )
            print("Created default admin user: admin@example.com / adminpassword")

        cursor.execute("SELECT COUNT(*) FROM task")
        task_count = cursor.fetchone()[0]
        if task_count == 0:
            # Attach the example task to the admin user when one exists;
            # otherwise the task is stored with a NULL user_id.
            cursor.execute("SELECT id FROM user WHERE username = ?", ("admin",))
            admin_row = cursor.fetchone()
            admin_id = admin_row[0] if admin_row else None
            now = datetime.utcnow().isoformat()
            conn.execute(
                """
                INSERT INTO task (title, description, priority, status, completed, created_at, updated_at, user_id)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    "Example Task",
                    "This is an example task created during initialization",
                    "medium",
                    "todo",
                    0,
                    now,
                    now,
                    admin_id,
                ),
            )
    finally:
        cursor.close()


def _init_with_sqlite() -> None:
    """Create the schema and seed data through a raw ``sqlite3`` connection.

    Raises whatever the file or SQLite operations raise; the connection is
    always closed before the exception propagates.
    """
    # Opening for append creates the file if needed and proves it is writable.
    with open(db_file, "a"):
        os.utime(db_file, None)  # Update access/modify time
    print(f"Database file exists and is writable: {db_file}")

    conn = sqlite3.connect(str(db_file))
    try:
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA journal_mode = WAL")
        _create_sqlite_schema(conn)
        _seed_sqlite_defaults(conn)
        conn.commit()
    finally:
        # Close on the error path too; previously a failure part-way through
        # left the connection open.
        conn.close()


def _seed_with_sqlalchemy(conn, tables) -> None:
    """Add the default admin user, and a sample task, through the ORM.

    ``conn`` is the open engine connection used for the row-count queries
    and ``tables`` is the list of table names found in the database. The
    sample task is only added when the ``task`` table exists, is empty, and
    the admin user's id could be read back.
    """
    print("Adding default admin user with SQLAlchemy")
    from app.models.user import User
    from app.core.security import get_password_hash
    from app.db.session import SessionLocal

    # NOTE(review): same hard-coded default password as the SQLite path.
    admin_user = User(
        email="admin@example.com",
        username="admin",
        hashed_password=get_password_hash("adminpassword"),
        is_active=True,
        is_superuser=True,
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
    )

    db = SessionLocal()
    try:
        db.add(admin_user)
        db.commit()

        # Read the admin back to obtain its generated primary key.
        admin = db.query(User).filter_by(username="admin").first()
        admin_id = admin.id if admin else None

        if "task" in tables:
            task_count = conn.execute(text("SELECT COUNT(*) FROM task")).scalar()
            print(f"Task table contains {task_count} records")

            if task_count == 0 and admin_id:
                print("Adding sample task with SQLAlchemy")
                from app.models.task import Task, TaskPriority, TaskStatus

                sample_task = Task(
                    title="Sample SQLAlchemy Task",
                    description="This is a sample task created with SQLAlchemy",
                    priority=TaskPriority.MEDIUM,
                    status=TaskStatus.TODO,
                    completed=False,
                    user_id=admin_id,
                    created_at=datetime.utcnow(),
                    updated_at=datetime.utcnow(),
                )
                db.add(sample_task)
                db.commit()
                print("Added sample task with SQLAlchemy")
    finally:
        # Release the session even when a commit or query fails.
        db.close()
    print("Added default admin user with SQLAlchemy")


def _init_with_sqlalchemy() -> None:
    """Create tables from the ORM models, then verify and seed them."""
    print("Attempting SQLAlchemy database initialization...")
    Base.metadata.create_all(bind=engine)
    print("Successfully created tables with SQLAlchemy")

    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT name FROM sqlite_master WHERE type='table'")
        )
        tables = [row[0] for row in result]
        print(f"Tables in database: {', '.join(tables)}")

        if "user" in tables:
            user_count = conn.execute(text("SELECT COUNT(*) FROM user")).scalar()
            print(f"User table contains {user_count} records")
            if user_count == 0:
                _seed_with_sqlalchemy(conn, tables)
        else:
            print("WARNING: 'user' table not found!")

        if "task" not in tables:
            print("WARNING: 'task' table not found!")


def init_db() -> None:
    """
    Initialize database using both direct SQLite and SQLAlchemy approaches
    for maximum reliability.

    The two phases run independently: a failure in either one is printed
    with its traceback and swallowed, so this function never raises for
    database errors. Both phases are idempotent with respect to existing
    tables and rows.
    """
    print(f"Initializing database at {db_file}")
    print(f"Using SQLAlchemy URL: {settings.SQLALCHEMY_DATABASE_URL}")
    print(f"DB_DIR is set to: {DB_DIR} (this should be /app/db in production)")

    # First try direct SQLite approach to ensure we have a basic database file
    try:
        _init_with_sqlite()
        print("Successfully initialized database with direct SQLite")
    except Exception as e:
        print(f"Error during direct SQLite initialization: {e}")
        import traceback

        print(traceback.format_exc())

    # Now try with SQLAlchemy as a backup approach
    try:
        _init_with_sqlalchemy()
        print("SQLAlchemy database initialization completed")
    except Exception as e:
        print(f"Error during SQLAlchemy initialization: {e}")
        import traceback

        print(traceback.format_exc())
        print("Continuing despite SQLAlchemy initialization error...")
def create_test_task():
    """Create a test task in the database to verify everything is working.

    First tries a raw ``sqlite3`` insert, then the SQLAlchemy CRUD layer.
    Each approach only inserts when the ``task`` table is empty. If the
    ``task`` table does not exist at all, the function returns immediately
    without trying SQLAlchemy. All errors are printed and swallowed; nothing
    is raised to the caller.
    """
    print("Attempting to create a test task...")

    try:
        # Try direct SQLite approach first
        try:
            conn = sqlite3.connect(str(db_file))
            try:
                cursor = conn.cursor()

                # Check if task table exists
                cursor.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='task'"
                )
                if not cursor.fetchone():
                    print("Task table doesn't exist - cannot create test task")
                    return

                # Check if any tasks exist
                cursor.execute("SELECT COUNT(*) FROM task")
                count = cursor.fetchone()[0]

                if count == 0:
                    # Create a task directly with SQLite
                    now = datetime.utcnow().isoformat()
                    cursor.execute(
                        """
                        INSERT INTO task (
                            title, description, priority, status,
                            completed, created_at, updated_at
                        ) VALUES (?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            "Test Task (Direct SQL)",
                            "This is a test task created directly with SQLite",
                            "medium",
                            "todo",
                            0,  # not completed
                            now,
                            now,
                        ),
                    )
                    conn.commit()
                    task_id = cursor.lastrowid
                    print(f"Created test task with direct SQLite, ID: {task_id}")
                else:
                    print(f"Found {count} existing tasks, no need to create test task")
            finally:
                # Runs on the early return and on errors as well, so the
                # connection is never left open.
                conn.close()
        except Exception as e:
            print(f"Error with direct SQLite test task creation: {e}")
            # Continue with SQLAlchemy approach

        # Now try with SQLAlchemy
        try:
            from app.crud.task import task as task_crud
            from app.schemas.task import TaskCreate
            from app.db.session import SessionLocal

            db = SessionLocal()
            try:
                # Check if there are any tasks
                try:
                    existing_tasks = db.execute(
                        text("SELECT COUNT(*) FROM task")
                    ).scalar()
                    if existing_tasks > 0:
                        print(
                            f"Test task not needed, found {existing_tasks} existing tasks"
                        )
                        return
                except Exception as e:
                    print(f"Error checking for existing tasks: {e}")
                    # Continue anyway to try creating a task

                # Create a test task
                test_task = TaskCreate(
                    title="Test Task (SQLAlchemy)",
                    description="This is a test task created with SQLAlchemy",
                    priority="medium",
                    status="todo",
                    due_date=datetime.utcnow() + timedelta(days=7),
                    completed=False,
                )

                created_task = task_crud.create(db, obj_in=test_task)
                print(f"Created test task with SQLAlchemy, ID: {created_task.id}")
            finally:
                db.close()
        except Exception as e:
            print(f"Error with SQLAlchemy test task creation: {e}")

    except Exception as e:
        print(f"Global error creating test task: {e}")
        import traceback

        print(traceback.format_exc())
if __name__ == "__main__":
    # Script entry point: initialize the database first, then verify it by
    # creating a test task. The order matters.
    for _step in (init_db, create_test_task):
        _step()