Automated Action 10172c55ca Implement comprehensive Task Manager API with FastAPI
Added complete task management functionality including:
- CRUD operations for tasks (create, read, update, delete)
- Task model with status (pending/in_progress/completed) and priority (low/medium/high)
- SQLite database with SQLAlchemy ORM
- Alembic migrations for database schema
- Pydantic schemas for request/response validation
- FastAPI routers with proper error handling
- Filtering and pagination support
- Health check endpoint
- CORS configuration
- Comprehensive API documentation
- Proper project structure following FastAPI best practices
2025-06-20 19:34:58 +00:00

64 lines
1.7 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate
def get_task(db: Session, task_id: int) -> Optional[Task]:
    """Look up a single task by its id.

    Args:
        db: SQLAlchemy session used to run the query.
        task_id: Value compared against ``Task.id``.

    Returns:
        The first matching ``Task``, or ``None`` when no row matches.
    """
    matching = db.query(Task).filter(Task.id == task_id)
    return matching.first()
def get_tasks(db: Session, skip: int = 0, limit: int = 100) -> List[Task]:
    """Return one page of tasks in a stable order.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` tasks, ordered by ``Task.id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could repeat or skip rows. Sorting by id pins it down.
    return db.query(Task).order_by(Task.id).offset(skip).limit(limit).all()
def create_task(db: Session, task: TaskCreate) -> Task:
    """Persist a new task built from the incoming payload.

    Args:
        db: SQLAlchemy session used to add and commit the row.
        task: Payload supplying ``title``, ``description``, ``status`` and
            ``priority``.

    Returns:
        The newly stored ``Task``, refreshed from the database after commit.
    """
    # Read the attributes in the same order the model receives them.
    columns = {
        "title": task.title,
        "description": task.description,
        "status": task.status,
        "priority": task.priority,
    }
    new_task = Task(**columns)

    db.add(new_task)
    db.commit()
    # Reload so database-populated attributes are present on the instance.
    db.refresh(new_task)
    return new_task
def update_task(db: Session, task_id: int, task_update: TaskUpdate) -> Optional[Task]:
    """Apply a partial update to an existing task.

    Args:
        db: SQLAlchemy session used to query and commit.
        task_id: Value compared against ``Task.id``.
        task_update: Payload; only fields that were explicitly set on it are
            written to the task.

    Returns:
        The refreshed ``Task`` after the commit, or ``None`` when no task with
        ``task_id`` exists.
    """
    existing = db.query(Task).filter(Task.id == task_id).first()
    if existing is not None:
        # exclude_unset keeps fields the payload never set from being written.
        changes = task_update.model_dump(exclude_unset=True)
        for name in changes:
            setattr(existing, name, changes[name])
        db.commit()
        db.refresh(existing)
    return existing
def delete_task(db: Session, task_id: int) -> bool:
    """Delete the task with the given id, if it exists.

    Args:
        db: SQLAlchemy session used to query, delete and commit.
        task_id: Value compared against ``Task.id``.

    Returns:
        ``True`` when a task was found and deleted, ``False`` when no task
        matched (in which case nothing is committed).
    """
    target = db.query(Task).filter(Task.id == task_id).first()
    found = target is not None
    if found:
        db.delete(target)
        db.commit()
    return found
def get_tasks_by_status(
    db: Session, status: str, skip: int = 0, limit: int = 100
) -> List[Task]:
    """Return one page of tasks with the given status, in a stable order.

    Args:
        db: SQLAlchemy session used to run the query.
        status: Value compared for equality against ``Task.status``.
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` matching tasks, ordered by ``Task.id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could repeat or skip rows. Sorting by id pins it down.
    return (
        db.query(Task)
        .filter(Task.status == status)
        .order_by(Task.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def get_tasks_by_priority(
    db: Session, priority: str, skip: int = 0, limit: int = 100
) -> List[Task]:
    """Return one page of tasks with the given priority, in a stable order.

    Args:
        db: SQLAlchemy session used to run the query.
        priority: Value compared for equality against ``Task.priority``.
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` matching tasks, ordered by ``Task.id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could repeat or skip rows. Sorting by id pins it down.
    return (
        db.query(Task)
        .filter(Task.priority == priority)
        .order_by(Task.id)
        .offset(skip)
        .limit(limit)
        .all()
    )