2025-06-06 10:44:35 +00:00

141 lines
3.5 KiB
Python

from datetime import datetime
from typing import List, Optional
from sqlalchemy import desc, asc
from sqlalchemy.orm import Session
from app.models.task import Task, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate
def get_task(db: Session, task_id: int) -> Optional[Task]:
    """
    Get a task by ID, skipping soft-deleted rows.

    Args:
        db: Session used to run the query.
        task_id: Value matched against ``Task.id``.

    Returns:
        The first task with this ID whose ``is_deleted`` flag is false,
        or ``None`` when no such row exists.
    """
    # Python's ``not`` operates on the attribute object itself and does not
    # build a SQL expression, so ``not Task.is_deleted`` never reaches the
    # database as a column predicate. ``is_(False)`` emits a real comparison.
    return (
        db.query(Task)
        .filter(Task.id == task_id, Task.is_deleted.is_(False))
        .first()
    )
def get_tasks(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
    search: Optional[str] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
) -> List[Task]:
    """
    Get a list of tasks with filtering, sorting and pagination.

    Soft-deleted tasks are always excluded.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (passed to ``offset``).
        limit: Maximum number of rows to return.
        status: When truthy, only tasks with this status are returned.
        search: When non-empty, only tasks whose title or description
            contains this text (case-insensitive ``ILIKE``) are returned.
            The text is embedded in the pattern as-is, so ``%`` and ``_``
            inside it act as wildcards.
        sort_by: Name of the ``Task`` attribute to order by. It is looked
            up with ``getattr``, so an unknown name raises
            ``AttributeError``.
        sort_order: ``"asc"`` (any letter case) for ascending order; any
            other value sorts descending.

    Returns:
        The matching tasks for the requested page.
    """
    # Python's ``not`` operates on the attribute object itself and does not
    # build a SQL expression, so ``not Task.is_deleted`` never reaches the
    # database as a column predicate. ``is_(False)`` emits a real comparison.
    query = db.query(Task).filter(Task.is_deleted.is_(False))

    # Apply status filter if provided
    if status:
        query = query.filter(Task.status == status)

    # Apply search filter if provided
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            Task.title.ilike(pattern) | Task.description.ilike(pattern)
        )

    # Apply sorting: anything other than "asc" falls back to descending.
    direction = asc if sort_order.lower() == "asc" else desc
    query = query.order_by(direction(getattr(Task, sort_by)))

    # Apply pagination
    return query.offset(skip).limit(limit).all()
def get_tasks_count(
    db: Session,
    status: Optional[TaskStatus] = None,
    search: Optional[str] = None,
) -> int:
    """
    Get the total count of tasks with the specified filters.

    Soft-deleted tasks are never counted.

    Args:
        db: Session used to run the query.
        status: When truthy, only tasks with this status are counted.
        search: When non-empty, only tasks whose title or description
            contains this text (case-insensitive ``ILIKE``) are counted.
            The text is embedded in the pattern as-is, so ``%`` and ``_``
            inside it act as wildcards.

    Returns:
        Number of tasks matching the filters.
    """
    # Python's ``not`` operates on the attribute object itself and does not
    # build a SQL expression, so ``not Task.is_deleted`` never reaches the
    # database as a column predicate. ``is_(False)`` emits a real comparison.
    query = db.query(Task).filter(Task.is_deleted.is_(False))

    # Apply status filter if provided
    if status:
        query = query.filter(Task.status == status)

    # Apply search filter if provided
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            Task.title.ilike(pattern) | Task.description.ilike(pattern)
        )

    return query.count()
def create_task(db: Session, task: TaskCreate) -> Task:
    """
    Create a new task.

    Copies ``title``, ``description``, ``status``, ``priority`` and
    ``due_date`` from the payload onto a new ``Task``, commits it, and
    refreshes the instance from the session before returning it.

    Args:
        db: Session the new row is added to and committed through.
        task: Payload supplying the field values.

    Returns:
        The newly created task, refreshed after the commit.
    """
    copied_fields = ("title", "description", "status", "priority", "due_date")
    new_task = Task(**{name: getattr(task, name) for name in copied_fields})

    db.add(new_task)
    db.commit()
    db.refresh(new_task)
    return new_task
def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]:
    """
    Update a task.

    Args:
        db: Session used to look up and commit the task.
        task_id: ID passed to ``get_task`` to find the row to change.
        task_update: Payload with the new values; it is dumped with
            ``exclude_unset=True`` and each resulting key is assigned onto
            the task.

    Returns:
        The refreshed task after the commit, or ``None`` when ``get_task``
        finds nothing for ``task_id``.
    """
    target = get_task(db, task_id)
    if not target:
        return None

    # Dump the payload with exclude_unset=True (presumably a Pydantic model,
    # where this keeps only fields that were explicitly provided).
    changes = task_update.dict(exclude_unset=True)

    # Stamp the completion time only on the transition into DONE, not when
    # the task was already DONE before this update.
    entering_done = (
        changes.get("status") == TaskStatus.DONE
        and target.status != TaskStatus.DONE
    )
    if entering_done:
        changes["completed_at"] = datetime.utcnow()

    # Apply updates
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
def delete_task(db: Session, task_id: int) -> Optional[Task]:
    """
    Soft delete a task by setting is_deleted to True.

    Args:
        db: Session used to look up and commit the task.
        task_id: ID passed to ``get_task`` to find the row to flag.

    Returns:
        The flagged task, or ``None`` when ``get_task`` finds nothing for
        ``task_id``.
    """
    task = get_task(db, task_id)
    if task:
        # The row stays in the table; only the flag changes.
        task.is_deleted = True
        db.commit()
        return task
    return None
def permanently_delete_task(db: Session, task_id: int) -> bool:
    """
    Permanently delete a task from the database.

    The lookup filters on the ID only, so it also finds tasks whose
    ``is_deleted`` flag is set.

    Args:
        db: Session used to look up, delete and commit.
        task_id: Value matched against ``Task.id``.

    Returns:
        ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    by_id = db.query(Task).filter(Task.id == task_id)
    doomed = by_id.first()
    if doomed:
        db.delete(doomed)
        db.commit()
        return True
    return False