
Added complete task management functionality including: - CRUD operations for tasks (create, read, update, delete) - Task model with status (pending/in_progress/completed) and priority (low/medium/high) - SQLite database with SQLAlchemy ORM - Alembic migrations for database schema - Pydantic schemas for request/response validation - FastAPI routers with proper error handling - Filtering and pagination support - Health check endpoint - CORS configuration - Comprehensive API documentation - Proper project structure following FastAPI best practices
64 lines
1.7 KiB
Python
64 lines
1.7 KiB
Python
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
from app.models.task import Task
|
|
from app.schemas.task import TaskCreate, TaskUpdate
|
|
|
|
|
|
def get_task(db: Session, task_id: int) -> Optional[Task]:
|
|
return db.query(Task).filter(Task.id == task_id).first()
|
|
|
|
|
|
def get_tasks(db: Session, skip: int = 0, limit: int = 100) -> List[Task]:
|
|
return db.query(Task).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_task(db: Session, task: TaskCreate) -> Task:
|
|
db_task = Task(
|
|
title=task.title,
|
|
description=task.description,
|
|
status=task.status,
|
|
priority=task.priority,
|
|
)
|
|
db.add(db_task)
|
|
db.commit()
|
|
db.refresh(db_task)
|
|
return db_task
|
|
|
|
|
|
def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]:
|
|
db_task = db.query(Task).filter(Task.id == task_id).first()
|
|
if db_task is None:
|
|
return None
|
|
|
|
update_data = task_update.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(db_task, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_task)
|
|
return db_task
|
|
|
|
|
|
def delete_task(db: Session, task_id: int) -> bool:
|
|
db_task = db.query(Task).filter(Task.id == task_id).first()
|
|
if db_task is None:
|
|
return False
|
|
|
|
db.delete(db_task)
|
|
db.commit()
|
|
return True
|
|
|
|
|
|
def get_tasks_by_status(
|
|
db: Session, status: str, skip: int = 0, limit: int = 100
|
|
) -> List[Task]:
|
|
return db.query(Task).filter(Task.status == status).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_tasks_by_priority(
|
|
db: Session, priority: str, skip: int = 0, limit: int = 100
|
|
) -> List[Task]:
|
|
return (
|
|
db.query(Task).filter(Task.priority == priority).offset(skip).limit(limit).all()
|
|
)
|