104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.crud.base import CRUDBase
|
|
from app.models.task import Task, TaskStatus
|
|
from app.models.user import User
|
|
from app.schemas.task import TaskCreate, TaskUpdate
|
|
|
|
|
|
class CRUDTask(CRUDBase[Task, TaskCreate, TaskUpdate]):
    """CRUD operations for tasks, always scoped to the owning user.

    Every read filters on ``owner_id`` and every mutation first looks the
    task up by ``(id, owner_id)``, so a task belonging to a different owner
    is reported as ``None`` rather than being returned or modified.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _owner_query(self, db: Session, owner_id: int, *criteria):
        """Return a query for ``owner_id``'s tasks narrowed by ``criteria``."""
        return db.query(self.model).filter(
            self.model.owner_id == owner_id, *criteria
        )

    def _paginate(self, query, skip: int, limit: int) -> List[Task]:
        """Apply a stable ordering, then the ``skip``/``limit`` window."""
        # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows
        # land on which page; ordering by id keeps pages stable across calls.
        return query.order_by(self.model.id).offset(skip).limit(limit).all()

    def _get_owned(self, db: Session, task_id: int, user: User) -> Optional[Task]:
        """Fetch the task with ``task_id`` if it is owned by ``user``."""
        return self._owner_query(db, user.id, self.model.id == task_id).first()

    def _persist(self, db: Session, task: Task) -> Task:
        """Add ``task`` to the session, commit, and reload it from the DB."""
        db.add(task)
        db.commit()
        db.refresh(task)
        return task

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------

    def create_with_owner(
        self, db: Session, *, obj_in: TaskCreate, owner_id: int
    ) -> Task:
        """Create a task from ``obj_in`` and assign it to ``owner_id``.

        The schema is dumped to a dict and passed as keyword arguments to
        ``Task`` together with ``owner_id``. The new row is committed and
        refreshed before being returned.
        """
        obj_in_data = obj_in.model_dump()
        return self._persist(db, Task(**obj_in_data, owner_id=owner_id))

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    def get_multi_by_owner(
        self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
    ) -> List[Task]:
        """Return one page of the tasks owned by ``owner_id``, ordered by id."""
        return self._paginate(self._owner_query(db, owner_id), skip, limit)

    def get_multi_by_status(
        self,
        db: Session,
        *,
        owner_id: int,
        status: TaskStatus,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Task]:
        """Return one page of ``owner_id``'s tasks whose status is ``status``."""
        query = self._owner_query(db, owner_id, self.model.status == status)
        return self._paginate(query, skip, limit)

    def get_multi_by_completion(
        self,
        db: Session,
        *,
        owner_id: int,
        is_completed: bool,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Task]:
        """Return one page of ``owner_id``'s tasks filtered by ``is_completed``."""
        query = self._owner_query(
            db, owner_id, self.model.is_completed == is_completed
        )
        return self._paginate(query, skip, limit)

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------

    def mark_as_complete(
        self, db: Session, *, task_id: int, user: User
    ) -> Optional[Task]:
        """Flag the task as completed and set its status to ``DONE``.

        Returns the refreshed task, or ``None`` when no task with
        ``task_id`` is owned by ``user``.
        """
        task = self._get_owned(db, task_id, user)
        if task is None:
            return None
        task.is_completed = True
        task.status = TaskStatus.DONE
        return self._persist(db, task)

    def mark_as_incomplete(
        self, db: Session, *, task_id: int, user: User
    ) -> Optional[Task]:
        """Clear the completed flag of a task.

        A status of ``DONE`` is moved back to ``IN_PROGRESS``; any other
        status is left as it is. Returns the refreshed task, or ``None``
        when no task with ``task_id`` is owned by ``user``.
        """
        task = self._get_owned(db, task_id, user)
        if task is None:
            return None
        task.is_completed = False
        # Only DONE contradicts "incomplete"; other statuses stay untouched.
        if task.status == TaskStatus.DONE:
            task.status = TaskStatus.IN_PROGRESS
        return self._persist(db, task)

    def update_task_status(
        self, db: Session, *, task_id: int, user: User, status: TaskStatus
    ) -> Optional[Task]:
        """Set the task's status and keep ``is_completed`` in sync with it.

        ``is_completed`` becomes ``True`` exactly when ``status`` is
        ``DONE``. Returns the refreshed task, or ``None`` when no task with
        ``task_id`` is owned by ``user``.
        """
        task = self._get_owned(db, task_id, user)
        if task is None:
            return None
        task.status = status
        task.is_completed = status == TaskStatus.DONE
        return self._persist(db, task)
|
|
|
|
|
|
# Module-level CRUDTask instance bound to the Task model.
task = CRUDTask(Task)