
- Created FastAPI application with SQLite database
- Implemented monitor management endpoints (CRUD operations)
- Added uptime checking functionality with response time tracking
- Included statistics endpoints for uptime percentage and metrics
- Set up database models and Alembic migrations
- Added comprehensive API documentation
- Configured CORS and health check endpoints
78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
import time
|
|
import requests
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from app.models.monitor import Monitor, UptimeCheck
|
|
|
|
|
|
class UptimeChecker:
    """Run HTTP uptime checks for monitors and persist each result.

    Each check issues one HTTP request via ``requests`` and stores the
    outcome as an ``UptimeCheck`` row through the supplied session.
    """

    def __init__(self, db: Session):
        """Keep the session used to query monitors and save check results."""
        self.db = db

    def check_monitor(self, monitor: Monitor) -> Dict[str, Any]:
        """Probe a single monitor once and record the outcome.

        The request uses the monitor's ``method``, ``url`` and ``timeout``
        and follows redirects. A final status code in the 2xx/3xx range
        counts as "up"; any other status or any exception raised while
        making the request counts as "down" and is described in
        ``error_message``.

        Args:
            monitor: The monitor to probe; its ``id``, ``method``, ``url``
                and ``timeout`` attributes are read.

        Returns:
            A dict with the keys ``monitor_id``, ``is_up``, ``status_code``,
            ``response_time`` (milliseconds, ``None`` when no response was
            received), ``error_message`` and ``checked_at``.

        Raises:
            Exception: Whatever the session raises when the commit fails.
                The session is rolled back before the error propagates.
        """
        is_up = False
        status_code = None
        error_message = None
        response_time = None

        # perf_counter() is a monotonic clock intended for measuring
        # intervals; time.time() can jump if the system clock is adjusted
        # mid-request and produce negative or inflated latencies.
        started = time.perf_counter()

        try:
            response = requests.request(
                method=monitor.method,
                url=str(monitor.url),
                timeout=monitor.timeout,
                allow_redirects=True,
            )

            response_time = (time.perf_counter() - started) * 1000  # Convert to milliseconds
            status_code = response.status_code

            # Consider 2xx and 3xx status codes as "up"
            is_up = 200 <= status_code < 400

            if not is_up:
                error_message = f"HTTP {status_code}: {response.reason}"

        except requests.exceptions.Timeout:
            error_message = f"Request timed out after {monitor.timeout} seconds"
        except requests.exceptions.ConnectionError:
            error_message = "Connection error - unable to reach the endpoint"
        except requests.exceptions.RequestException as e:
            error_message = f"Request error: {str(e)}"
        except Exception as e:
            # Deliberate catch-all: a failing probe must still be recorded
            # as a "down" check instead of aborting the run.
            error_message = f"Unexpected error: {str(e)}"

        # Save the check result to database
        uptime_check = UptimeCheck(
            monitor_id=monitor.id,
            status_code=status_code,
            response_time=response_time,
            is_up=is_up,
            error_message=error_message,
            checked_at=datetime.utcnow(),  # naive datetime expressed in UTC
        )

        self.db.add(uptime_check)
        try:
            self.db.commit()
        except Exception:
            # The same session is reused for every monitor, so clear the
            # failed transaction before letting the error propagate.
            self.db.rollback()
            raise

        return {
            "monitor_id": monitor.id,
            "is_up": is_up,
            "status_code": status_code,
            "response_time": response_time,
            "error_message": error_message,
            "checked_at": uptime_check.checked_at,
        }

    def check_all_active_monitors(self) -> list:
        """Check every monitor whose ``is_active`` flag is set.

        Monitors are probed one after another in the order the query
        returns them.

        Returns:
            A list holding the result dict of ``check_monitor`` for each
            active monitor; empty when there are none.
        """
        active_monitors = self.db.query(Monitor).filter(Monitor.is_active).all()
        return [self.check_monitor(monitor) for monitor in active_monitors]
|