from datetime import datetime
from typing import List, Optional

from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.core.exceptions import TaskNotFoundException
from app.models.task import Task, TaskPriority, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate


class TaskService:
    """Service for task operations.

    All methods are static and take the database session as their first
    argument; the class holds no state of its own.
    """

    @staticmethod
    def _apply_filters(
        query,
        status: Optional[TaskStatus] = None,
        priority: Optional[TaskPriority] = None,
        completed: Optional[bool] = None,
    ):
        """Add the optional task filters to a select statement.

        Shared by ``get_tasks`` and ``count_tasks`` so that a listing and its
        count are always computed with the same conditions.

        Args:
            query: Select statement to narrow down
            status: Filter by task status (ignored when None)
            priority: Filter by task priority (ignored when None)
            completed: Filter by completion status (ignored when None)

        Returns:
            The statement with a WHERE clause added for every filter given
        """
        if status is not None:
            query = query.where(Task.status == status)
        if priority is not None:
            query = query.where(Task.priority == priority)
        # Compare against None explicitly: completed=False is a real filter.
        if completed is not None:
            query = query.where(Task.completed == completed)
        return query

    @staticmethod
    def get_tasks(
        db: Session,
        skip: int = 0,
        limit: int = 100,
        status: Optional[TaskStatus] = None,
        priority: Optional[TaskPriority] = None,
        completed: Optional[bool] = None,
    ) -> List[Task]:
        """Get a list of tasks with optional filtering.

        Results are ordered by task ID so that successive pages requested
        with ``skip``/``limit`` are stable and do not overlap.

        Args:
            db: Database session
            skip: Number of items to skip
            limit: Maximum number of items to return
            status: Filter by task status
            priority: Filter by task priority
            completed: Filter by completion status

        Returns:
            List of tasks
        """
        query = TaskService._apply_filters(
            select(Task),
            status=status,
            priority=priority,
            completed=completed,
        )
        # Without an explicit ORDER BY the row order under OFFSET/LIMIT is
        # undefined, which can repeat or drop rows between pages.
        query = query.order_by(Task.id).offset(skip).limit(limit)
        # Materialize as a plain list to match the declared return type.
        return list(db.execute(query).scalars().all())

    @staticmethod
    def count_tasks(
        db: Session,
        status: Optional[TaskStatus] = None,
        priority: Optional[TaskPriority] = None,
        completed: Optional[bool] = None,
    ) -> int:
        """Count tasks with optional filtering.

        Args:
            db: Database session
            status: Filter by task status
            priority: Filter by task priority
            completed: Filter by completion status

        Returns:
            Number of tasks
        """
        query = TaskService._apply_filters(
            select(func.count(Task.id)),
            status=status,
            priority=priority,
            completed=completed,
        )
        return db.execute(query).scalar_one()

    @staticmethod
    def get_task(db: Session, task_id: int) -> Task:
        """Get a task by ID.

        Args:
            db: Database session
            task_id: ID of the task

        Returns:
            Task

        Raises:
            TaskNotFoundException: If task is not found
        """
        task = db.get(Task, task_id)
        if task is None:
            raise TaskNotFoundException(detail=f"Task with id {task_id} not found")
        return task

    @staticmethod
    def create_task(db: Session, task_data: TaskCreate) -> Task:
        """Create a new task.

        Args:
            db: Database session
            task_data: Task data

        Returns:
            Created task, refreshed from the database after commit
        """
        task = Task(**task_data.model_dump())
        db.add(task)
        db.commit()
        # Reload so database-generated values are present on the instance.
        db.refresh(task)
        return task

    @staticmethod
    def update_task(db: Session, task_id: int, task_data: TaskUpdate) -> Task:
        """Update an existing task.

        Only the fields explicitly set on ``task_data`` are written to the
        task; fields left unset keep their current values.

        Args:
            db: Database session
            task_id: ID of the task
            task_data: Updated task data

        Returns:
            Updated task, refreshed from the database after commit

        Raises:
            TaskNotFoundException: If task is not found
        """
        task = TaskService.get_task(db, task_id)

        # exclude_unset keeps omitted fields from overwriting stored values.
        data = task_data.model_dump(exclude_unset=True)
        for key, value in data.items():
            setattr(task, key, value)

        # Update the updated_at field (naive timestamp from datetime.now()).
        task.updated_at = datetime.now()

        # If status is changed to DONE, mark as completed. This runs after the
        # field loop, so it takes precedence over a "completed" value in data.
        if task_data.status == TaskStatus.DONE:
            task.completed = True

        db.add(task)
        db.commit()
        db.refresh(task)
        return task

    @staticmethod
    def delete_task(db: Session, task_id: int) -> None:
        """Delete a task.

        Args:
            db: Database session
            task_id: ID of the task

        Raises:
            TaskNotFoundException: If task is not found
        """
        task = TaskService.get_task(db, task_id)
        db.delete(task)
        db.commit()