"""FastAPI application entry point.

Importing this module puts the project root on ``sys.path``, attempts to
initialize the database (failures are printed, not raised), builds the
``app`` object, installs CORS middleware and a catch-all exception handler,
mounts the API router, and registers a few informational/diagnostic routes.
"""
import datetime
import os
import sqlite3
import sys
import traceback
from contextlib import closing
from pathlib import Path

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# Add project root to Python path for imports in alembic migrations.
# This must run before the `app.*` imports below.
project_root = Path(__file__).parent.absolute()
sys.path.insert(0, str(project_root))

from app.api.routers import api_router  # noqa: E402
from app.core.config import settings  # noqa: E402
from app.db import init_db  # noqa: E402

# Initialize the database on startup
print("Starting database initialization...")
try:
    # Get absolute path of the database file from your config
    from app.db.session import db_file

    print(f"Database path: {db_file}")

    # Check directory permissions for the configured DB_DIR
    from app.core.config import DB_DIR

    print(f"Database directory: {DB_DIR}")
    print(f"Database directory exists: {DB_DIR.exists()}")
    print(f"Database directory is writable: {os.access(DB_DIR, os.W_OK)}")

    # Initialize the database and create test task
    init_db.init_db()

    # Try to create a test task
    try:
        init_db.create_test_task()
    except Exception as e:
        # Continue anyway: a missing test task must not block startup.
        print(f"Error creating test task: {e}")
except Exception as e:
    # Continue with app startup even if DB init fails, to allow debugging
    print(f"Error initializing database: {e}")
    print(f"Detailed error: {traceback.format_exc()}")

app = FastAPI(title=settings.PROJECT_NAME)

# Set all CORS enabled origins - Allow all origins
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Add comprehensive exception handlers for better error reporting
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and return a JSON 500 with diagnostics.

    The response body contains the exception message and type, the request
    path and method, a shortened traceback, and a ``db_diagnostics`` entry
    obtained by opening the SQLite file directly.

    NOTE(review): the traceback summary and database details are sent to the
    client; confirm that is acceptable for the deployment environment.
    """
    # Build the traceback from the exception object itself rather than from
    # the interpreter's "currently handled exception" state, so the output is
    # correct regardless of how the framework invokes this handler.
    error_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )

    # Log the full error with traceback to stderr
    print(f"CRITICAL ERROR: {str(exc)}", file=sys.stderr)
    print(f"Request path: {request.url.path}", file=sys.stderr)
    print(f"Traceback:\n{error_tb}", file=sys.stderr)

    # Keep only the unindented traceback lines (headers and the final
    # exception line) so the response stays small.
    simplified_tb = [
        line
        for line in error_tb.split("\n")
        if line and not line.startswith(" ")
    ]

    # Create detailed error response
    error_detail = {
        "status": "error",
        "message": str(exc),
        "type": str(type(exc).__name__),
        "path": request.url.path,
        "method": request.method,
        "traceback_summary": simplified_tb[-10:],
    }

    # Add SQLite diagnostic check
    try:
        # Imported lazily so the handler still works if the session module
        # itself is what failed to import.
        from app.db.session import db_file

        # `closing` guarantees the connection is released even when one of
        # the diagnostic queries raises.
        with closing(sqlite3.connect(str(db_file))) as conn:
            cursor = conn.cursor()
            cursor.execute("PRAGMA integrity_check")
            integrity = cursor.fetchone()[0]

            # Check if task table exists
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='task'"
            )
            task_table_exists = cursor.fetchone() is not None

        # Get file info
        file_exists = os.path.exists(db_file)
        file_size = os.path.getsize(db_file) if file_exists else 0

        # Add SQLite diagnostics to response
        error_detail["db_diagnostics"] = {
            "file_exists": file_exists,
            "file_size": file_size,
            "integrity": integrity,
            "task_table_exists": task_table_exists,
        }
    except Exception as db_error:
        # Diagnostics are best-effort; report why they are unavailable.
        error_detail["db_diagnostics"] = {"error": str(db_error)}

    # Return the error response
    print(f"Returning error response: {error_detail}")
    return JSONResponse(
        status_code=500,
        content=error_detail,
    )


# Include the API router directly (no version prefix)
app.include_router(api_router)


@app.get("/", tags=["info"])
def api_info():
    """
    API information endpoint with links to documentation and endpoints
    """
    return {
        "name": settings.PROJECT_NAME,
        "version": "1.0.0",
        "description": "A RESTful API for managing tasks with user authentication",
        "endpoints": {
            "authentication": {
                "register": "/auth/register",
                "login": "/auth/login",
                "me": "/auth/me",
                "test-token": "/auth/test-token",
            },
            "tasks": "/tasks",
            "docs": "/docs",
            "redoc": "/redoc",
            "health": "/health",
            "db_test": "/db-test",
        },
    }


@app.get("/health", tags=["health"])
def health_check():
    """
    Health check endpoint to verify the application is running correctly
    """
    return {"status": "ok"}


def _probe_sqlite(db_file) -> dict:
    """Open the database with the raw ``sqlite3`` driver and describe it.

    Returns a dict with ``connection`` ("successful"/"failed") plus, on
    success, the table names, the ``task`` row count and one sample row when
    that table exists, and the ``PRAGMA integrity_check`` result. On failure
    the dict carries ``error`` and ``traceback`` instead.
    """
    report = {}
    try:
        # `closing` releases the connection even if a query below raises.
        with closing(sqlite3.connect(str(db_file))) as conn:
            report["connection"] = "successful"

            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            report["tables"] = tables

            # Check for task table specifically
            if "task" in tables:
                cursor.execute("SELECT COUNT(*) FROM task")
                task_count = cursor.fetchone()[0]
                report["task_count"] = task_count

                # Get a sample task if available
                if task_count > 0:
                    cursor.execute("SELECT * FROM task LIMIT 1")
                    column_names = [column[0] for column in cursor.description]
                    row = cursor.fetchone()
                    report["sample_task"] = dict(zip(column_names, row))

            # Check database integrity
            cursor.execute("PRAGMA integrity_check")
            report["integrity"] = cursor.fetchone()[0]
    except Exception as e:
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    return report


def _probe_sqlalchemy(engine) -> dict:
    """Run basic connectivity and table queries through the SQLAlchemy engine.

    Returns a dict with the ``SELECT 1`` result, the table names and the
    ``task`` row count when that table exists; on failure it carries
    ``connection`` = "failed", ``error`` and ``traceback``.
    """
    # Deliberately outside the try: an import failure is reported by the
    # caller's catch-all, not as a connection failure.
    from sqlalchemy import text

    report = {}
    try:
        with engine.connect() as conn:
            # Basic connectivity test
            report["basic_query"] = conn.execute(text("SELECT 1")).scalar()

            # Check tables
            result = conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table'")
            )
            tables = [row[0] for row in result]
            report["tables"] = tables

            # Check task table
            if "task" in tables:
                result = conn.execute(text("SELECT COUNT(*) FROM task"))
                report["task_count"] = result.scalar()
    except Exception as e:
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    return report


def _probe_write(db_dir) -> dict:
    """Create and delete a small file in ``db_dir`` to verify write access.

    Returns a dict with ``success`` and either ``path`` or ``error``.
    """
    report = {}
    try:
        test_path = db_dir / "write_test.txt"
        with open(test_path, "w") as f:
            f.write("Test content")
        report["success"] = True
        report["path"] = str(test_path)
        os.unlink(test_path)  # Clean up
    except Exception as e:
        report["success"] = False
        report["error"] = str(e)
    return report


@app.get("/db-test", tags=["health"])
def test_db_connection():
    """
    Test database connection and table creation

    Collects file-system facts about the database file and directory, probes
    the database through both ``sqlite3`` and SQLAlchemy, reports selected
    environment variables, and performs a write test in the database
    directory. Any unexpected failure is returned as a ``status: error``
    payload rather than raised.
    """
    try:
        # Imported lazily so an import failure is reported in the response.
        from app.core.config import DB_DIR
        from app.db.session import db_file, engine

        # First check direct file access
        db_exists = os.path.exists(db_file)
        file_info = {
            "db_dir": str(DB_DIR),
            "db_file": str(db_file),
            "exists": db_exists,
            "size": os.path.getsize(db_file) if db_exists else 0,
            "writable": os.access(db_file, os.W_OK) if db_exists else False,
            "dir_writable": os.access(DB_DIR, os.W_OK)
            if os.path.exists(DB_DIR)
            else False,
        }

        # Try direct SQLite connection
        sqlite_test = _probe_sqlite(db_file)

        # Try SQLAlchemy connection
        sqlalchemy_test = _probe_sqlalchemy(engine)

        # Check environment
        # NOTE(review): this exposes environment variables whose names start
        # with DB_, SQL or PATH to any caller; confirm that is acceptable.
        env_info = {
            "cwd": os.getcwd(),
            "env_variables": {
                k: v
                for k, v in os.environ.items()
                if k.startswith(("DB_", "SQL", "PATH"))
            },
        }

        # Try to create a test file
        write_test = _probe_write(DB_DIR)

        # Same naive-UTC ISO string that the deprecated `utcnow()` produced.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )

        return {
            "status": "ok",
            "timestamp": timestamp,
            "file_info": file_info,
            "sqlite_test": sqlite_test,
            "sqlalchemy_test": sqlalchemy_test,
            "environment": env_info,
            "write_test": write_test,
        }
    except Exception as e:
        # Catch-all error handler
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc(),
        }