""" Utilities to handle container orchestration environments and improve resilience in Kubernetes-like environments. """ import logging import os import socket import time from typing import Dict, Optional, Tuple logger = logging.getLogger(__name__) # Maximum attempts to resolve host MAX_HOST_RESOLVE_ATTEMPTS = 5 # Delay between attempts in seconds HOST_RESOLVE_DELAY = 2 def get_host_info() -> Dict[str, str]: """ Get information about the host where the application is running. This helps diagnose container and pod issues in orchestration environments. Returns: Dict[str, str]: Dictionary with host information """ info = { "hostname": "unknown", "ip_address": "unknown", "pod_name": os.environ.get("HOSTNAME", "unknown"), "namespace": os.environ.get("POD_NAMESPACE", "unknown"), } try: info["hostname"] = socket.gethostname() info["ip_address"] = socket.gethostbyname(info["hostname"]) except Exception as e: logger.warning(f"Could not resolve host information: {str(e)}") return info def check_host_connectivity(timeout: int = 5) -> Tuple[bool, Optional[str]]: """ Check if the host has network connectivity. Args: timeout: Timeout in seconds Returns: Tuple[bool, Optional[str]]: (Success, Error message if any) """ try: # Try to resolve a common external domain socket.getaddrinfo("google.com", 80, proto=socket.IPPROTO_TCP) return True, None except socket.gaierror as e: return False, f"DNS resolution error: {str(e)}" except socket.timeout: return False, "Connection timed out" except Exception as e: return False, f"Unknown connection error: {str(e)}" def wait_for_host_assignment(max_attempts: int = MAX_HOST_RESOLVE_ATTEMPTS, delay: int = HOST_RESOLVE_DELAY) -> Tuple[bool, Optional[str]]: """ Wait for the host to be assigned in container orchestration environments. This helps to handle situations where the pod is scheduled but the host assignment is delayed. Args: max_attempts: Maximum number of attempts to resolve the host delay: Delay between attempts in seconds Returns: Tuple[bool, Optional[str]]: (Success, Error message if any) """ attempt = 0 while attempt < max_attempts: try: hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) logger.info(f"Host assigned: {hostname} ({ip_address})") return True, None except socket.gaierror as e: logger.warning(f"Host not resolved yet (attempt {attempt+1}/{max_attempts}): {str(e)}") attempt += 1 time.sleep(delay) return False, "Maximum attempts reached, host still not assigned" def get_orchestration_status() -> Dict[str, any]: """ Get comprehensive status information about the container orchestration environment. Returns: Dict: Orchestration status information """ host_info = get_host_info() connectivity_status, connectivity_error = check_host_connectivity() startup_error = os.environ.get("APP_STARTUP_ERROR", None) return { "host": host_info, "ready": os.environ.get("APP_READY", "false") == "true", "connectivity": { "status": "connected" if connectivity_status else "disconnected", "error": connectivity_error }, "startup_error": startup_error, "environment": "kubernetes" if "KUBERNETES_SERVICE_HOST" in os.environ else "unknown" }