
- Create Category and Tag models - Create TodoTag association table - Add category_id to Todo model - Create Alembic migration for new tables - Create schemas for Category and Tag - Update Todo schemas to include Category and Tags - Create CRUD operations for Categories and Tags - Update Todo CRUD operations to handle categories and tags - Create API endpoints for categories and tags - Update Todo API endpoints with category and tag filtering - Update documentation
113 lines
3.1 KiB
Python
113 lines
3.1 KiB
Python
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.crud.crud_tag import get_tags_by_ids
|
|
from app.models.todo import PriorityLevel, Todo
|
|
from app.schemas.todo import TodoCreate, TodoUpdate
|
|
|
|
|
|
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Fetch a single todo by its id.

    Args:
        db: SQLAlchemy session the query is issued on.
        todo_id: Value compared against ``Todo.id``.

    Returns:
        The first matching ``Todo``, or ``None`` when no row matches.
    """
    matching = db.query(Todo).filter(Todo.id == todo_id)
    return matching.first()
|
|
|
|
|
|
def get_todos(
    db: Session,
    owner_id: int,
    skip: int = 0,
    limit: int = 100,
    title: Optional[str] = None,
    is_completed: Optional[bool] = None,
    priority: Optional[PriorityLevel] = None,
    due_date_before: Optional[datetime] = None,
    due_date_after: Optional[datetime] = None,
    category_id: Optional[int] = None,
    tag_id: Optional[int] = None,
) -> List[Todo]:
    """Return one page of an owner's todos, optionally filtered.

    Every filter is optional; a filter left at ``None`` (or an empty
    ``title``) is not applied.

    Args:
        db: SQLAlchemy session the query is issued on.
        owner_id: Only todos whose ``owner_id`` equals this value are returned.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        title: Substring that ``Todo.title`` must contain.
        is_completed: Required value of ``Todo.is_completed``.
        priority: Required value of ``Todo.priority``.
        due_date_before: Inclusive upper bound for ``Todo.due_date``.
        due_date_after: Inclusive lower bound for ``Todo.due_date``.
        category_id: Required value of ``Todo.category_id``.
        tag_id: Id of a tag that must be among ``Todo.tags``.

    Returns:
        Matching todos ordered by priority (descending), then due date
        (ascending), then id (ascending).
    """
    query = db.query(Todo).filter(Todo.owner_id == owner_id)

    if title:
        query = query.filter(Todo.title.contains(title))

    if is_completed is not None:
        query = query.filter(Todo.is_completed == is_completed)

    if priority is not None:
        query = query.filter(Todo.priority == priority)

    if due_date_before is not None:
        query = query.filter(Todo.due_date <= due_date_before)

    if due_date_after is not None:
        query = query.filter(Todo.due_date >= due_date_after)

    if category_id is not None:
        query = query.filter(Todo.category_id == category_id)

    if tag_id is not None:
        # `any()` renders an EXISTS subquery, which is all the filtering we
        # need. Do NOT additionally join Todo.tags: the join yields one row
        # per tag on each matching todo, so OFFSET/LIMIT would count
        # duplicate rows and pages would come back short or overlapping.
        query = query.filter(Todo.tags.any(id=tag_id))

    # Order by priority (highest first) and due date (earliest first).
    # Todo.id is a final tiebreaker so that paging with skip/limit is
    # deterministic when several todos share priority and due date.
    query = query.order_by(
        Todo.priority.desc(),
        Todo.due_date.asc(),
        Todo.id.asc(),
    )

    return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_todo(db: Session, todo_in: TodoCreate, owner_id: int) -> Todo:
    """Create a todo for ``owner_id`` and persist it in a single commit.

    Args:
        db: SQLAlchemy session used for the tag lookup and the insert.
        todo_in: Incoming payload; its ``title``, ``description``,
            ``is_completed``, ``priority``, ``due_date`` and ``category_id``
            are copied onto the new row. ``tags`` is passed to
            ``get_tags_by_ids`` (presumably a list of tag ids - confirm
            against the schema).
        owner_id: Owner assigned to the new todo and passed to the tag lookup.

    Returns:
        The newly created ``Todo``, refreshed from the database.
    """
    db_todo = Todo(
        title=todo_in.title,
        description=todo_in.description,
        is_completed=todo_in.is_completed,
        priority=todo_in.priority,
        due_date=todo_in.due_date,
        category_id=todo_in.category_id,
        owner_id=owner_id,
    )

    # Attach tags BEFORE the first commit. Committing the todo first and the
    # tags in a second transaction would leave a tag-less todo behind if the
    # tag lookup or the second commit failed.
    if todo_in.tags:
        db_todo.tags = get_tags_by_ids(db, todo_in.tags, owner_id)

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)

    return db_todo
|
|
|
|
|
|
def update_todo(
    db: Session, db_obj: Todo, obj_in: TodoUpdate
) -> Todo:
    """Apply a partial update to an existing todo in a single commit.

    Only fields explicitly set on ``obj_in`` are applied
    (``model_dump(exclude_unset=True)``). ``tags`` is handled separately:
    when it was supplied, the todo's tags are replaced with the result of
    ``get_tags_by_ids`` (so an empty list clears them); when it was not
    supplied, or was supplied as ``None``, the existing tags are left as is.

    Args:
        db: SQLAlchemy session used for the tag lookup and the update.
        db_obj: The todo instance to modify.
        obj_in: Update payload.

    Returns:
        The same ``db_obj``, refreshed from the database after the commit.
    """
    update_data = obj_in.model_dump(exclude_unset=True)

    # Tags go through the relationship rather than plain attribute assignment.
    tags_data = update_data.pop("tags", None)

    for field, value in update_data.items():
        setattr(db_obj, field, value)

    # Resolve tags after the scalar fields are applied (the lookup reads
    # db_obj.owner_id) but before committing, so that field and tag changes
    # are persisted together rather than in two separate transactions.
    if tags_data is not None:
        db_obj.tags = get_tags_by_ids(db, tags_data, db_obj.owner_id)

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)

    return db_obj
|
|
|
|
|
|
def delete_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Delete the todo with the given id, if it exists.

    Args:
        db: SQLAlchemy session used for the lookup and the delete.
        todo_id: Value compared against ``Todo.id``.

    Returns:
        The deleted ``Todo`` instance, or ``None`` when no todo has that id
        (in which case nothing is deleted or committed).
    """
    todo = db.query(Todo).filter(Todo.id == todo_id).first()

    # Passing None to Session.delete() raises inside SQLAlchemy; report a
    # missing row to the caller instead.
    if todo is None:
        return None

    db.delete(todo)
    db.commit()
    return todo