import hashlib import json import pickle from typing import Any, Optional, Union from functools import wraps from cachetools import TTLCache import asyncio import time class InMemoryCache: """High-performance in-memory cache for FastAPI""" def __init__(self, maxsize: int = 1000, ttl: int = 300): self.cache = TTLCache(maxsize=maxsize, ttl=ttl) self._stats = {"hits": 0, "misses": 0, "sets": 0} def _make_key(self, key: Union[str, dict, list]) -> str: """Create a consistent cache key from various input types""" if isinstance(key, str): return key elif isinstance(key, (dict, list)): # Create deterministic hash for complex objects key_str = json.dumps(key, sort_keys=True, default=str) return hashlib.md5(key_str.encode()).hexdigest() else: return str(key) def get(self, key: Union[str, dict, list]) -> Optional[Any]: """Get value from cache""" cache_key = self._make_key(key) try: value = self.cache[cache_key] self._stats["hits"] += 1 return value except KeyError: self._stats["misses"] += 1 return None def set(self, key: Union[str, dict, list], value: Any, ttl: Optional[int] = None) -> None: """Set value in cache""" cache_key = self._make_key(key) if ttl: # For custom TTL, we'd need a different approach # For now, use default TTL pass self.cache[cache_key] = value self._stats["sets"] += 1 def delete(self, key: Union[str, dict, list]) -> bool: """Delete value from cache""" cache_key = self._make_key(key) try: del self.cache[cache_key] return True except KeyError: return False def clear(self) -> None: """Clear all cache""" self.cache.clear() def get_stats(self) -> dict: """Get cache statistics""" total_requests = self._stats["hits"] + self._stats["misses"] hit_rate = (self._stats["hits"] / total_requests * 100) if total_requests > 0 else 0 return { "hits": self._stats["hits"], "misses": self._stats["misses"], "sets": self._stats["sets"], "hit_rate": round(hit_rate, 2), "cache_size": len(self.cache), "max_size": self.cache.maxsize } # Global cache instances user_cache = InMemoryCache(maxsize=500, ttl=300) # 5 minutes job_cache = InMemoryCache(maxsize=1000, ttl=600) # 10 minutes resume_cache = InMemoryCache(maxsize=500, ttl=300) # 5 minutes match_cache = InMemoryCache(maxsize=2000, ttl=1800) # 30 minutes ai_cache = InMemoryCache(maxsize=500, ttl=3600) # 1 hour for AI results def cache_response(cache_instance: InMemoryCache, ttl: int = 300): """Decorator to cache function responses""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): # Create cache key from function name and arguments cache_key = { "func": func.__name__, "args": args, "kwargs": kwargs } # Try to get from cache cached_result = cache_instance.get(cache_key) if cached_result is not None: return cached_result # Execute function and cache result if asyncio.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs) cache_instance.set(cache_key, result, ttl) return result @wraps(func) def sync_wrapper(*args, **kwargs): # Create cache key from function name and arguments cache_key = { "func": func.__name__, "args": args, "kwargs": kwargs } # Try to get from cache cached_result = cache_instance.get(cache_key) if cached_result is not None: return cached_result # Execute function and cache result result = func(*args, **kwargs) cache_instance.set(cache_key, result, ttl) return result if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator def invalidate_user_cache(user_id: int): """Invalidate all cache entries for a specific user""" # This is a simple implementation - in production you might want # more sophisticated cache invalidation pass def get_all_cache_stats() -> dict: """Get statistics for all cache instances""" return { "user_cache": user_cache.get_stats(), "job_cache": job_cache.get_stats(), "resume_cache": resume_cache.get_stats(), "match_cache": match_cache.get_stats(), "ai_cache": ai_cache.get_stats() }