
- Add complete CRUD operations for task management - Implement task filtering by status, priority, and search - Add task statistics endpoint for summary data - Configure SQLite database with Alembic migrations - Set up FastAPI with CORS support and API documentation - Include health check endpoint and base URL information - Add comprehensive README with API usage examples - Structure project with proper separation of concerns
76 lines
1.9 KiB
Python
76 lines
1.9 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
from typing import List, Optional
|
|
from app.models.task import Task, TaskStatus, TaskPriority
|
|
from app.schemas.task import TaskCreate, TaskUpdate
|
|
|
|
|
|
def get_task(db: Session, task_id: int) -> Optional[Task]:
|
|
return db.query(Task).filter(Task.id == task_id).first()
|
|
|
|
|
|
def get_tasks(
|
|
db: Session,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
status: Optional[TaskStatus] = None,
|
|
priority: Optional[TaskPriority] = None,
|
|
search: Optional[str] = None
|
|
) -> List[Task]:
|
|
query = db.query(Task)
|
|
|
|
if status:
|
|
query = query.filter(Task.status == status)
|
|
|
|
if priority:
|
|
query = query.filter(Task.priority == priority)
|
|
|
|
if search:
|
|
query = query.filter(
|
|
or_(
|
|
Task.title.contains(search),
|
|
Task.description.contains(search)
|
|
)
|
|
)
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_task(db: Session, task: TaskCreate) -> Task:
|
|
db_task = Task(**task.dict())
|
|
db.add(db_task)
|
|
db.commit()
|
|
db.refresh(db_task)
|
|
return db_task
|
|
|
|
|
|
def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]:
|
|
db_task = db.query(Task).filter(Task.id == task_id).first()
|
|
if not db_task:
|
|
return None
|
|
|
|
update_data = task_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(db_task, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_task)
|
|
return db_task
|
|
|
|
|
|
def delete_task(db: Session, task_id: int) -> bool:
|
|
db_task = db.query(Task).filter(Task.id == task_id).first()
|
|
if not db_task:
|
|
return False
|
|
|
|
db.delete(db_task)
|
|
db.commit()
|
|
return True
|
|
|
|
|
|
def get_tasks_count(db: Session) -> int:
|
|
return db.query(Task).count()
|
|
|
|
|
|
def get_tasks_by_status_count(db: Session, status: TaskStatus) -> int:
|
|
return db.query(Task).filter(Task.status == status).count() |