
- Setup project structure and FastAPI application - Create SQLite database with SQLAlchemy - Implement user authentication with JWT - Create task and user models - Add CRUD operations for tasks and users - Configure Alembic for database migrations - Implement API endpoints for task management - Add error handling and validation - Configure CORS middleware - Create health check endpoint - Add comprehensive documentation
70 lines
2.3 KiB
Python
70 lines
2.3 KiB
Python
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
|
|
|
|
from fastapi.encoders import jsonable_encoder
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
|
|
from app.db.base import Base
|
|
|
|
ModelType = TypeVar("ModelType", bound=Base)
|
|
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
|
|
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
|
|
|
|
|
|
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
|
def __init__(self, model: Type[ModelType]):
|
|
"""
|
|
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
|
|
|
|
**Parameters**
|
|
|
|
* `model`: A SQLAlchemy model class
|
|
* `schema`: A Pydantic model (schema) class
|
|
"""
|
|
self.model = model
|
|
|
|
async def get(self, db: AsyncSession, id_: Any) -> Optional[ModelType]:
|
|
result = await db.execute(select(self.model).filter(self.model.id == id_))
|
|
return result.scalars().first()
|
|
|
|
async def get_multi(
|
|
self, db: AsyncSession, *, skip: int = 0, limit: int = 100
|
|
) -> List[ModelType]:
|
|
result = await db.execute(select(self.model).offset(skip).limit(limit))
|
|
return result.scalars().all()
|
|
|
|
async def create(self, db: AsyncSession, *, obj_in: CreateSchemaType) -> ModelType:
|
|
obj_in_data = jsonable_encoder(obj_in)
|
|
db_obj = self.model(**obj_in_data)
|
|
db.add(db_obj)
|
|
await db.commit()
|
|
await db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
async def update(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
db_obj: ModelType,
|
|
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
|
|
) -> ModelType:
|
|
obj_data = jsonable_encoder(db_obj)
|
|
if isinstance(obj_in, dict):
|
|
update_data = obj_in
|
|
else:
|
|
update_data = obj_in.dict(exclude_unset=True)
|
|
for field in obj_data:
|
|
if field in update_data:
|
|
setattr(db_obj, field, update_data[field])
|
|
db.add(db_obj)
|
|
await db.commit()
|
|
await db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
async def remove(self, db: AsyncSession, *, id_: str) -> ModelType:
|
|
obj = await db.execute(select(self.model).filter(self.model.id == id_))
|
|
obj = obj.scalars().first()
|
|
await db.delete(obj)
|
|
await db.commit()
|
|
return obj |