# File viewer metadata (not part of the program): 324 lines, 10 KiB, Python
import sys
|
|
import os
|
|
import traceback
|
|
import datetime
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy import text
|
|
|
|
# Add project root to Python path for imports in alembic migrations.
# project_root is the absolute path of the directory that contains this file.
project_root = Path(__file__).parent.absolute()
# Insert at index 0 so this directory is searched before every other sys.path
# entry; this must run before the project imports that follow.
sys.path.insert(0, str(project_root))
|
|
|
|
# Import app modules after setting up project path
|
|
from app.api.routers import api_router # noqa: E402
|
|
from app.core.config import settings # noqa: E402
|
|
from app.db import init_db # noqa: E402
|
|
|
|
# Bootstrap the database while the module is being imported. Every failure is
# printed and swallowed so the application can still start and be debugged.
print("Starting database initialization...")
try:
    # Location of the database file, as resolved by the session module
    from app.db.session import db_file

    print(f"Database path: {db_file}")

    # Report existence and write permission of the configured DB_DIR
    from app.core.config import DB_DIR

    print(f"Database directory: {DB_DIR}")
    print(f"Database directory exists: {DB_DIR.exists()}")
    print(f"Database directory is writable: {os.access(DB_DIR, os.W_OK)}")

    # Run the project's database initialization
    init_db.init_db()

    # Creating the test task is best-effort: report the failure and move on
    try:
        init_db.create_test_task()
    except Exception as seed_error:
        print(f"Error creating test task: {seed_error}")
except Exception as init_error:
    print(f"Error initializing database: {init_error}")
    print(f"Detailed error: {traceback.format_exc()}")
    # Deliberately not re-raised: app startup continues even if DB init
    # fails, to allow debugging
|
|
|
|
# Application instance; the title is taken from the project settings.
app = FastAPI(title=settings.PROJECT_NAME)

# CORS: allow every origin, method and header.
#
# Credentials are intentionally disabled here. A wildcard origin combined with
# allow_credentials=True is an invalid CORS configuration: the CORS protocol
# forbids "Access-Control-Allow-Origin: *" on credentialed requests, and
# middleware that works around this by echoing the caller's Origin effectively
# lets any website issue credentialed requests on behalf of a user.
# Requests carrying an Authorization header are unaffected by this flag
# (headers are governed by allow_headers).
# NOTE(review): if the frontend relies on cookies, replace the wildcard with
# an explicit list of trusted origins and set allow_credentials=True again.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# Add comprehensive exception handlers for better error reporting
|
|
def _exception_db_diagnostics(db_path):
    """Collect read-only facts about the SQLite database file at *db_path*.

    The file is inspected on disk BEFORE any connection is opened, because
    ``sqlite3.connect`` creates a missing database file; checking afterwards
    would always report that the file exists. When the file is missing, no
    connection is made and ``integrity`` is reported as ``None``.

    Args:
        db_path: Path (str or path-like) of the SQLite database file.

    Returns:
        dict with keys ``file_exists``, ``file_size``, ``integrity`` and
        ``task_table_exists``.

    Raises:
        Exception: any OS or sqlite3 error is propagated to the caller.
    """
    file_exists = os.path.exists(db_path)
    file_size = os.path.getsize(db_path) if file_exists else 0
    integrity = None
    task_table_exists = False

    if file_exists:
        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.cursor()
            cursor.execute("PRAGMA integrity_check")
            integrity = cursor.fetchone()[0]

            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='task'"
            )
            task_table_exists = cursor.fetchone() is not None
        finally:
            # Always release the connection, even when a query fails
            conn.close()

    return {
        "file_exists": file_exists,
        "file_size": file_size,
        "integrity": integrity,
        "task_table_exists": task_table_exists,
    }


# Add comprehensive exception handlers for better error reporting
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and answer with a JSON 500 error document.

    Args:
        request: The request that was being processed when *exc* escaped.
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status code 500 whose body contains the error
        message and type, the request path and method, a trimmed traceback
        summary, and a ``db_diagnostics`` section (or the error raised while
        collecting it).
    """
    # Build the traceback from the exception object itself rather than from
    # the ambient sys.exc_info(), so it is correct regardless of whether this
    # coroutine runs inside the original ``except`` block.
    error_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )

    # Log the full error with traceback to stderr
    print(f"CRITICAL ERROR: {str(exc)}", file=sys.stderr)
    print(f"Request path: {request.url.path}", file=sys.stderr)
    print(f"Traceback:\n{error_tb}", file=sys.stderr)

    # Keep only the non-indented traceback lines (headers and the final
    # exception line); source-code context lines are indented and dropped.
    simplified_tb = [
        line
        for line in error_tb.split("\n")
        if line and not line.startswith(" ")
    ]

    # NOTE(review): the response exposes the exception message and traceback
    # summary to the client; acceptable for debugging, worth gating in
    # production.
    error_detail = {
        "status": "error",
        "message": str(exc),
        "type": type(exc).__name__,
        "path": request.url.path,
        "method": request.method,
        # Slicing already handles lists shorter than ten entries
        "traceback_summary": simplified_tb[-10:],
    }

    # SQLite diagnostics are best-effort: a failure here must never mask the
    # original error, so it is reported inside the response instead.
    try:
        from app.db.session import db_file

        error_detail["db_diagnostics"] = _exception_db_diagnostics(db_file)
    except Exception as db_error:
        error_detail["db_diagnostics"] = {"error": str(db_error)}

    # Return the error response
    print(f"Returning error response: {error_detail}")
    return JSONResponse(
        status_code=500,
        content=error_detail,
    )
|
|
|
|
|
|
# Include the API router directly (no version prefix).
# api_router is imported from app.api.routers; no ``prefix`` argument is
# passed, so its routes are mounted at the paths the router itself declares.
app.include_router(api_router)
|
|
|
|
|
|
@app.get("/", tags=["info"])
def api_info():
    """
    API information endpoint with links to documentation and endpoints
    """
    # The docstring above is left untouched: FastAPI publishes it as the
    # operation description in the generated OpenAPI schema.

    # Authentication routes, keyed by action
    auth_routes = {
        "register": "/auth/register",
        "login": "/auth/login",
        "me": "/auth/me",
        "test-token": "/auth/test-token",
    }

    # Chat routes, keyed by action
    chat_routes = {
        "chat_to_tasks": "/chat/chat-to-tasks",
    }

    # Full endpoint map, including docs and diagnostics URLs
    endpoint_map = {
        "authentication": auth_routes,
        "tasks": "/tasks",
        "chat": chat_routes,
        "docs": "/docs",
        "redoc": "/redoc",
        "health": "/health",
        "db_test": "/db-test",
    }

    payload = {
        "name": settings.PROJECT_NAME,
        "version": "1.0.0",
        "description": "A RESTful API for managing tasks with user authentication",
        "endpoints": endpoint_map,
    }
    return payload
|
|
|
|
|
|
@app.get("/health", tags=["health"])
def health_check():
    """
    Health check endpoint to verify the application is running correctly
    """
    # Constant liveness payload; no dependency (database, filesystem) is
    # touched, so a response only shows that the process is serving requests.
    liveness = {"status": "ok"}
    return liveness
|
|
|
|
|
|
def _dbtest_file_info(db_dir, db_file):
    """Describe the database file and its directory as seen on disk."""
    file_present = os.path.exists(db_file)
    return {
        "db_dir": str(db_dir),
        "db_file": str(db_file),
        "exists": file_present,
        "size": os.path.getsize(db_file) if file_present else 0,
        "writable": os.access(db_file, os.W_OK) if file_present else False,
        "dir_writable": os.access(db_dir, os.W_OK)
        if os.path.exists(db_dir)
        else False,
    }


def _dbtest_probe_sqlite(db_file):
    """Open *db_file* with the sqlite3 module and report tables and integrity.

    Never raises: any failure is recorded in the returned dict under
    ``connection``/``error``/``traceback``. The connection is always closed.
    """
    report = {}
    conn = None
    try:
        conn = sqlite3.connect(str(db_file))
        report["connection"] = "successful"

        # List every table in the database
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        report["tables"] = tables

        # Inspect the task table specifically
        if "task" in tables:
            cursor.execute("SELECT COUNT(*) FROM task")
            task_count = cursor.fetchone()[0]
            report["task_count"] = task_count

            # Include one sample row, keyed by column name, when available
            # NOTE(review): this returns real row contents to the caller.
            if task_count > 0:
                cursor.execute("SELECT * FROM task LIMIT 1")
                column_names = [column[0] for column in cursor.description]
                report["sample_task"] = dict(
                    zip(column_names, cursor.fetchone())
                )

        # Check database integrity
        cursor.execute("PRAGMA integrity_check")
        report["integrity"] = cursor.fetchone()[0]
    except Exception as e:
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    finally:
        # Release the connection even when one of the queries above failed
        if conn is not None:
            conn.close()
    return report


def _dbtest_probe_sqlalchemy(engine):
    """Run the same basic checks through the SQLAlchemy *engine*.

    Never raises: any failure is recorded in the returned dict.
    """
    report = {}
    try:
        with engine.connect() as conn:
            # Basic connectivity test
            report["basic_query"] = conn.execute(text("SELECT 1")).scalar()

            # Check tables
            result = conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table'")
            )
            tables = [row[0] for row in result]
            report["tables"] = tables

            # Check task table
            if "task" in tables:
                result = conn.execute(text("SELECT COUNT(*) FROM task"))
                report["task_count"] = result.scalar()
    except Exception as e:
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    return report


def _dbtest_environment():
    """Report the working directory and selected environment variables."""
    # NOTE(review): values of variables starting with DB_/SQL/PATH are
    # returned verbatim; if any of them can hold credentials, redact here.
    return {
        "cwd": os.getcwd(),
        "env_variables": {
            k: v
            for k, v in os.environ.items()
            if k.startswith(("DB_", "SQL", "PATH"))
        },
    }


def _dbtest_probe_write(db_dir):
    """Create and delete a scratch file in *db_dir* to prove it is writable.

    Never raises: a failure at any step is recorded in the returned dict.
    """
    report = {}
    try:
        test_path = db_dir / "write_test.txt"
        with open(test_path, "w") as f:
            f.write("Test content")
        report["success"] = True
        report["path"] = str(test_path)
        os.unlink(test_path)  # Clean up
    except Exception as e:
        report["success"] = False
        report["error"] = str(e)
    return report


@app.get("/db-test", tags=["health"])
def test_db_connection():
    """
    Test database connection and table creation
    """
    # Returns a dict with "status": "ok" plus one report per probe (file
    # info, raw sqlite3, SQLAlchemy, environment, write test). Each probe
    # records its own failure; the outer handler below only covers errors
    # outside the probes (e.g. the imports), answering with
    # "status": "error" and a traceback instead of raising.
    try:
        from app.db.session import engine, db_file
        from app.core.config import DB_DIR

        # Probes run in a fixed order: file checks first, then connections
        file_info = _dbtest_file_info(DB_DIR, db_file)
        sqlite_test = _dbtest_probe_sqlite(db_file)
        sqlalchemy_test = _dbtest_probe_sqlalchemy(engine)
        env_info = _dbtest_environment()
        write_test = _dbtest_probe_write(DB_DIR)

        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
        # the tzinfo so the ISO string keeps its previous naive format.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )

        return {
            "status": "ok",
            "timestamp": timestamp,
            "file_info": file_info,
            "sqlite_test": sqlite_test,
            "sqlalchemy_test": sqlalchemy_test,
            "environment": env_info,
            "write_test": write_test,
        }

    except Exception as e:
        # Catch-all error handler
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc(),
        }
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Server options, gathered in one mapping and handed to uvicorn below
    server_options = {
        "host": "0.0.0.0",  # Listen on all available network interfaces
        "port": 8001,  # Default port used by app-8001
        "reload": False,  # Disable auto-reload for production
        "workers": 1,  # Number of worker processes
        "log_level": "info",
        "access_log": True,
    }

    # The application is referenced by import string ("module:attribute")
    uvicorn.run("main:app", **server_options)
|