from typing import List, Optional from sqlalchemy.orm import Session from app.models.task import Task from app.schemas.task import TaskCreate, TaskUpdate def get_task(db: Session, task_id: int) -> Optional[Task]: return db.query(Task).filter(Task.id == task_id).first() def get_tasks(db: Session, skip: int = 0, limit: int = 100) -> List[Task]: return db.query(Task).offset(skip).limit(limit).all() def create_task(db: Session, task: TaskCreate) -> Task: db_task = Task( title=task.title, description=task.description, status=task.status, priority=task.priority, ) db.add(db_task) db.commit() db.refresh(db_task) return db_task def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]: db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: return None update_data = task_update.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(db_task, field, value) db.commit() db.refresh(db_task) return db_task def delete_task(db: Session, task_id: int) -> bool: db_task = db.query(Task).filter(Task.id == task_id).first() if db_task is None: return False db.delete(db_task) db.commit() return True def get_tasks_by_status( db: Session, status: str, skip: int = 0, limit: int = 100 ) -> List[Task]: return db.query(Task).filter(Task.status == status).offset(skip).limit(limit).all() def get_tasks_by_priority( db: Session, priority: str, skip: int = 0, limit: int = 100 ) -> List[Task]: return ( db.query(Task).filter(Task.priority == priority).offset(skip).limit(limit).all() )