Automated Action 0ceeef31a6 Add user authentication system with JWT tokens
- Add user model with relationship to tasks
- Implement JWT token authentication
- Create user registration and login endpoints
- Update task endpoints to filter by current user
- Add Alembic migration for user table
- Update documentation with authentication details
2025-05-16 12:40:03 +00:00

309 lines
9.6 KiB
Python

import sys
import os
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# Add project root to Python path for imports in alembic migrations
project_root = Path(__file__).parent.absolute()
sys.path.insert(0, str(project_root))
from app.api.routers import api_router
from app.core.config import settings
from app.db import init_db
# Bootstrap the database while this module is imported, reporting each step on
# stdout. A failure here is reported but never aborts application startup, so
# the running service can still be used to debug the problem.
print("Starting database initialization...")
try:
    # Report the database file location taken from the session module.
    from app.db.session import db_file

    print(f"Database path: {db_file}")

    # Report the state of the configured database directory.
    from app.core.config import DB_DIR

    print(f"Database directory: {DB_DIR}")
    print(f"Database directory exists: {DB_DIR.exists()}")
    print(f"Database directory is writable: {os.access(DB_DIR, os.W_OK)}")

    init_db.init_db()
except Exception as init_error:
    import traceback

    print(f"Error initializing database: {init_error}")
    print(f"Detailed error: {traceback.format_exc()}")
else:
    # Seeding a test task is best-effort: report a failure and carry on.
    try:
        init_db.create_test_task()
    except Exception as seed_error:
        print(f"Error creating test task: {seed_error}")
# The ASGI application object; its title is read from the project settings.
app = FastAPI(title=settings.PROJECT_NAME)

# CORS policy: every origin, method and header is accepted, with credentials
# enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
def _sqlite_diagnostics():
    """Run basic health checks against the configured SQLite database file.

    Returns:
        A dict with ``file_exists``, ``file_size``, ``integrity`` and
        ``task_table_exists`` when every check ran, or ``{"error": <message>}``
        when any step (including the imports) raised an ``Exception``.
    """
    try:
        import os
        import sqlite3
        from contextlib import closing

        from app.db.session import db_file

        # Inspect the file BEFORE connecting: opening a connection can create
        # a missing database file, which would make "file_exists" meaningless.
        file_exists = os.path.exists(db_file)
        file_size = os.path.getsize(db_file) if file_exists else 0

        # closing() guarantees the connection is released even when one of
        # the diagnostic queries raises.
        with closing(sqlite3.connect(str(db_file))) as conn:
            cursor = conn.cursor()
            cursor.execute("PRAGMA integrity_check")
            integrity = cursor.fetchone()[0]
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='task'"
            )
            task_table_exists = cursor.fetchone() is not None

        return {
            "file_exists": file_exists,
            "file_size": file_size,
            "integrity": integrity,
            "task_table_exists": task_table_exists,
        }
    except Exception as db_error:
        return {"error": str(db_error)}


# Catch-all handler so unhandled errors are logged and reported as JSON.
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and answer with a JSON 500 response.

    Args:
        request: The request whose handling raised ``exc``.
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status code 500 whose body holds the error
        message and type, the request path and method, a shortened traceback
        and the SQLite diagnostics.

    NOTE(review): the response body exposes exception text, traceback lines
    and database details to the client -- confirm this is acceptable for the
    environments this service runs in.
    """
    import sys
    import traceback

    # Format the traceback from the exception object itself rather than from
    # the ambient "currently handled exception" state.
    error_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    print(f"CRITICAL ERROR: {str(exc)}", file=sys.stderr)
    print(f"Request path: {request.url.path}", file=sys.stderr)
    print(f"Traceback:\n{error_tb}", file=sys.stderr)

    # Keep only the non-indented traceback lines (header and exception lines);
    # the indented frame and source lines are dropped.
    simplified_tb = [
        line
        for line in error_tb.split("\n")
        if line and not line.startswith(" ")
    ]

    error_detail = {
        "status": "error",
        "message": str(exc),
        "type": str(type(exc).__name__),
        "path": request.url.path,
        "method": request.method,
        # A negative slice already copes with lists shorter than ten entries.
        "traceback_summary": simplified_tb[-10:],
        "db_diagnostics": _sqlite_diagnostics(),
    }

    print(f"Returning error response: {error_detail}")
    return JSONResponse(
        status_code=500,
        content=error_detail,
    )
# Mount the API router on the application without passing a prefix (no
# version prefix is added here).
app.include_router(api_router)
@app.get("/", tags=["info"])
def api_info():
    """
    Describe the API: its name, version, and the paths of its main endpoints
    and documentation pages.
    """
    auth_routes = {
        "register": "/auth/register",
        "login": "/auth/login",
        "me": "/auth/me",
        "test-token": "/auth/test-token",
    }
    endpoint_index = {
        "authentication": auth_routes,
        "tasks": "/tasks",
        "docs": "/docs",
        "redoc": "/redoc",
        "health": "/health",
        "db_test": "/db-test",
    }
    summary = {
        "name": settings.PROJECT_NAME,
        "version": "1.0.0",
        "description": "A RESTful API for managing tasks with user authentication",
        "endpoints": endpoint_index,
    }
    return summary
@app.get("/health", tags=["health"])
def health_check():
    """
    Liveness probe: always answers with a fixed "ok" status payload.
    """
    payload = {"status": "ok"}
    return payload
def _probe_sqlite(db_file):
    """Inspect the database through the stdlib ``sqlite3`` driver and return a report dict."""
    import sqlite3
    import traceback
    from contextlib import closing

    report = {}
    try:
        # closing() releases the connection even when a query below raises.
        with closing(sqlite3.connect(str(db_file))) as conn:
            report["connection"] = "successful"

            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            report["tables"] = tables

            if "task" in tables:
                cursor.execute("SELECT COUNT(*) FROM task")
                task_count = cursor.fetchone()[0]
                report["task_count"] = task_count

                if task_count > 0:
                    cursor.execute("SELECT * FROM task LIMIT 1")
                    column_names = [col[0] for col in cursor.description]
                    row = cursor.fetchone()
                    # The row can be gone if the table was emptied between
                    # the COUNT and this SELECT.
                    if row is not None:
                        report["sample_task"] = dict(zip(column_names, row))

            cursor.execute("PRAGMA integrity_check")
            report["integrity"] = cursor.fetchone()[0]
    except Exception as e:
        # Keys collected before the failure are kept; only the status flips.
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    return report


def _probe_sqlalchemy(engine):
    """Inspect the database through the SQLAlchemy ``engine`` and return a report dict."""
    import traceback

    from sqlalchemy import text

    report = {}
    try:
        with engine.connect() as connection:
            report["basic_query"] = connection.execute(text("SELECT 1")).scalar()

            result = connection.execute(
                text("SELECT name FROM sqlite_master WHERE type='table'")
            )
            tables = [row[0] for row in result]
            report["tables"] = tables

            if "task" in tables:
                result = connection.execute(text("SELECT COUNT(*) FROM task"))
                report["task_count"] = result.scalar()
    except Exception as e:
        report["connection"] = "failed"
        report["error"] = str(e)
        report["traceback"] = traceback.format_exc()
    return report


def _probe_write(db_dir):
    """Create and delete a scratch file in ``db_dir`` and return a report dict."""
    import os

    report = {}
    try:
        test_path = db_dir / "write_test.txt"
        with open(test_path, "w") as f:
            f.write("Test content")
        report["success"] = True
        report["path"] = str(test_path)
        os.unlink(test_path)  # Clean up
    except Exception as e:
        report["success"] = False
        report["error"] = str(e)
    return report


@app.get("/db-test", tags=["health"])
def test_db_connection():
    """
    Test database connection and table creation.

    Returns a dict with ``"status": "ok"`` plus these sections: ``file_info``
    (path, size and permission checks), ``sqlite_test`` (direct ``sqlite3``
    probe), ``sqlalchemy_test`` (probe through the SQLAlchemy engine),
    ``environment`` (working directory and selected environment variables)
    and ``write_test`` (scratch-file write in the database directory).
    If anything outside the individual probes fails, returns
    ``"status": "error"`` with the message and traceback instead.

    NOTE(review): this endpoint returns environment variables, a sample table
    row and tracebacks -- confirm it is not reachable by untrusted clients.
    """
    # Stdlib imports stay outside the try block so the catch-all handler at
    # the bottom can always reach ``traceback``.
    import datetime
    import os
    import traceback

    try:
        from app.db.session import engine, db_file
        from app.core.config import DB_DIR

        # First check direct file access.
        db_exists = os.path.exists(db_file)
        file_info = {
            "db_dir": str(DB_DIR),
            "db_file": str(db_file),
            "exists": db_exists,
            "size": os.path.getsize(db_file) if db_exists else 0,
            "writable": db_exists and os.access(db_file, os.W_OK),
            "dir_writable": os.path.exists(DB_DIR)
            and os.access(DB_DIR, os.W_OK),
        }

        sqlite_test = _probe_sqlite(db_file)
        sqlalchemy_test = _probe_sqlalchemy(engine)

        env_info = {
            "cwd": os.getcwd(),
            # NOTE(review): variables matching these prefixes may hold
            # secrets (for example database credentials).
            "env_variables": {
                k: v
                for k, v in os.environ.items()
                if k.startswith(("DB_", "SQL", "PATH"))
            },
        }

        write_test = _probe_write(DB_DIR)

        # Same naive-UTC ISO string that datetime.utcnow() produced, without
        # calling the deprecated method.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )

        return {
            "status": "ok",
            "timestamp": timestamp,
            "file_info": file_info,
            "sqlite_test": sqlite_test,
            "sqlalchemy_test": sqlalchemy_test,
            "environment": env_info,
            "write_test": write_test,
        }
    except Exception as e:
        # Catch-all error handler
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc(),
        }