290 lines
8.3 KiB
Python
290 lines
8.3 KiB
Python
import logging
|
|
from typing import Dict, Any, List, Optional, Union, Callable
|
|
import uuid
|
|
import datetime
|
|
import traceback
|
|
import time
|
|
import hashlib
|
|
from fastapi import HTTPException
|
|
|
|
# No entity-specific models or schemas are available for this module, so it
# provides generic utility helpers that do not rely on database access.
|
|
|
|
# In-memory data store as fallback
|
|
# Module-level list shared by the *_generic_item helpers in this file; each entry is a dict.
_generic_store: List[Dict[str, Any]] = []
|
|
|
|
# Configure logging
|
|
# Module-level logger named after this module (via __name__).
logger = logging.getLogger(__name__)
|
|
|
|
def generate_unique_id() -> str:
    """
    Produces a fresh random identifier.

    Returns:
        str: The string form of a newly generated version-4 UUID
    """
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
|
|
|
|
def get_timestamp() -> str:
    """
    Returns the current date and time as an ISO 8601 string.

    The value comes from ``datetime.datetime.now()`` called without a
    timezone, so the result carries no UTC offset.

    Returns:
        str: Current timestamp rendered with ``isoformat()``
    """
    current_moment = datetime.datetime.now()
    return current_moment.isoformat()
|
|
|
|
def safe_json_serialize(obj: Any) -> Any:
    """
    Recursively converts an object into JSON-compatible values.

    Conversion rules, applied in order:
      * dict: values are serialized recursively (keys are kept unchanged)
      * list, tuple, set, frozenset: become a list of serialized elements
      * datetime.datetime, datetime.date, datetime.time: ISO 8601 string
      * uuid.UUID: its string form
      * any object exposing ``__dict__``: its attribute dict, serialized
      * anything else: returned unchanged

    Args:
        obj (Any): The object to serialize

    Returns:
        Any: A dict when ``obj`` is a dict or an object with ``__dict__``,
        a list for sequences/sets, a string for dates and UUIDs, otherwise
        ``obj`` itself
    """
    if isinstance(obj, dict):
        return {key: safe_json_serialize(value) for key, value in obj.items()}
    # Tuples and sets are walked too, so nested dates/UUIDs inside them are
    # not skipped; JSON only has arrays, hence the list result.
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [safe_json_serialize(element) for element in obj]
    if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if hasattr(obj, "__dict__"):
        return safe_json_serialize(obj.__dict__)
    return obj
|
|
|
|
def log_error(error_message: str, exception: Optional[Exception] = None) -> None:
    """
    Logs an error with optional exception details.

    Without an exception, only ``error_message`` is logged at ERROR level.
    With an exception, the message and the exception text are logged at
    ERROR level and the exception's traceback is logged at DEBUG level.

    Args:
        error_message (str): The error message to log
        exception (Optional[Exception]): The exception that occurred, if any
    """
    if exception is None:
        logger.error(error_message)
        return

    logger.error(f"{error_message}: {str(exception)}")
    # Format the traceback of the exception that was passed in, rather than
    # whatever exception happens to be currently handled, so the output is
    # correct even when this is called outside an ``except`` block.
    trace_text = "".join(
        traceback.format_exception(type(exception), exception, exception.__traceback__)
    )
    logger.debug(trace_text)
|
|
|
|
def validate_data(data: Dict[str, Any], required_fields: List[str]) -> bool:
    """
    Checks that a dictionary provides a non-None value for every required field.

    Args:
        data (Dict[str, Any]): The data to validate
        required_fields (List[str]): Names that must be present in ``data``

    Returns:
        bool: False when ``data`` is not a dict, or when any required field
        is missing or set to None; True otherwise
    """
    return isinstance(data, dict) and all(
        data.get(name) is not None for name in required_fields
    )
|
|
|
|
def create_generic_item(item_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Adds a new item to the in-memory store.

    A shallow copy of ``item_data`` is stamped with a generated ``id`` and
    with ``created_at`` / ``updated_at`` set to the same timestamp, then
    appended to the store. Existing keys with those names are overwritten.

    Args:
        item_data (Dict[str, Any]): The item data

    Returns:
        Dict[str, Any]: The stored item (the same object kept in the store)

    Raises:
        ValueError: If ``item_data`` is empty or otherwise falsy
    """
    if not item_data:
        raise ValueError("Item data cannot be empty")

    record = item_data.copy()
    identifier = generate_unique_id()
    stamp = get_timestamp()
    record.update({"id": identifier, "created_at": stamp, "updated_at": stamp})

    _generic_store.append(record)
    return record
|
|
|
|
def get_generic_item_by_id(item_id: str) -> Optional[Dict[str, Any]]:
    """
    Looks up an item in the in-memory store by its ID.

    Args:
        item_id (str): The ID of the item to retrieve

    Returns:
        Optional[Dict[str, Any]]: The first stored item whose ``"id"`` equals
        ``item_id``, or None when no item matches
    """
    matches = (entry for entry in _generic_store if entry.get("id") == item_id)
    return next(matches, None)
|
|
|
|
def update_generic_item(item_id: str, update_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Updates a generic item in the in-memory store.

    The stored item is replaced by a new dict that merges the existing
    fields with ``update_data`` and refreshes ``updated_at``. The item's
    existing ``id`` and ``created_at`` are preserved even if ``update_data``
    contains those keys, so an update cannot change an item's identity or
    creation time.

    Args:
        item_id (str): The ID of the item to update
        update_data (Dict[str, Any]): The data to update; None or an empty
            dict only refreshes ``updated_at``

    Returns:
        Optional[Dict[str, Any]]: The updated item if found, otherwise None
    """
    for position, existing in enumerate(_generic_store):
        if existing.get("id") != item_id:
            continue

        merged = {**existing, **(update_data or {}), "updated_at": get_timestamp()}
        # Restore the fields that identify the record so the payload cannot
        # clobber them.
        for protected in ("id", "created_at"):
            if protected in existing:
                merged[protected] = existing[protected]

        _generic_store[position] = merged
        return merged

    return None
|
|
|
|
def delete_generic_item(item_id: str) -> bool:
    """
    Removes an item from the in-memory store.

    Only the first item whose ``"id"`` equals ``item_id`` is removed.

    Args:
        item_id (str): The ID of the item to delete

    Returns:
        bool: True if an item was removed, False if no item matched
    """
    match_index = next(
        (idx for idx, entry in enumerate(_generic_store) if entry.get("id") == item_id),
        None,
    )
    if match_index is None:
        return False

    del _generic_store[match_index]
    return True
|
|
|
|
def filter_generic_items(filter_func: Callable[[Dict[str, Any]], bool]) -> List[Dict[str, Any]]:
    """
    Selects the stored items accepted by a predicate.

    Args:
        filter_func (Callable): Called once per stored item; the item is kept
            when the call returns a truthy value

    Returns:
        List[Dict[str, Any]]: A new list of the accepted items, in store order
    """
    selected = []
    for entry in _generic_store:
        if filter_func(entry):
            selected.append(entry)
    return selected
|
|
|
|
def batch_process(items: List[Any], process_func: Callable[[Any], Any]) -> List[Any]:
    """
    Applies a function to every item, skipping the ones that fail.

    An item whose processing raises is logged through ``log_error`` and left
    out of the output, so the returned list can be shorter than ``items``.
    A warning with the failure count is logged when at least one item failed.

    Args:
        items (List[Any]): List of items to process
        process_func (Callable): Function to apply to each item

    Returns:
        List[Any]: Results of the successfully processed items, in input order
    """
    processed = []
    failures = []

    for entry in items:
        try:
            outcome = process_func(entry)
        except Exception as exc:
            failures.append((entry, str(exc)))
            log_error(f"Error processing item {entry}", exc)
        else:
            processed.append(outcome)

    if len(failures) > 0:
        logger.warning(f"Batch processing completed with {len(failures)} errors")

    return processed
|
|
|
|
def hash_data(data: Union[str, bytes]) -> str:
    """
    Computes the SHA-256 digest of the given data.

    Args:
        data (Union[str, bytes]): Data to hash; strings are encoded as UTF-8
            before hashing

    Returns:
        str: The digest as a hexadecimal string
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
|
|
|
|
def retry_operation(operation: Callable, max_attempts: int = 3, delay: float = 1.0) -> Any:
    """
    Retries an operation multiple times before giving up.

    Each failure is logged through ``log_error``. Between attempts the
    function sleeps with exponential backoff: ``delay`` after the first
    failure, then ``2 * delay``, ``4 * delay`` and so on. No sleep follows
    the final attempt.

    Args:
        operation (Callable): Zero-argument callable to invoke
        max_attempts (int): Maximum number of attempts
        delay (float): Base delay in seconds, doubled after every failure

    Returns:
        Any: Result of the operation if successful

    Raises:
        Exception: The last exception encountered if all attempts fail
        RuntimeError: If no attempt was made (``max_attempts`` < 1)
    """
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as exc:
            last_exception = exc
            log_error(f"Operation failed (attempt {attempt+1}/{max_attempts})", exc)
            if attempt < max_attempts - 1:
                # Exponential backoff: the wait doubles with every failure.
                time.sleep(delay * (2 ** attempt))

    if last_exception is not None:
        raise last_exception
    # Only reachable when the loop body never ran (max_attempts < 1).
    raise RuntimeError("Operation failed for unknown reasons")
|
|
|
|
def paginate_results(items: List[Any], page: int = 1, page_size: int = 10) -> Dict[str, Any]:
    """
    Slices a list into one page and reports pagination metadata.

    A ``page`` below 1 is treated as 1 and a ``page_size`` below 1 is
    treated as 10. A page past the end yields an empty ``items`` list.

    Args:
        items (List[Any]): The items to paginate
        page (int): The page number (1-indexed)
        page_size (int): The number of items per page

    Returns:
        Dict[str, Any]: ``{"items": [...], "pagination": {...}}`` where the
        pagination dict holds ``page``, ``page_size``, ``total_items`` and
        ``total_pages``
    """
    current_page = 1 if page < 1 else page
    per_page = 10 if page_size < 1 else page_size

    item_count = len(items)
    # Ceiling division: a partially filled last page still counts as a page.
    whole_pages, leftover = divmod(item_count, per_page)
    page_count = whole_pages + (1 if leftover else 0)

    first = (current_page - 1) * per_page
    last = min(first + per_page, item_count)

    page_meta = {
        "page": current_page,
        "page_size": per_page,
        "total_items": item_count,
        "total_pages": page_count,
    }
    return {"items": items[first:last], "pagination": page_meta}
|
|
|
|
def handle_http_error(status_code: int, detail: str) -> None:
    """
    Builds an HTTPException from the given values and raises it.

    This function never returns normally.

    Args:
        status_code (int): HTTP status code
        detail (str): Error detail message

    Raises:
        HTTPException: Always, carrying ``status_code`` and ``detail``
    """
    http_error = HTTPException(status_code=status_code, detail=detail)
    raise http_error