2025-06-12 19:05:38 +00:00

69 lines
2.0 KiB
Python

import uuid
from typing import List, Optional
from sqlalchemy import and_, asc, desc
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate
class CRUDTask(CRUDBase[Task, TaskCreate, TaskUpdate]):
    """Owner-scoped persistence helpers for ``Task`` rows.

    Extends ``CRUDBase`` with a create method that stamps an owner on the
    new row, and with queries that only return tasks whose ``owner_id``
    matches the one supplied by the caller.
    """

    def create_with_owner(
        self, db: Session, *, obj_in: TaskCreate, owner_id: str
    ) -> Task:
        """Insert a new task owned by ``owner_id`` and return it.

        Args:
            db: Session used to add and commit the new row.
            obj_in: Source of the title, description, priority, due date
                and completion flag copied onto the new task.
            owner_id: Value stored in the task's ``owner_id`` attribute.

        Returns:
            The committed ``Task``, refreshed from the database.
        """
        db_obj = Task(
            # The id is generated here as a UUID4 string rather than being
            # left to the database.
            id=str(uuid.uuid4()),
            title=obj_in.title,
            description=obj_in.description,
            priority=obj_in.priority,
            due_date=obj_in.due_date,
            is_completed=obj_in.is_completed,
            owner_id=owner_id,
        )
        db.add(db_obj)
        db.commit()
        # Reload the row so attributes populated during the commit are
        # present on the returned object.
        db.refresh(db_obj)
        return db_obj

    def get_multi_by_owner(
        self,
        db: Session,
        *,
        owner_id: str,
        skip: int = 0,
        limit: int = 100,
        is_completed: Optional[bool] = None,
        priority: Optional[int] = None,
        sort_by: str = "created_at",
        sort_order: str = "desc"
    ) -> List[Task]:
        """Return one page of an owner's tasks, filtered and sorted.

        Args:
            db: Session used to run the query.
            owner_id: Only tasks with this ``owner_id`` are returned.
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).
            is_completed: When not ``None``, keep only tasks whose
                ``is_completed`` equals this value.
            priority: When not ``None``, keep only tasks whose
                ``priority`` equals this value.
            sort_by: Name of the mapped ``Task`` attribute to sort on.
            sort_order: ``"asc"`` (case-insensitive) sorts ascending; any
                other value sorts descending.

        Returns:
            The matching tasks in the requested order.

        Raises:
            AttributeError: If ``sort_by`` does not name a sortable mapped
                attribute of ``Task``.
        """
        # Imported locally because the module-level imports do not include
        # ``inspect`` and this block must not rely on edits made elsewhere.
        from sqlalchemy import inspect as sa_inspect

        # ``sort_by`` typically originates from request input. A bare
        # getattr() would accept any class attribute (relationships,
        # ``metadata``, methods, ...) and hand it to ORDER BY, so only
        # mapped, non-relationship ORM attributes are allowed through.
        # AttributeError is kept as the failure type because that is what an
        # unknown name raised before this check existed.
        mapper = sa_inspect(Task)
        if (
            sort_by not in mapper.all_orm_descriptors
            or sort_by in mapper.relationships
        ):
            raise AttributeError(
                f"Task has no sortable attribute {sort_by!r}"
            )

        query = db.query(Task).filter(Task.owner_id == owner_id)

        # Optional filters: ``None`` means "do not filter on this field".
        if is_completed is not None:
            query = query.filter(Task.is_completed == is_completed)
        if priority is not None:
            query = query.filter(Task.priority == priority)

        direction = asc if sort_order.lower() == "asc" else desc
        ordering = [direction(getattr(Task, sort_by))]
        # OFFSET/LIMIT paging is only stable when the ordering is total.
        # Rows that tie on ``sort_by`` could otherwise be returned in a
        # different order per query and repeat or vanish across pages, so
        # the id is appended as a tiebreaker.
        if sort_by != "id":
            ordering.append(asc(Task.id))
        query = query.order_by(*ordering)

        return query.offset(skip).limit(limit).all()

    def get_task_by_id_and_owner(
        self, db: Session, *, task_id: str, owner_id: str
    ) -> Optional[Task]:
        """Return the task with ``task_id`` if it belongs to ``owner_id``.

        Args:
            db: Session used to run the query.
            task_id: Value matched against ``Task.id``.
            owner_id: Value matched against ``Task.owner_id``.

        Returns:
            The first matching ``Task``, or ``None`` when no row satisfies
            both conditions.
        """
        return db.query(Task).filter(
            and_(Task.id == task_id, Task.owner_id == owner_id)
        ).first()
# Module-level CRUDTask instance constructed with the Task model.
task = CRUDTask(Task)