import logging
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union

from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.db.base_class import Base

logger = logging.getLogger(__name__)

ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)


class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    """Generic Create/Read/Update/Delete helper bound to one model class.

    Every write method commits on success. On any failure it rolls the
    session back, logs the error with its traceback, and raises a plain
    ``Exception`` whose message starts with ``"Error <action> <Model>: "``
    and whose ``__cause__`` is the original error.
    """

    def __init__(self, model: Type[ModelType]):
        """
        CRUD object with default methods to Create, Read, Update, Delete (CRUD).

        Args:
            model: The model class that all queries of this instance target.
        """
        self.model = model

    def _rollback_and_wrap(
        self, db: Session, action: str, exc: Exception
    ) -> Exception:
        """Roll back *db*, log *exc*, and build the exception to raise.

        Args:
            db: Session whose pending transaction is rolled back.
            action: Gerund used in the message ("creating", "updating", ...).
            exc: The error that interrupted the operation.

        Returns:
            A plain ``Exception`` carrying the formatted message; the caller
            raises it with ``from exc`` to keep the original as the cause.
        """
        db.rollback()
        error_msg = f"Error {action} {self.model.__name__}: {exc}"
        # exc_info attaches the traceback that used to be printed by hand.
        logger.error("%s", error_msg, exc_info=exc)
        return Exception(error_msg)

    def get(self, db: Session, id: Any) -> Optional[ModelType]:
        """Return the first row whose ``id`` equals *id*, or ``None``."""
        return db.query(self.model).filter(self.model.id == id).first()

    def get_multi(
        self, db: Session, *, skip: int = 0, limit: int = 100
    ) -> List[ModelType]:
        """Return up to *limit* rows after skipping the first *skip* rows."""
        return db.query(self.model).offset(skip).limit(limit).all()

    def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
        """Insert a new row built from *obj_in* and return the refreshed object.

        Args:
            db: Session used for the insert; committed on success.
            obj_in: Schema instance whose encoded fields are passed as
                keyword arguments to the model constructor.

        Returns:
            The newly created model instance, refreshed after the commit.

        Raises:
            Exception: If encoding, construction, or the commit fails. The
                session is rolled back first and the original error is
                chained as the cause.
        """
        try:
            obj_in_data = jsonable_encoder(obj_in)
            # Log field names only: payload values may contain secrets.
            logger.info(
                "Creating %s with fields: %s",
                self.model.__name__,
                sorted(obj_in_data),
            )
            db_obj = self.model(**obj_in_data)
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
            logger.info(
                "Successfully created %s with id: %s",
                self.model.__name__,
                db_obj.id,
            )
            return db_obj
        except Exception as e:
            raise self._rollback_and_wrap(db, "creating", e) from e

    def update(
        self,
        db: Session,
        *,
        db_obj: ModelType,
        obj_in: Union[UpdateSchemaType, Dict[str, Any]],
    ) -> ModelType:
        """Apply the fields of *obj_in* to *db_obj*, commit, and return it.

        Only keys that also appear in the JSON-encoded form of *db_obj* are
        applied; any other keys in *obj_in* are ignored.

        Args:
            db: Session used for the update; committed on success.
            db_obj: The existing model instance to modify.
            obj_in: Either a plain dict of new values, or a schema instance
                from which only explicitly-set fields are taken.

        Returns:
            The same *db_obj*, refreshed after the commit.

        Raises:
            Exception: If anything fails. The session is rolled back first
                and the original error is chained as the cause.
        """
        try:
            # NOTE(review): the id is read before encoding, as in the original
            # ordering; presumably this attribute access also reloads an
            # expired instance so the encoder sees its columns - confirm
            # before reordering.
            obj_id = db_obj.id
            logger.info("Updating %s with id: %s", self.model.__name__, obj_id)

            obj_data = jsonable_encoder(db_obj)

            if isinstance(obj_in, dict):
                update_data = obj_in
            elif hasattr(obj_in, "model_dump"):
                # Pydantic v2 API.
                update_data = obj_in.model_dump(exclude_unset=True)
            else:
                # Pydantic v1 API.
                update_data = obj_in.dict(exclude_unset=True)

            changed_fields = [name for name in obj_data if name in update_data]
            # Log field names only: new values may contain secrets.
            logger.info("Fields to update: %s", changed_fields)

            for name in changed_fields:
                setattr(db_obj, name, update_data[name])

            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
            logger.info(
                "Successfully updated %s with id: %s",
                self.model.__name__,
                db_obj.id,
            )
            return db_obj
        except Exception as e:
            raise self._rollback_and_wrap(db, "updating", e) from e

    def remove(self, db: Session, *, id: int) -> Optional[ModelType]:
        """Delete the row whose ``id`` equals *id*.

        Args:
            db: Session used for the delete; committed on success.
            id: Identifier of the row to delete.

        Returns:
            The deleted model instance, or ``None`` when no row matched.

        Raises:
            Exception: If the lookup, delete, or commit fails. The session
                is rolled back first and the original error is chained as
                the cause.
        """
        try:
            obj = db.query(self.model).filter(self.model.id == id).first()
            if not obj:
                logger.warning(
                    "%s with id %s not found for deletion",
                    self.model.__name__,
                    id,
                )
                return None

            logger.info("Deleting %s with id: %s", self.model.__name__, id)
            db.delete(obj)
            db.commit()
            logger.info(
                "Successfully deleted %s with id: %s", self.model.__name__, id
            )
            return obj
        except Exception as e:
            raise self._rollback_and_wrap(db, "deleting", e) from e