From 4d00fd54736fb16c3f2e5d3ea700a4d14a90bf25 Mon Sep 17 00:00:00 2001 From: Automated Action Date: Fri, 16 May 2025 22:46:12 +0000 Subject: [PATCH] Enhance FastAPI app for Kubernetes resilience and pod host assignment issues --- app/api/endpoints/health.py | 63 +++++++++++++++++-- app/core/orchestration.py | 118 ++++++++++++++++++++++++++++++++++++ main.py | 68 +++++++++++++++++++++ 3 files changed, 245 insertions(+), 4 deletions(-) create mode 100644 app/core/orchestration.py diff --git a/app/api/endpoints/health.py b/app/api/endpoints/health.py index 659b831..f1ca45c 100644 --- a/app/api/endpoints/health.py +++ b/app/api/endpoints/health.py @@ -1,15 +1,70 @@ -from fastapi import APIRouter, Depends +from typing import Any, Dict + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session +from app.core.orchestration import get_orchestration_status from app.db.session import get_db router = APIRouter() @router.get("") -def health_check(db: Session = Depends(get_db)): +def health_check( + db: Session = Depends(get_db), + detailed: bool = Query(False, description="Include detailed diagnostics information") +) -> Dict[str, Any]: """ Health check endpoint for the API + + This endpoint checks the health of various components of the application: + - Database connectivity + - Host and pod status + - Application readiness + + Args: + detailed: Whether to include detailed diagnostics information + + Returns: + Dict[str, Any]: Health status of the application """ - # Simple health check that ensures the database connection is working - return {"status": "healthy", "database": "connected"} \ No newline at end of file + # Basic health response + response = { + "status": "healthy" + } + + # Check database connectivity + try: + # Simple query to verify database connectivity + db.execute("SELECT 1") + database_status = "connected" + database_error = None + except SQLAlchemyError as e: + database_status = "error" + database_error = str(e) + except Exception as e: + database_status = "error" + database_error = str(e) + + response["database"] = { + "status": database_status + } + + # Add orchestration status if detailed information is requested + if detailed: + response["orchestration"] = get_orchestration_status() + + # Add more detailed database information if there was an error + if database_error: + response["database"]["error"] = database_error + elif database_status != "connected": + # Always include error information in the basic response + response["status"] = "unhealthy" + response["database"]["error"] = database_error + + # Set overall status based on component statuses + if database_status != "connected": + response["status"] = "unhealthy" + + return response \ No newline at end of file diff --git a/app/core/orchestration.py b/app/core/orchestration.py new file mode 100644 index 0000000..31539b6 --- /dev/null +++ b/app/core/orchestration.py @@ -0,0 +1,118 @@ +""" +Utilities to handle container orchestration environments and improve +resilience in Kubernetes-like environments. +""" + +import logging +import os +import socket +import time +from typing import Dict, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Maximum attempts to resolve host +MAX_HOST_RESOLVE_ATTEMPTS = 5 +# Delay between attempts in seconds +HOST_RESOLVE_DELAY = 2 + + +def get_host_info() -> Dict[str, str]: + """ + Get information about the host where the application is running. + + This helps diagnose container and pod issues in orchestration environments. + + Returns: + Dict[str, str]: Dictionary with host information + """ + info = { + "hostname": "unknown", + "ip_address": "unknown", + "pod_name": os.environ.get("HOSTNAME", "unknown"), + "namespace": os.environ.get("POD_NAMESPACE", "unknown"), + } + + try: + info["hostname"] = socket.gethostname() + info["ip_address"] = socket.gethostbyname(info["hostname"]) + except Exception as e: + logger.warning(f"Could not resolve host information: {str(e)}") + + return info + + +def check_host_connectivity(timeout: int = 5) -> Tuple[bool, Optional[str]]: + """ + Check if the host has network connectivity. + + Args: + timeout: Timeout in seconds + + Returns: + Tuple[bool, Optional[str]]: (Success, Error message if any) + """ + try: + # Try to resolve a common external domain + socket.getaddrinfo("google.com", 80, proto=socket.IPPROTO_TCP) + return True, None + except socket.gaierror as e: + return False, f"DNS resolution error: {str(e)}" + except socket.timeout: + return False, "Connection timed out" + except Exception as e: + return False, f"Unknown connection error: {str(e)}" + + +def wait_for_host_assignment(max_attempts: int = MAX_HOST_RESOLVE_ATTEMPTS, + delay: int = HOST_RESOLVE_DELAY) -> Tuple[bool, Optional[str]]: + """ + Wait for the host to be assigned in container orchestration environments. + + This helps to handle situations where the pod is scheduled but the host + assignment is delayed. + + Args: + max_attempts: Maximum number of attempts to resolve the host + delay: Delay between attempts in seconds + + Returns: + Tuple[bool, Optional[str]]: (Success, Error message if any) + """ + attempt = 0 + + while attempt < max_attempts: + try: + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) + logger.info(f"Host assigned: {hostname} ({ip_address})") + return True, None + except socket.gaierror as e: + logger.warning(f"Host not resolved yet (attempt {attempt+1}/{max_attempts}): {str(e)}") + attempt += 1 + time.sleep(delay) + + return False, "Maximum attempts reached, host still not assigned" + + +def get_orchestration_status() -> Dict[str, any]: + """ + Get comprehensive status information about the container orchestration environment. + + Returns: + Dict: Orchestration status information + """ + host_info = get_host_info() + connectivity_status, connectivity_error = check_host_connectivity() + startup_error = os.environ.get("APP_STARTUP_ERROR", None) + + return { + "host": host_info, + "ready": os.environ.get("APP_READY", "false") == "true", + "connectivity": { + "status": "connected" if connectivity_status else "disconnected", + "error": connectivity_error + }, + "startup_error": startup_error, + "environment": "kubernetes" if "KUBERNETES_SERVICE_HOST" in os.environ else "unknown" + } \ No newline at end of file diff --git a/main.py b/main.py index 6277f13..be265cc 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,9 @@ +import logging +import os +import socket +import sys +from contextlib import asynccontextmanager + import uvicorn from fastapi import FastAPI from starlette.middleware.cors import CORSMiddleware @@ -5,13 +11,75 @@ from starlette.middleware.cors import CORSMiddleware from app.api.api import api_router from app.core.config import settings +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] +) +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """ + Lifespan events for the FastAPI application + Handles startup and shutdown gracefully + """ + # Startup: Check for host availability and other system requirements + try: + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) + logger.info(f"Starting application on host: {hostname}, IP: {ip_address}") + + # Check required directories exist + from pathlib import Path + db_dir = Path("/app") / "storage" / "db" + db_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"Database directory confirmed at: {db_dir}") + + # Set environment variable to indicate we're ready + os.environ["APP_READY"] = "true" + logger.info("Application startup completed successfully") + except Exception as e: + logger.error(f"Application startup error: {str(e)}") + # Critical errors during startup should be propagated, but still allow the app to run + os.environ["APP_READY"] = "false" + os.environ["APP_STARTUP_ERROR"] = str(e) + + yield # This is where the app runs + + # Shutdown: Perform cleanup operations + try: + logger.info("Application shutting down gracefully") + # Any cleanup operations would go here + except Exception as e: + logger.error(f"Error during application shutdown: {str(e)}") + + app = FastAPI( title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json", description="Generic REST API Service with FastAPI and SQLite", version="0.1.0", + lifespan=lifespan, ) +# Add middleware for exception handling +@app.middleware("http") +async def add_process_time_header(request, call_next): + try: + response = await call_next(request) + return response + except Exception as e: + logger.error(f"Request error: {str(e)}") + # Return error to the client + from fastapi.responses import JSONResponse + return JSONResponse( + status_code=500, + content={"detail": "Internal server error", "type": "server_error"} + ) + # Set all CORS enabled origins if settings.BACKEND_CORS_ORIGINS: app.add_middleware(