153 lines
4.8 KiB
Python
153 lines
4.8 KiB
Python
import sys
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Make the directory that contains this file importable, so that imports
# performed from alembic migrations can resolve the project's packages.
project_root = Path(__file__).absolute().parent
sys.path.insert(0, str(project_root))
|
|
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from app.api.routers import api_router
|
|
from app.core.config import settings
|
|
from app.db import init_db
|
|
|
|
# Database initialisation runs once, when this module is imported.
try:
    init_db.init_db()
except Exception as exc:
    # A failed initialisation is only reported; startup carries on so the
    # running application can still be used to debug the problem.
    print(f"Error initializing database: {exc}")
|
|
|
|
app = FastAPI(title=settings.PROJECT_NAME)

# CORS policy: every origin, method and header is accepted, and credentials
# are allowed.
# NOTE(review): combining a wildcard origin with allow_credentials=True is a
# very permissive setup; confirm it is intended outside local development.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
# Catch-all exception handler that reports failures as a JSON 500 response
@app.exception_handler(Exception)
async def validation_exception_handler(request: Request, exc: Exception):
    """
    Convert an unhandled exception into a JSON error response.

    Despite its name, this handler is registered for ``Exception`` and is
    therefore not limited to validation errors.

    Args:
        request: The request that was being processed (not inspected here).
        exc: The exception that was raised.

    Returns:
        A ``JSONResponse`` with status code 500 whose body contains the
        exception message ("detail"), the exception class name ("type") and
        the formatted traceback split into lines ("traceback").
    """
    import traceback

    # Format the traceback from the exception object itself rather than from
    # the ambient ``sys.exc_info()``, so the report always describes ``exc``
    # regardless of the context this handler is invoked from.
    formatted_traceback = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )

    # NOTE(review): the message and full traceback are returned to the
    # client; confirm this is acceptable outside of debugging environments.
    error_detail = {
        "detail": f"Internal Server Error: {str(exc)}",
        "type": str(type(exc).__name__),
        "traceback": formatted_traceback.split("\n"),
    }
    print(f"Error processing request: {error_detail}")
    return JSONResponse(
        status_code=500,
        content=error_detail,
    )
|
|
|
|
# Mount every route of the API router beneath the versioned prefix
app.include_router(router=api_router, prefix=settings.API_V1_STR)
|
|
|
|
# Add support for compatibility with non-versioned endpoints
|
|
from fastapi import Request
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
# Compatibility routes: unversioned /tasks paths redirect to the versioned API
@app.get("/tasks", include_in_schema=False)
@app.post("/tasks", include_in_schema=False)
@app.get("/tasks/{task_id:path}", include_in_schema=False)
@app.put("/tasks/{task_id:path}", include_in_schema=False)
@app.delete("/tasks/{task_id:path}", include_in_schema=False)
@app.post("/tasks/{task_id:path}/complete", include_in_schema=False)
async def redirect_to_versioned_api(request: Request, task_id: str = None):
    """
    Redirect unversioned API requests to the versioned API path.

    Args:
        request: The incoming request; its URL is the basis of the redirect.
        task_id: Captured from the route path when present. It is not used
            directly because the full request URL is rewritten instead.

    Returns:
        A 307 ``RedirectResponse`` pointing at the same URL with the first
        "/tasks" of its path replaced by "<API_V1_STR>/tasks".
    """
    original_url = request.url

    # Rewrite only the path component. Replacing on the full URL string
    # would also match a "/tasks" that appears earlier in the URL, e.g. the
    # "//tasks.example.com" authority of a host whose name starts with
    # "tasks", and would produce a broken redirect target.
    versioned_path = original_url.path.replace(
        "/tasks", f"{settings.API_V1_STR}/tasks", 1
    )
    target_url = str(original_url)
    versioned_url = str(original_url.replace(path=versioned_path))

    # Add debugging info
    print(f"Redirecting from {target_url} to {versioned_url}")

    # Use 307 to preserve the method and body
    return RedirectResponse(url=versioned_url, status_code=307)
|
|
|
|
|
|
@app.get("/", tags=["info"])
def api_info():
    """
    Describe the API: project name, version, a short description and the
    paths of the documentation, health and versioned endpoints.
    """
    api_prefix = settings.API_V1_STR

    # Paths a client can follow from the root endpoint
    endpoints = {
        "api": f"{api_prefix}",
        "tasks": f"{api_prefix}/tasks",
        "docs": "/docs",
        "redoc": "/redoc",
        "health": "/health",
        "db_test": "/db-test",
    }

    return {
        "name": settings.PROJECT_NAME,
        "version": "1.0.0",
        "description": "A RESTful API for managing tasks",
        "endpoints": endpoints,
    }
|
|
|
|
@app.get("/health", tags=["health"])
def health_check():
    """
    Health check endpoint.

    Returns a fixed ``{"status": "ok"}`` payload; receiving it shows that
    the application is up and serving requests.
    """
    return dict(status="ok")
|
|
|
|
|
|
@app.get("/db-test", tags=["health"])
def test_db_connection():
    """
    Diagnostic endpoint for the database connection and schema.

    Lists the tables known to the engine with their column names, counts
    the rows of the ``task`` table when it exists, and reports whether the
    ``db.sqlite`` file is present under ``DB_DIR``.

    Returns:
        On success, a dict with "status": "ok" plus the collected details.
        On any exception, a dict with "status": "error", the error message
        and the formatted traceback (the exception is not re-raised).
    """
    from sqlalchemy import text, inspect
    from app.db.session import engine
    import traceback

    try:
        db_inspector = inspect(engine)
        tables = db_inspector.get_table_names()

        # Map each table name to the names of its columns
        table_info = {
            name: [column['name'] for column in db_inspector.get_columns(name)]
            for name in tables
        }

        with engine.connect() as conn:
            # Only query the task table when it actually exists
            if 'task' not in tables:
                task_count = "Table 'task' not found"
            else:
                task_count = conn.execute(text("SELECT COUNT(*) FROM task")).scalar()

        # Check whether the database file is present in the DB directory
        import os
        from app.core.config import DB_DIR
        db_path = f"{DB_DIR}/db.sqlite"

        return {
            "status": "ok",
            "connection": "successful",
            "tables": tables,
            "table_details": table_info,
            "task_count": task_count,
            "db_path": db_path,
            "db_exists": os.path.exists(db_path),
        }
    except Exception as e:
        # Report the failure in the response body instead of raising
        return {
            "status": "error",
            "connection": "failed",
            "error": str(e),
            "traceback": traceback.format_exc(),
        }