119 lines
4.2 KiB
Python
119 lines
4.2 KiB
Python
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import inspect, text
|
|
from sqlalchemy.exc import OperationalError
|
|
|
|
from app.db.base import Base # Import all models
|
|
from app.db.session import engine
|
|
from app.core.config import settings
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Initialize database with required tables and data directly."""
|
|
|
|
# Ensure database directory exists
|
|
Path(settings.DB_DIR).mkdir(parents=True, exist_ok=True)
|
|
|
|
db_file = f"{settings.DB_DIR}/db.sqlite"
|
|
db_exists = os.path.exists(db_file)
|
|
|
|
# If database doesn't exist or is empty, create tables
|
|
try:
|
|
# Try to connect to check if the database is accessible
|
|
max_retries = 3
|
|
retry_count = 0
|
|
connected = False
|
|
|
|
while retry_count < max_retries and not connected:
|
|
try:
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SELECT 1"))
|
|
print(f"Database connection successful to {db_file}")
|
|
connected = True
|
|
except Exception as e:
|
|
retry_count += 1
|
|
print(f"Database connection error (attempt {retry_count}/{max_retries}): {e}")
|
|
time.sleep(1) # Wait a second before retrying
|
|
|
|
if not connected:
|
|
raise Exception(f"Failed to connect to database after {max_retries} attempts")
|
|
|
|
# Check if tables exist
|
|
inspector = inspect(engine)
|
|
existing_tables = inspector.get_table_names()
|
|
|
|
if not existing_tables:
|
|
print("No tables found in database. Creating tables...")
|
|
Base.metadata.create_all(bind=engine)
|
|
print(f"Created tables: {', '.join(inspector.get_table_names())}")
|
|
else:
|
|
print(f"Found existing tables: {', '.join(existing_tables)}")
|
|
|
|
# Print database file information
|
|
if db_exists:
|
|
file_size = os.path.getsize(db_file)
|
|
print(f"Database file exists at {db_file}, size: {file_size} bytes")
|
|
else:
|
|
print(f"Warning: Database file does not exist at {db_file} after initialization!")
|
|
|
|
print("Database initialization completed successfully")
|
|
|
|
except Exception as e:
|
|
print(f"Database initialization error: {str(e)}")
|
|
# Print detailed error but don't raise to allow app to start
|
|
import traceback
|
|
print(traceback.format_exc())
|
|
|
|
# Try to create a test file to check write permissions
|
|
try:
|
|
test_file = f"{settings.DB_DIR}/test_write.txt"
|
|
with open(test_file, 'w') as f:
|
|
f.write('test')
|
|
print(f"Successfully wrote test file: {test_file}")
|
|
os.remove(test_file)
|
|
except Exception as e:
|
|
print(f"Failed to write test file: {str(e)}")
|
|
|
|
|
|
def create_test_task():
|
|
"""Create a test task in the database to verify everything is working."""
|
|
try:
|
|
from app.crud.task import task as task_crud
|
|
from app.schemas.task import TaskCreate
|
|
from app.db.session import SessionLocal
|
|
from datetime import datetime, timedelta
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Check if there are any tasks
|
|
existing_tasks = db.execute("SELECT COUNT(*) FROM task").scalar()
|
|
if existing_tasks > 0:
|
|
print(f"Test task not needed, found {existing_tasks} existing tasks")
|
|
return
|
|
|
|
# Create a test task
|
|
test_task = TaskCreate(
|
|
title="Test Task",
|
|
description="This is a test task created at database initialization",
|
|
priority="medium",
|
|
status="todo",
|
|
due_date=datetime.utcnow() + timedelta(days=7),
|
|
completed=False
|
|
)
|
|
|
|
created_task = task_crud.create(db, obj_in=test_task)
|
|
print(f"Created test task with ID: {created_task.id}")
|
|
|
|
finally:
|
|
db.close()
|
|
except Exception as e:
|
|
print(f"Error creating test task: {e}")
|
|
import traceback
|
|
print(traceback.format_exc())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
init_db()
|
|
create_test_task() |