mark-xl1tc0/helpers/generic_helpers.py

import logging
from typing import Dict, Any, List, Optional, Union, Callable, NoReturn
import uuid
import datetime
import traceback
import time
import hashlib
from fastapi import HTTPException
import litellm

# Configure logging
logger = logging.getLogger(__name__)

# In-memory data store as fallback
_generic_store: List[Dict[str, Any]] = []


def generate_unique_id() -> str:
    """
    Generates a unique identifier.

    Returns:
        str: A unique UUID string
    """
    return str(uuid.uuid4())


def get_timestamp() -> str:
    """
    Gets the current timestamp in ISO format.

    Returns:
        str: Current timestamp in ISO format
    """
    return datetime.datetime.now().isoformat()


def safe_json_serialize(obj: Any) -> Any:
    """
    Safely serializes an object to a JSON-compatible structure.

    Args:
        obj (Any): The object to serialize

    Returns:
        Any: A JSON-compatible representation (dict, list, string, or primitive)
    """
    if isinstance(obj, dict):
        return {k: safe_json_serialize(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [safe_json_serialize(item) for item in obj]
    elif isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    elif isinstance(obj, uuid.UUID):
        return str(obj)
    elif hasattr(obj, "__dict__"):
        return safe_json_serialize(obj.__dict__)
    else:
        return obj


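# Example (illustrative sketch; ``Order`` is a hypothetical class whose instances
# expose a plain attribute ``__dict__``):
#
#     safe_json_serialize({"when": datetime.datetime(2024, 1, 1), "order": Order()})
#     # -> {"when": "2024-01-01T00:00:00", "order": {...Order's attributes, serialized...}}

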
def log_error(error_message: str, exception: Optional[Exception] = None) -> None:
    """
    Logs an error with optional exception details.

    Args:
        error_message (str): The error message to log
        exception (Optional[Exception]): The exception that occurred, if any
    """
    if exception:
        logger.error(f"{error_message}: {str(exception)}")
        logger.debug(traceback.format_exc())
    else:
        logger.error(error_message)


def validate_data(data: Dict[str, Any], required_fields: List[str]) -> bool:
    """
    Validates that a dictionary contains all required fields.

    Args:
        data (Dict[str, Any]): The data to validate
        required_fields (List[str]): List of required field names

    Returns:
        bool: True if valid, False otherwise
    """
    if not isinstance(data, dict):
        return False
    for field in required_fields:
        if field not in data or data[field] is None:
            return False
    return True


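# Example (illustrative sketch; a field that is present but set to None still fails validation):
#
#     validate_data({"name": "a", "email": None}, ["name", "email"])      # -> False
#     validate_data({"name": "a", "email": "a@b.co"}, ["name", "email"])  # -> True

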
def create_generic_item(item_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates a new generic item in the in-memory store.

    Args:
        item_data (Dict[str, Any]): The item data

    Returns:
        Dict[str, Any]: The created item with ID and timestamps
    """
    if not item_data:
        raise ValueError("Item data cannot be empty")
    new_item = item_data.copy()
    new_item["id"] = generate_unique_id()
    new_item["created_at"] = get_timestamp()
    new_item["updated_at"] = new_item["created_at"]
    _generic_store.append(new_item)
    return new_item


def get_generic_item_by_id(item_id: str) -> Optional[Dict[str, Any]]:
    """
    Retrieves a generic item by its ID from the in-memory store.

    Args:
        item_id (str): The ID of the item to retrieve

    Returns:
        Optional[Dict[str, Any]]: The item if found, otherwise None
    """
    for item in _generic_store:
        if item.get("id") == item_id:
            return item
    return None


def update_generic_item(item_id: str, update_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Updates a generic item in the in-memory store.

    Args:
        item_id (str): The ID of the item to update
        update_data (Dict[str, Any]): The data to update

    Returns:
        Optional[Dict[str, Any]]: The updated item if found, otherwise None
    """
    for i, item in enumerate(_generic_store):
        if item.get("id") == item_id:
            updated_item = {**item, **update_data, "updated_at": get_timestamp()}
            _generic_store[i] = updated_item
            return updated_item
    return None


def delete_generic_item(item_id: str) -> bool:
    """
    Deletes a generic item from the in-memory store.

    Args:
        item_id (str): The ID of the item to delete

    Returns:
        bool: True if the item was deleted, False otherwise
    """
    for i, item in enumerate(_generic_store):
        if item.get("id") == item_id:
            _generic_store.pop(i)
            return True
    return False


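# Example of the in-memory CRUD cycle (illustrative sketch; the item fields shown
# here are hypothetical):
#
#     item = create_generic_item({"name": "sample", "value": 42})
#     fetched = get_generic_item_by_id(item["id"])     # -> the stored item
#     update_generic_item(item["id"], {"value": 43})   # also refreshes "updated_at"
#     delete_generic_item(item["id"])                  # -> True

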
def filter_generic_items(filter_func: Callable[[Dict[str, Any]], bool]) -> List[Dict[str, Any]]:
    """
    Filters generic items based on a filter function.

    Args:
        filter_func (Callable): A function that takes an item and returns True if it should be included

    Returns:
        List[Dict[str, Any]]: List of filtered items
    """
    return [item for item in _generic_store if filter_func(item)]


def batch_process(items: List[Any], process_func: Callable[[Any], Any]) -> List[Any]:
    """
    Processes a batch of items using the provided function.

    Args:
        items (List[Any]): List of items to process
        process_func (Callable): Function to apply to each item

    Returns:
        List[Any]: List of processed items
    """
    results = []
    errors = []
    for item in items:
        try:
            result = process_func(item)
            results.append(result)
        except Exception as e:
            errors.append((item, str(e)))
            log_error(f"Error processing item {item}", e)
    if errors:
        logger.warning(f"Batch processing completed with {len(errors)} errors")
    return results


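# Example (illustrative sketch; failures are logged and skipped, so the result list
# may be shorter than the input):
#
#     batch_process(["1", "2", "oops"], int)  # -> [1, 2]; "oops" raises ValueError and is logged

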
def hash_data(data: Union[str, bytes]) -> str:
    """
    Creates a SHA-256 hash of the provided data.

    Args:
        data (Union[str, bytes]): Data to hash

    Returns:
        str: Hexadecimal hash string
    """
    if isinstance(data, str):
        data = data.encode('utf-8')
    return hashlib.sha256(data).hexdigest()


def retry_operation(operation: Callable, max_attempts: int = 3, delay: float = 1.0) -> Any:
    """
    Retries an operation multiple times before giving up.

    Args:
        operation (Callable): The operation to retry
        max_attempts (int): Maximum number of attempts
        delay (float): Base delay between attempts in seconds; the wait grows linearly with each retry

    Returns:
        Any: Result of the operation if successful

    Raises:
        Exception: The last exception encountered if all attempts fail
    """
    last_exception = None
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as e:
            last_exception = e
            log_error(f"Operation failed (attempt {attempt+1}/{max_attempts})", e)
            if attempt < max_attempts - 1:
                time.sleep(delay * (attempt + 1))  # Linear backoff: wait longer after each failed attempt
    if last_exception:
        raise last_exception
    raise RuntimeError("Operation failed for unknown reasons")


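# Example (illustrative sketch; ``fetch_remote_config`` and ``url`` are hypothetical):
#
#     config = retry_operation(lambda: fetch_remote_config(url), max_attempts=5, delay=0.5)

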
def paginate_results(items: List[Any], page: int = 1, page_size: int = 10) -> Dict[str, Any]:
    """
    Paginates a list of items.

    Args:
        items (List[Any]): The items to paginate
        page (int): The page number (1-indexed)
        page_size (int): The number of items per page

    Returns:
        Dict[str, Any]: Pagination result with items and metadata
    """
    if page < 1:
        page = 1
    if page_size < 1:
        page_size = 10
    total_items = len(items)
    total_pages = (total_items + page_size - 1) // page_size
    start_idx = (page - 1) * page_size
    end_idx = min(start_idx + page_size, total_items)
    return {
        "items": items[start_idx:end_idx],
        "pagination": {
            "page": page,
            "page_size": page_size,
            "total_items": total_items,
            "total_pages": total_pages
        }
    }


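# Example (illustrative sketch):
#
#     result = paginate_results(list(range(25)), page=2, page_size=10)
#     result["items"]                       # -> [10, 11, ..., 19]
#     result["pagination"]["total_pages"]   # -> 3

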
def handle_http_error(status_code: int, detail: str) -> NoReturn:
    """
    Raises an HTTPException with the specified status code and detail.

    Args:
        status_code (int): HTTP status code
        detail (str): Error detail message

    Raises:
        HTTPException: With the specified status code and detail
    """
    raise HTTPException(status_code=status_code, detail=detail)


def process_llm_request(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 1000) -> Dict[str, Any]:
    """
    Processes an LLM request using litellm to handle the actual inference.

    Args:
        model (str): The LLM model to use for inference
        prompt (str): The prompt text to send to the LLM
        temperature (float): Controls randomness in the output (0-1)
        max_tokens (int): Maximum number of tokens to generate

    Returns:
        Dict[str, Any]: The LLM response with content and metadata
    """
    try:
        logger.info(f"Sending request to LLM model: {model}")
        # Make the actual LLM call using litellm
        response = litellm.completion(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens
        )
        # Process and return the response
        result = {
            "id": response.id,
            "content": response.choices[0].message.content,
            "model": response.model,
            "created": response.created,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        }
        logger.info(f"LLM request completed successfully. Used {result['usage']['total_tokens']} tokens.")
        return result
    except Exception as e:
        log_error("Error processing LLM request", e)
        raise HTTPException(
            status_code=500,
            detail=f"LLM processing error: {str(e)}"
        )
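

# Example (illustrative sketch; assumes provider credentials are configured for
# litellm and that "gpt-3.5-turbo" is a model name your provider accepts):
#
#     result = process_llm_request(
#         model="gpt-3.5-turbo",
#         prompt="Summarize the project in one sentence.",
#         temperature=0.2,
#         max_tokens=200,
#     )
#     print(result["content"], result["usage"]["total_tokens"])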