from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union from fastapi.encoders import jsonable_encoder from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from app.db.base import Base ModelType = TypeVar("ModelType", bound=Base) CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel) class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): def __init__(self, model: Type[ModelType]): """ CRUD object with default methods to Create, Read, Update, Delete (CRUD). **Parameters** * `model`: A SQLAlchemy model class * `schema`: A Pydantic model (schema) class """ self.model = model async def get(self, db: AsyncSession, id_: Any) -> Optional[ModelType]: result = await db.execute(select(self.model).filter(self.model.id == id_)) return result.scalars().first() async def get_multi( self, db: AsyncSession, *, skip: int = 0, limit: int = 100 ) -> List[ModelType]: result = await db.execute(select(self.model).offset(skip).limit(limit)) return result.scalars().all() async def create(self, db: AsyncSession, *, obj_in: CreateSchemaType) -> ModelType: obj_in_data = jsonable_encoder(obj_in) db_obj = self.model(**obj_in_data) db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj async def update( self, db: AsyncSession, *, db_obj: ModelType, obj_in: Union[UpdateSchemaType, Dict[str, Any]] ) -> ModelType: obj_data = jsonable_encoder(db_obj) if isinstance(obj_in, dict): update_data = obj_in else: update_data = obj_in.dict(exclude_unset=True) for field in obj_data: if field in update_data: setattr(db_obj, field, update_data[field]) db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj async def remove(self, db: AsyncSession, *, id_: str) -> ModelType: obj = await db.execute(select(self.model).filter(self.model.id == id_)) obj = obj.scalars().first() await db.delete(obj) await db.commit() return obj