from typing import List, Optional, Tuple
from datetime import datetime

from sqlalchemy.orm import Query, Session
from sqlalchemy import and_, or_

from app.models.todo import Todo, Priority
from app.models.tag import Tag
from app.schemas.todo import TodoCreate, TodoUpdate, SubtaskCreate
from app.utils.date_utils import (
    get_timezone_aware_now,
    get_date_range_today,
    get_date_range_this_week,
    get_date_range_next_week,
    get_overdue_cutoff,
    get_due_soon_cutoff,
)


def _apply_common_filters(
    query: Query,
    *,
    completed: Optional[bool],
    priority: Optional[Priority],
    category_id: Optional[int],
    project_id: Optional[int],
    parent_id: Optional[int],
    include_subtasks: bool,
) -> Query:
    """Apply the filters shared by every todo listing function.

    A filter is only added when its argument is not ``None``. ``parent_id``
    takes precedence over ``include_subtasks``: when a parent is given, only
    its direct children are returned; otherwise ``include_subtasks=False``
    restricts the result to todos that have no parent.
    """
    if parent_id is not None:
        query = query.filter(Todo.parent_id == parent_id)
    elif not include_subtasks:
        query = query.filter(Todo.parent_id.is_(None))

    if completed is not None:
        query = query.filter(Todo.completed == completed)
    if priority is not None:
        query = query.filter(Todo.priority == priority)
    if category_id is not None:
        query = query.filter(Todo.category_id == category_id)
    if project_id is not None:
        query = query.filter(Todo.project_id == project_id)
    return query


def _count_and_page(
    query: Query, skip: int, limit: int, *ordering
) -> Tuple[List[Todo], int]:
    """Return ``(page, total)`` for ``query``.

    ``total`` is counted before ``offset``/``limit`` are applied, so it
    reflects every row matching the filters, not just the returned page.
    """
    total = query.count()
    todos = query.order_by(*ordering).offset(skip).limit(limit).all()
    return todos, total


def _named_due_date_range(name: str):
    """Resolve a named due-date filter to a ``(start, end)`` pair.

    Returns ``None`` for names other than ``"today"``, ``"this_week"`` and
    ``"next_week"``. The date helpers are looked up at call time, and only
    the selected one is invoked.
    """
    if name == "today":
        return get_date_range_today()
    if name == "this_week":
        return get_date_range_this_week()
    if name == "next_week":
        return get_date_range_next_week()
    return None


def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
    """Return the todo with the given id, or ``None`` if there is none."""
    return db.query(Todo).filter(Todo.id == todo_id).first()


def get_todos(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    completed: Optional[bool] = None,
    priority: Optional[Priority] = None,
    search: Optional[str] = None,
    category_id: Optional[int] = None,
    project_id: Optional[int] = None,
    parent_id: Optional[int] = None,
    include_subtasks: bool = True,
    due_date_filter: Optional[str] = None,
    overdue: Optional[bool] = None,
    sort_by_due_date: bool = False,
    due_date_start: Optional[datetime] = None,
    due_date_end: Optional[datetime] = None,
) -> Tuple[List[Todo], int]:
    """List todos matching the given filters, with pagination.

    Args:
        db: Active database session.
        skip: Number of matching rows to skip.
        limit: Maximum number of rows to return.
        completed: If set, keep only todos with this completion state.
        priority: If set, keep only todos with this priority.
        search: If non-empty, keep todos whose title or description contains
            this text. The text is matched literally (``%`` and ``_`` are
            escaped rather than treated as LIKE wildcards).
        category_id: If set, keep only todos in this category.
        project_id: If set, keep only todos in this project.
        parent_id: If set, keep only direct subtasks of this todo.
        include_subtasks: When ``False`` (and ``parent_id`` is ``None``),
            keep only todos without a parent.
        due_date_filter: One of ``"today"``, ``"this_week"``,
            ``"next_week"``. Any other value is ignored.
        overdue: ``True`` keeps todos whose due date is before the overdue
            cutoff; ``False`` keeps todos with no due date or a due date at
            or after the cutoff; ``None`` applies no overdue filter.
        sort_by_due_date: Sort by due date ascending (todos without a due
            date last), then newest first. Otherwise sort newest first.
        due_date_start: Inclusive lower bound on the due date.
        due_date_end: Inclusive upper bound on the due date.

    Returns:
        ``(todos, total)`` where ``total`` is the number of matching rows
        before pagination.
    """
    query = _apply_common_filters(
        db.query(Todo),
        completed=completed,
        priority=priority,
        category_id=category_id,
        project_id=project_id,
        parent_id=parent_id,
        include_subtasks=include_subtasks,
    )

    if search:
        # autoescape=True makes '%' and '_' in user input match literally
        # instead of acting as LIKE wildcards.
        query = query.filter(
            or_(
                Todo.title.contains(search, autoescape=True),
                Todo.description.contains(search, autoescape=True),
            )
        )

    # Named due-date window (unrecognised names are ignored).
    if due_date_filter:
        named_range = _named_due_date_range(due_date_filter)
        if named_range is not None:
            start_date, end_date = named_range
            query = query.filter(
                and_(Todo.due_date >= start_date, Todo.due_date <= end_date)
            )

    # Custom date range; combines with the named window above if both are set.
    if due_date_start is not None:
        query = query.filter(Todo.due_date >= due_date_start)
    if due_date_end is not None:
        query = query.filter(Todo.due_date <= due_date_end)

    if overdue is not None:
        cutoff = get_overdue_cutoff()
        if overdue:
            query = query.filter(
                and_(Todo.due_date.isnot(None), Todo.due_date < cutoff)
            )
        else:
            query = query.filter(
                or_(Todo.due_date.is_(None), Todo.due_date >= cutoff)
            )

    if sort_by_due_date:
        # Earliest due date first, todos without a due date at the end.
        ordering = (Todo.due_date.asc().nullslast(), Todo.created_at.desc())
    else:
        ordering = (Todo.created_at.desc(),)
    return _count_and_page(query, skip, limit, *ordering)


def get_overdue_todos(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    completed: Optional[bool] = None,
    priority: Optional[Priority] = None,
    category_id: Optional[int] = None,
    project_id: Optional[int] = None,
    parent_id: Optional[int] = None,
    include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
    """Get all overdue todos.

    A todo counts as overdue when it has a due date earlier than the value
    returned by ``get_overdue_cutoff()``. Completed todos are included unless
    ``completed`` is passed. Results are ordered by due date ascending, then
    newest first.

    Returns:
        ``(todos, total)`` where ``total`` is the count before pagination.
    """
    cutoff = get_overdue_cutoff()
    query = db.query(Todo).filter(
        and_(Todo.due_date.isnot(None), Todo.due_date < cutoff)
    )
    query = _apply_common_filters(
        query,
        completed=completed,
        priority=priority,
        category_id=category_id,
        project_id=project_id,
        parent_id=parent_id,
        include_subtasks=include_subtasks,
    )
    return _count_and_page(
        query, skip, limit, Todo.due_date.asc(), Todo.created_at.desc()
    )


def get_todos_due_soon(
    db: Session,
    days: int = 7,
    skip: int = 0,
    limit: int = 100,
    completed: Optional[bool] = None,
    priority: Optional[Priority] = None,
    category_id: Optional[int] = None,
    project_id: Optional[int] = None,
    parent_id: Optional[int] = None,
    include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
    """Get todos due within the next N days (default 7 days).

    The window runs from ``get_timezone_aware_now()`` to
    ``get_due_soon_cutoff(days)``, both bounds inclusive. Results are ordered
    by due date ascending, then newest first.

    Returns:
        ``(todos, total)`` where ``total`` is the count before pagination.
    """
    window_start = get_timezone_aware_now()
    window_end = get_due_soon_cutoff(days)
    query = db.query(Todo).filter(
        and_(
            Todo.due_date.isnot(None),
            Todo.due_date >= window_start,
            Todo.due_date <= window_end,
        )
    )
    query = _apply_common_filters(
        query,
        completed=completed,
        priority=priority,
        category_id=category_id,
        project_id=project_id,
        parent_id=parent_id,
        include_subtasks=include_subtasks,
    )
    return _count_and_page(
        query, skip, limit, Todo.due_date.asc(), Todo.created_at.desc()
    )


def get_todos_by_date_range(
    db: Session,
    start_date: datetime,
    end_date: datetime,
    skip: int = 0,
    limit: int = 100,
    completed: Optional[bool] = None,
    priority: Optional[Priority] = None,
    category_id: Optional[int] = None,
    project_id: Optional[int] = None,
    parent_id: Optional[int] = None,
    include_subtasks: bool = True,
) -> Tuple[List[Todo], int]:
    """Get todos within a specific date range.

    Keeps todos whose due date lies in ``[start_date, end_date]`` (both
    inclusive). Results are ordered by due date ascending, then newest first.

    Returns:
        ``(todos, total)`` where ``total`` is the count before pagination.
    """
    query = db.query(Todo).filter(
        and_(
            Todo.due_date.isnot(None),
            Todo.due_date >= start_date,
            Todo.due_date <= end_date,
        )
    )
    query = _apply_common_filters(
        query,
        completed=completed,
        priority=priority,
        category_id=category_id,
        project_id=project_id,
        parent_id=parent_id,
        include_subtasks=include_subtasks,
    )
    return _count_and_page(
        query, skip, limit, Todo.due_date.asc(), Todo.created_at.desc()
    )


def create_todo(db: Session, todo: TodoCreate) -> Todo:
    """Create and persist a new todo.

    Every field of ``todo`` except ``tag_ids`` is passed to the ``Todo``
    constructor. When ``tag_ids`` is non-empty, the matching ``Tag`` rows are
    attached; ids that match no tag are silently dropped.

    Returns:
        The newly created todo, refreshed from the database after commit.
    """
    todo_data = todo.model_dump(exclude={"tag_ids"})
    db_todo = Todo(**todo_data)

    if todo.tag_ids:
        db_todo.tags = db.query(Tag).filter(Tag.id.in_(todo.tag_ids)).all()

    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo


def update_todo(db: Session, todo_id: int, todo_update: TodoUpdate) -> Optional[Todo]:
    """Apply a partial update to an existing todo.

    Only fields explicitly set on ``todo_update`` are written. When
    ``tag_ids`` is not ``None`` the todo's tags are replaced by the tags
    matching those ids (an empty list clears them).

    Returns:
        The updated todo, or ``None`` if no todo has the given id.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if not db_todo:
        return db_todo

    update_data = todo_update.model_dump(exclude_unset=True, exclude={"tag_ids"})

    # Tags are a relationship, so they are handled separately from plain fields.
    if todo_update.tag_ids is not None:
        db_todo.tags = db.query(Tag).filter(Tag.id.in_(todo_update.tag_ids)).all()

    for field, value in update_data.items():
        setattr(db_todo, field, value)

    db.commit()
    db.refresh(db_todo)
    return db_todo


def delete_todo(db: Session, todo_id: int) -> bool:
    """Delete the todo with the given id.

    Returns:
        ``True`` if a todo was found and deleted, ``False`` otherwise.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if not db_todo:
        return False
    db.delete(db_todo)
    db.commit()
    return True


def get_subtasks(db: Session, parent_id: int) -> List[Todo]:
    """Get all subtasks for a given parent todo, newest first."""
    return (
        db.query(Todo)
        .filter(Todo.parent_id == parent_id)
        .order_by(Todo.created_at.desc())
        .all()
    )


def create_subtask(
    db: Session, parent_id: int, subtask: SubtaskCreate
) -> Optional[Todo]:
    """Create a subtask for a given parent todo.

    Returns:
        The new subtask, refreshed after commit, or ``None`` if no todo with
        ``parent_id`` exists.
    """
    parent_todo = db.query(Todo).filter(Todo.id == parent_id).first()
    if not parent_todo:
        return None

    subtask_data = subtask.model_dump()
    subtask_data["parent_id"] = parent_id
    db_subtask = Todo(**subtask_data)

    db.add(db_subtask)
    db.commit()
    db.refresh(db_subtask)
    return db_subtask


def move_subtask(
    db: Session, subtask_id: int, new_parent_id: Optional[int]
) -> Optional[Todo]:
    """Move a subtask to a different parent or make it a main todo.

    Args:
        db: Active database session.
        subtask_id: Id of the todo to move.
        new_parent_id: Id of the new parent, or ``None`` to detach the todo
            and make it a main todo.

    Returns:
        The moved todo, or ``None`` when the todo does not exist, the new
        parent does not exist, or the move would make the todo its own
        ancestor (including being its own parent).
    """
    subtask = db.query(Todo).filter(Todo.id == subtask_id).first()
    if not subtask:
        return None

    if new_parent_id is not None:
        if new_parent_id == subtask_id:
            return None  # Cannot make a todo a subtask of itself

        new_parent = db.query(Todo).filter(Todo.id == new_parent_id).first()
        if not new_parent:
            return None  # New parent doesn't exist

        # Walk the new parent's ancestor chain: if the todo being moved shows
        # up anywhere in it, the move would create a cycle at any depth.
        visited = {new_parent_id}
        ancestor_id = new_parent.parent_id
        while ancestor_id is not None:
            if ancestor_id == subtask_id:
                return None
            if ancestor_id in visited:
                # The stored data already loops without involving this todo;
                # stop walking so the check terminates.
                break
            visited.add(ancestor_id)
            ancestor = db.query(Todo).filter(Todo.id == ancestor_id).first()
            ancestor_id = ancestor.parent_id if ancestor else None

    subtask.parent_id = new_parent_id
    db.commit()
    db.refresh(subtask)
    return subtask