
- Add user authentication with JWT tokens
- Implement task CRUD operations with status and priority
- Set up SQLite database with SQLAlchemy ORM
- Create Alembic migrations for database schema
- Add comprehensive API documentation
- Include health check endpoint and CORS configuration
- Structure codebase with proper separation of concerns
65 lines
1.9 KiB
Python
65 lines
1.9 KiB
Python
from sqlalchemy.orm import Session
|
|
from app.models.task import Task, TaskStatus
|
|
from app.schemas.task import TaskCreate, TaskUpdate
|
|
from typing import List, Optional
|
|
|
|
|
|
def get_tasks_by_user(
    db: Session, user_id: int, skip: int = 0, limit: int = 100
) -> List[Task]:
    """Return one page of the tasks owned by ``user_id``.

    Args:
        db: Session used to run the query.
        user_id: Value matched against ``Task.owner_id``.
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows returned.

    Returns:
        The matching tasks ordered by ``Task.id``; an empty list when
        nothing matches.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by id makes
    # the pagination deterministic.
    return (
        db.query(Task)
        .filter(Task.owner_id == user_id)
        .order_by(Task.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
|
|
def get_task_by_id(db: Session, task_id: int, user_id: int) -> Optional[Task]:
    """Look up a single task by id, restricted to tasks owned by ``user_id``.

    Returns the first matching ``Task``, or ``None`` when no row has both
    the given id and the given owner.
    """
    query = db.query(Task)
    owned_match = query.filter(Task.id == task_id, Task.owner_id == user_id)
    return owned_match.first()
|
|
|
|
|
|
def create_task(db: Session, task: TaskCreate, user_id: int) -> Task:
    """Persist a new task owned by ``user_id`` and return it.

    The task's fields come from ``task.dict()``; ``owner_id`` is set from
    ``user_id``. The row is committed and then refreshed from the database
    before being returned.
    """
    fields = task.dict()
    new_task = Task(owner_id=user_id, **fields)

    db.add(new_task)
    db.commit()
    # Reload the instance so it reflects the row as stored by the commit.
    db.refresh(new_task)
    return new_task
|
|
|
|
|
|
def update_task(
    db: Session, task_id: int, task_update: TaskUpdate, user_id: int
) -> Optional[Task]:
    """Apply a partial update to one of ``user_id``'s tasks.

    Only fields explicitly set on ``task_update`` are written. ``status``
    and ``is_completed`` are kept in step with each other:

    * a status of ``TaskStatus.COMPLETED`` forces ``is_completed`` to True;
    * any other status clears ``is_completed`` if the stored task was
      completed;
    * a truthy ``is_completed`` forces the status to
      ``TaskStatus.COMPLETED``.

    Returns the refreshed task, or ``None`` when the task does not exist
    for this user.
    """
    existing = get_task_by_id(db, task_id, user_id)
    if not existing:
        return None

    changes = task_update.dict(exclude_unset=True)

    if "status" in changes:
        if changes["status"] == TaskStatus.COMPLETED:
            changes["is_completed"] = True
        elif existing.is_completed:
            # Moving a completed task to any other status re-opens it.
            changes["is_completed"] = False

    # NOTE(review): an explicit is_completed=False without a status change
    # leaves the stored status untouched — confirm that is intended.
    if changes.get("is_completed"):
        changes["status"] = TaskStatus.COMPLETED

    for attr_name, new_value in changes.items():
        setattr(existing, attr_name, new_value)

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
|
|
def delete_task(db: Session, task_id: int, user_id: int) -> bool:
    """Delete one of ``user_id``'s tasks.

    Returns True when a matching task was found, deleted and committed;
    False when no task with that id exists for this user.
    """
    target = get_task_by_id(db, task_id, user_id)
    if target:
        db.delete(target)
        db.commit()
        return True
    return False
|
|
|
|
|
|
def get_tasks_by_status(db: Session, user_id: int, status: TaskStatus) -> List[Task]:
    """Return every task owned by ``user_id`` whose status equals ``status``.

    No pagination is applied; all matching rows are returned.
    """
    query = db.query(Task)
    matching = query.filter(Task.owner_id == user_id, Task.status == status)
    return matching.all()
|