"""CRUD helpers for ``Task`` rows; every query is scoped to the owning user."""

from typing import List, Optional

from sqlalchemy.orm import Session

from app.models.task import Task, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate


def get_tasks_by_user(
    db: Session, user_id: int, skip: int = 0, limit: int = 100
) -> List[Task]:
    """Return one page of the tasks owned by ``user_id``.

    Args:
        db: Active SQLAlchemy session.
        user_id: Owner whose tasks are listed.
        skip: Number of rows to skip (passed to ``OFFSET``).
        limit: Maximum number of rows to return (passed to ``LIMIT``).

    Returns:
        The matching tasks, ordered by ``Task.id``.
    """
    return (
        db.query(Task)
        .filter(Task.owner_id == user_id)
        # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
        # consecutive pages could repeat or skip rows. Ordering by the id
        # column makes pagination deterministic.
        .order_by(Task.id)
        .offset(skip)
        .limit(limit)
        .all()
    )


def get_task_by_id(db: Session, task_id: int, user_id: int) -> Optional[Task]:
    """Return the task with ``task_id`` if it is owned by ``user_id``.

    Returns:
        The task, or ``None`` when no row matches both the id and the owner.
    """
    return (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == user_id)
        .first()
    )


def create_task(db: Session, task: TaskCreate, user_id: int) -> Task:
    """Persist a new task owned by ``user_id`` and return the stored row.

    The row is built from ``task.dict()`` plus ``owner_id``, committed, and
    then refreshed from the database before being returned.
    """
    db_task = Task(**task.dict(), owner_id=user_id)
    db.add(db_task)
    db.commit()
    db.refresh(db_task)
    return db_task


def update_task(
    db: Session, task_id: int, task_update: TaskUpdate, user_id: int
) -> Optional[Task]:
    """Apply a partial update to a task owned by ``user_id``.

    Only fields explicitly set on ``task_update`` are written. ``status`` and
    ``is_completed`` are kept in sync:

    * If ``status`` is supplied it is authoritative: ``is_completed`` is set
      to whether that status equals ``TaskStatus.COMPLETED``, regardless of
      any ``is_completed`` value in the same payload or the stored state.
    * Otherwise, a truthy ``is_completed`` promotes ``status`` to
      ``TaskStatus.COMPLETED``.

    Returns:
        The refreshed task, or ``None`` when no task matches the id and owner.
    """
    db_task = get_task_by_id(db, task_id, user_id)
    if db_task is None:
        return None

    update_data = task_update.dict(exclude_unset=True)

    if "status" in update_data:
        # Resolve status first and unconditionally, so a contradictory payload
        # (non-completed status together with is_completed=True) always ends
        # the same way instead of depending on what the row held before.
        update_data["is_completed"] = (
            update_data["status"] == TaskStatus.COMPLETED
        )
    elif update_data.get("is_completed"):
        update_data["status"] = TaskStatus.COMPLETED
    # NOTE(review): is_completed=False without a status leaves the stored
    # status untouched, so a COMPLETED status can remain on a row whose flag
    # was cleared. Which status to fall back to is not visible from this
    # module - confirm the intended behaviour.

    for field, value in update_data.items():
        setattr(db_task, field, value)

    db.commit()
    db.refresh(db_task)
    return db_task


def delete_task(db: Session, task_id: int, user_id: int) -> bool:
    """Delete the task with ``task_id`` if it is owned by ``user_id``.

    Returns:
        ``True`` when a row was deleted and committed, ``False`` when no task
        matches the id and owner.
    """
    db_task = get_task_by_id(db, task_id, user_id)
    if db_task is None:
        return False

    db.delete(db_task)
    db.commit()
    return True


def get_tasks_by_status(db: Session, user_id: int, status: TaskStatus) -> List[Task]:
    """Return every task owned by ``user_id`` whose status equals ``status``."""
    return (
        db.query(Task)
        .filter(Task.owner_id == user_id, Task.status == status)
        .all()
    )