"""HTTP middleware: per-client rate limiting, security headers, request logging."""

import logging
import math
import time
from collections import defaultdict
from typing import Dict

from fastapi import Request, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Rate limiting storage, keyed by client address. Each entry holds:
#   "count"      - requests counted in the current window
#   "reset_time" - time.time() value after which the window is restarted
# The dict lives at module level, so it is shared by every RateLimitMiddleware
# instance in this process and is not shared across processes.
rate_limit_storage: Dict[str, Dict] = defaultdict(lambda: {"count": 0, "reset_time": 0})

# Bucket used when the server does not report a client address.
_UNKNOWN_CLIENT = "unknown"


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Reject requests from a client that exceeds ``calls`` per ``period`` seconds.

    Counting is fixed-window: the first request after a window has expired
    starts a new window lasting ``period`` seconds.
    """

    def __init__(self, app, calls: int = 100, period: int = 60):
        """Store the limit configuration.

        Args:
            app: The ASGI application being wrapped.
            calls: Maximum number of requests allowed per window.
            period: Window length in seconds.
        """
        super().__init__(app)
        self.calls = calls
        self.period = period
        # Earliest time at which the next full sweep of expired entries runs.
        self._next_purge = 0.0

    def _purge_expired(self, now: float) -> None:
        """Drop entries whose window has ended, at most once per ``period``.

        Without this sweep, entries for clients that never come back would
        stay in ``rate_limit_storage`` forever. Removing an expired entry is
        safe: the next request from that client recreates it with a zero count.
        """
        if now < self._next_purge:
            return
        expired = [
            key
            for key, entry in rate_limit_storage.items()
            if now > entry["reset_time"]
        ]
        for key in expired:
            del rate_limit_storage[key]
        self._next_purge = now + self.period

    async def dispatch(self, request: Request, call_next):
        """Count the request against its client and forward it if under the limit.

        Returns:
            The downstream response, or a 429 JSON response (with a
            ``Retry-After`` header) when the client has used up its window.
        """
        # request.client can be None (e.g. some test clients or transports
        # that do not expose a peer address); fall back to a shared bucket
        # instead of raising AttributeError.
        client = request.client
        client_ip = client.host if client is not None else _UNKNOWN_CLIENT
        current_time = time.time()

        self._purge_expired(current_time)

        # Start a fresh window when the previous one has ended (a brand-new
        # entry has reset_time 0, so it always takes this branch).
        entry = rate_limit_storage[client_ip]
        if current_time > entry["reset_time"]:
            entry = {"count": 0, "reset_time": current_time + self.period}
            rate_limit_storage[client_ip] = entry

        # Check rate limit
        if entry["count"] >= self.calls:
            # Whole seconds until the window ends, never less than 1.
            retry_after = max(1, math.ceil(entry["reset_time"] - current_time))
            return JSONResponse(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                content={"detail": "Rate limit exceeded"},
                headers={"Retry-After": str(retry_after)},
            )

        # Increment counter
        entry["count"] += 1

        return await call_next(request)


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Set a fixed group of security-related headers on every response."""

    # Header name -> value written to each response (overwrites any value the
    # downstream application set for the same header).
    SECURITY_HEADERS = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        "X-XSS-Protection": "1; mode=block",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "Referrer-Policy": "strict-origin-when-cross-origin",
    }

    async def dispatch(self, request: Request, call_next):
        """Forward the request, then add the security headers to the response."""
        response = await call_next(request)

        for name, value in self.SECURITY_HEADERS.items():
            response.headers[name] = value

        return response


class LoggingMiddleware(BaseHTTPMiddleware):
    """Log each request line and the status code and duration of its response."""

    async def dispatch(self, request: Request, call_next):
        """Log the request, forward it, then log status and elapsed seconds."""
        # perf_counter is monotonic, so the measured duration is not affected
        # by wall-clock adjustments while the request is in flight.
        start_time = time.perf_counter()

        # Log request (lazy %-args: the message is only built if emitted).
        logger.info("Request: %s %s", request.method, request.url)

        response = await call_next(request)

        # Log response
        process_time = time.perf_counter() - start_time
        logger.info("Response: %s - %.4fs", response.status_code, process_time)

        return response