Automated Action 73c662b24c Fix requirements.txt issue and lint code
- Add requirements.txt file to git
- Add project files to git
- Fix linting issues in task.py and env.py
- Update SQLAlchemy queries to use 'not' instead of '== False'
- Fix import ordering in env.py
2025-06-06 10:44:18 +00:00

141 lines
3.5 KiB
Python

from datetime import datetime
from typing import List, Optional, Union, Dict, Any
from sqlalchemy import and_, desc, asc
from sqlalchemy.orm import Session
from app.models.task import Task, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate
def get_task(db: Session, task_id: int) -> Optional[Task]:
"""
Get a task by ID.
"""
return db.query(Task).filter(Task.id == task_id, Task.is_deleted == False).first()
def get_tasks(
db: Session,
skip: int = 0,
limit: int = 100,
status: Optional[TaskStatus] = None,
search: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc"
) -> List[Task]:
"""
Get a list of tasks with filtering, sorting and pagination.
"""
query = db.query(Task).filter(Task.is_deleted == False)
# Apply status filter if provided
if status:
query = query.filter(Task.status == status)
# Apply search filter if provided
if search:
query = query.filter(
Task.title.ilike(f"%{search}%") | Task.description.ilike(f"%{search}%")
)
# Apply sorting
if sort_order.lower() == "asc":
query = query.order_by(asc(getattr(Task, sort_by)))
else:
query = query.order_by(desc(getattr(Task, sort_by)))
# Apply pagination
tasks = query.offset(skip).limit(limit).all()
return tasks
def get_tasks_count(
db: Session,
status: Optional[TaskStatus] = None,
search: Optional[str] = None,
) -> int:
"""
Get the total count of tasks with the specified filters.
"""
query = db.query(Task).filter(Task.is_deleted == False)
# Apply status filter if provided
if status:
query = query.filter(Task.status == status)
# Apply search filter if provided
if search:
query = query.filter(
Task.title.ilike(f"%{search}%") | Task.description.ilike(f"%{search}%")
)
return query.count()
def create_task(db: Session, task: TaskCreate) -> Task:
"""
Create a new task.
"""
db_task = Task(
title=task.title,
description=task.description,
status=task.status,
priority=task.priority,
due_date=task.due_date
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]:
"""
Update a task.
"""
db_task = get_task(db, task_id)
if not db_task:
return None
# Convert Pydantic model to dict, excluding None values
update_data = task_update.dict(exclude_unset=True)
# Mark as completed if status changed to DONE
if "status" in update_data and update_data["status"] == TaskStatus.DONE and db_task.status != TaskStatus.DONE:
update_data["completed_at"] = datetime.utcnow()
# Apply updates
for key, value in update_data.items():
setattr(db_task, key, value)
db.commit()
db.refresh(db_task)
return db_task
def delete_task(db: Session, task_id: int) -> Optional[Task]:
"""
Soft delete a task by setting is_deleted to True.
"""
db_task = get_task(db, task_id)
if not db_task:
return None
db_task.is_deleted = True
db.commit()
return db_task
def permanently_delete_task(db: Session, task_id: int) -> bool:
"""
Permanently delete a task from the database.
"""
db_task = db.query(Task).filter(Task.id == task_id).first()
if not db_task:
return False
db.delete(db_task)
db.commit()
return True