from typing import List, Optional

from sqlalchemy import or_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from app.models.task import Task, TaskPriority, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate


def _commit(db: Session) -> None:
    """Commit the session; on a database error roll back and re-raise.

    Rolling back keeps the session usable by the caller after a failed
    commit. The original exception is propagated unchanged.
    """
    try:
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        raise


def get_task(db: Session, task_id: int) -> Optional[Task]:
    """Return the task whose ``id`` equals ``task_id``, or ``None``."""
    return db.query(Task).filter(Task.id == task_id).first()


def get_tasks(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
    priority: Optional[TaskPriority] = None,
    search: Optional[str] = None,
) -> List[Task]:
    """Return one page of tasks, optionally filtered.

    Args:
        db: Active database session.
        skip: Number of matching rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        status: When given, keep only tasks with this status.
        priority: When given, keep only tasks with this priority.
        search: When non-empty, keep only tasks whose title or description
            contains this text. ``%`` and ``_`` are matched literally.

    Returns:
        The matching tasks ordered by ``id``.
    """
    query = db.query(Task)

    # Compare against None rather than relying on truthiness, so an enum
    # member that happens to have a falsy value is still used as a filter.
    if status is not None:
        query = query.filter(Task.status == status)
    if priority is not None:
        query = query.filter(Task.priority == priority)

    # An empty search string means "no text filter".
    if search:
        # autoescape=True escapes LIKE wildcards in the user-supplied text.
        query = query.filter(
            or_(
                Task.title.contains(search, autoescape=True),
                Task.description.contains(search, autoescape=True),
            )
        )

    # OFFSET/LIMIT without ORDER BY yields an unspecified row order; sort by
    # id so consecutive pages are consistent (assuming ids are unique).
    return query.order_by(Task.id).offset(skip).limit(limit).all()


def create_task(db: Session, task: TaskCreate) -> Task:
    """Persist a new task built from ``task`` and return it.

    The instance is refreshed from the database after the commit before
    being returned.
    """
    db_task = Task(**task.dict())
    db.add(db_task)
    _commit(db)
    db.refresh(db_task)
    return db_task


def update_task(
    db: Session, task_id: int, task_update: TaskUpdate
) -> Optional[Task]:
    """Apply the explicitly-set fields of ``task_update`` to a task.

    Returns:
        The updated task, or ``None`` when no task has ``task_id``.
    """
    db_task = get_task(db, task_id)
    if db_task is None:
        return None

    # exclude_unset=True: only fields the caller actually provided are
    # written, so omitted fields keep their current values.
    for field, value in task_update.dict(exclude_unset=True).items():
        setattr(db_task, field, value)

    _commit(db)
    db.refresh(db_task)
    return db_task


def delete_task(db: Session, task_id: int) -> bool:
    """Delete the task with ``task_id``.

    Returns:
        ``True`` when a task was deleted, ``False`` when none was found.
    """
    db_task = get_task(db, task_id)
    if db_task is None:
        return False

    db.delete(db_task)
    _commit(db)
    return True


def get_tasks_count(db: Session) -> int:
    """Return the total number of tasks."""
    return db.query(Task).count()


def get_tasks_by_status_count(db: Session, status: TaskStatus) -> int:
    """Return the number of tasks whose status equals ``status``."""
    return db.query(Task).filter(Task.status == status).count()