from sqlalchemy.orm import Session from typing import List, Optional from app.crud.base import CRUDBase from app.models.task import Task, TaskStatus from app.schemas.task import TaskCreate, TaskUpdate class CRUDTask(CRUDBase[Task, TaskCreate, TaskUpdate]): def get_by_status( self, db: Session, *, status: TaskStatus, user_id: Optional[int] = None ) -> List[Task]: query = db.query(self.model).filter(Task.status == status) if user_id is not None: query = query.filter(Task.user_id == user_id) return query.all() def get_completed( self, db: Session, *, user_id: Optional[int] = None ) -> List[Task]: query = db.query(self.model).filter(Task.completed.is_(True)) if user_id is not None: query = query.filter(Task.user_id == user_id) return query.all() def get_multi( self, db: Session, *, skip: int = 0, limit: int = 100, user_id: Optional[int] = None, ) -> List[Task]: query = db.query(self.model) if user_id is not None: query = query.filter(Task.user_id == user_id) return query.offset(skip).limit(limit).all() def create_with_owner( self, db: Session, *, obj_in: TaskCreate, user_id: int ) -> Task: obj_in_data = ( obj_in.model_dump() if hasattr(obj_in, "model_dump") else obj_in.dict() ) db_obj = self.model(**obj_in_data, user_id=user_id) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def get_by_id_and_user( self, db: Session, *, task_id: int, user_id: int ) -> Optional[Task]: return ( db.query(self.model) .filter(Task.id == task_id, Task.user_id == user_id) .first() ) def mark_completed( self, db: Session, *, task_id: int, user_id: Optional[int] = None ) -> Optional[Task]: if user_id: task = self.get_by_id_and_user(db, task_id=task_id, user_id=user_id) else: task = self.get(db, id=task_id) if not task: return None task_in = TaskUpdate(status=TaskStatus.DONE, completed=True) return self.update(db, db_obj=task, obj_in=task_in) task = CRUDTask(Task)