Add helper functions for HealthCheck
This commit is contained in:
parent
624a567bb2
commit
9da1b7967f
@ -1,48 +1,48 @@
|
||||
from fastapi import APIRouter, status
|
||||
from datetime import datetime
|
||||
from typing import Dict
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/api/health", status_code=status.HTTP_200_OK)
|
||||
def health_check():
|
||||
def health_check() -> Dict[str, str]:
|
||||
"""
|
||||
Health check endpoint to verify service status.
|
||||
Health check function to check if the service is running.
|
||||
|
||||
Returns:
|
||||
dict: JSON response indicating service is running.
|
||||
A dictionary with a 'status' key indicating the service status.
|
||||
"""
|
||||
response = {
|
||||
"status": "ok",
|
||||
"message": "Service is running",
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
return response
|
||||
return {"status": "OK"}
|
||||
|
||||
def is_service_running() -> bool:
|
||||
def is_service_healthy(response: Dict[str, str]) -> bool:
|
||||
"""
|
||||
Check if the service is running.
|
||||
Check if the service is healthy based on the health check response.
|
||||
|
||||
Args:
|
||||
response: The response dictionary from the health check.
|
||||
|
||||
Returns:
|
||||
bool: True if the service is running, False otherwise.
|
||||
True if the service is healthy, False otherwise.
|
||||
"""
|
||||
# Add any necessary checks or logic here
|
||||
return True
|
||||
return response.get("status") == "OK"
|
||||
|
||||
def get_service_uptime() -> float:
|
||||
def get_service_status() -> str:
|
||||
"""
|
||||
Get the service uptime in seconds.
|
||||
Get the service status as a string.
|
||||
|
||||
Returns:
|
||||
float: Service uptime in seconds.
|
||||
A string representing the service status.
|
||||
"""
|
||||
# Add logic to calculate service uptime
|
||||
return 3600.0 # Example: 1 hour uptime
|
||||
response = health_check()
|
||||
if is_service_healthy(response):
|
||||
return "Service is running"
|
||||
else:
|
||||
return "Service is not running"
|
||||
|
||||
def get_service_version() -> str:
|
||||
def format_health_check_response(response: Dict[str, str]) -> str:
|
||||
"""
|
||||
Get the current service version.
|
||||
Format the health check response as a string.
|
||||
|
||||
Args:
|
||||
response: The response dictionary from the health check.
|
||||
|
||||
Returns:
|
||||
str: Service version string.
|
||||
A formatted string representing the health check response.
|
||||
"""
|
||||
return "1.0.0" # Example version
|
||||
status = response.get("status", "Unknown")
|
||||
return f"Service status: {status}"
|
Loading…
x
Reference in New Issue
Block a user