"""Task CRUD endpoints.

Every endpoint first goes through the SQLAlchemy-backed ``crud.task`` layer.
If that layer raises, the same operation is retried directly against the
SQLite database file, and rows from that fallback are returned as lightweight
attribute objects that the ``Task`` response model can serialize.
"""
import logging
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from enum import Enum
from typing import Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, status
# ``read_tasks`` has a query parameter named ``status`` which shadows the
# module imported above, so HTTP status constants are always referenced
# through this alias.
from fastapi import status as http_status
from sqlalchemy.orm import Session

from app import crud
from app.api.deps import get_db
from app.models.task import TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate

logger = logging.getLogger(__name__)

router = APIRouter()

_CREATE_TASK_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS task (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        priority TEXT,
        status TEXT,
        due_date TEXT,
        completed INTEGER,
        created_at TEXT,
        updated_at TEXT
    )
"""

_INSERT_TASK_SQL = """
    INSERT INTO task (
        title, description, priority, status, due_date,
        completed, created_at, updated_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""


class _TaskResult:
    """Plain attribute bag exposing one SQLite row as object attributes."""

    def __init__(self, **kwargs: Any) -> None:
        for key, value in kwargs.items():
            setattr(self, key, value)


def _row_to_task(row: sqlite3.Row) -> _TaskResult:
    """Convert a ``task`` row into an attribute object with a boolean ``completed``."""
    fields = dict(row)
    if "completed" in fields:
        # SQLite stores the flag as an INTEGER (0/1).
        fields["completed"] = bool(fields["completed"])
    return _TaskResult(**fields)


def _connect() -> sqlite3.Connection:
    """Open a direct connection to the application's SQLite file."""
    # Imported lazily so the module still loads, and the SQLAlchemy path still
    # works, when the fallback database location is unavailable.
    from app.db.session import db_file

    conn = sqlite3.connect(str(db_file))
    conn.row_factory = sqlite3.Row
    return conn


def _fetch_row(conn: sqlite3.Connection, task_id: Optional[int]) -> Optional[sqlite3.Row]:
    """Return the ``task`` row with the given id, or ``None`` when absent."""
    return conn.execute("SELECT * FROM task WHERE id = ?", (task_id,)).fetchone()


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    so newly written timestamps match the ones already stored.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _to_sqlite_value(value: Any) -> Any:
    """Convert enum members and datetimes into values sqlite3 can bind as text."""
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, datetime):
        return value.isoformat()
    return value


def _model_to_dict(model: Any, **kwargs: Any) -> dict:
    """Dump a Pydantic model to a dict using whichever API the model offers."""
    if hasattr(model, "model_dump"):
        return model.model_dump(**kwargs)
    return model.dict(**kwargs)


def _coerce_due_date(task_in: Any) -> None:
    """Parse a string ``due_date`` on *task_in* into a ``datetime`` in place.

    Deliberately best-effort: when the value cannot be parsed (or assigned),
    it is left untouched and the persistence layer deals with it.
    """
    due_date = getattr(task_in, "due_date", None)
    if not isinstance(due_date, str):
        return
    try:
        task_in.due_date = datetime.fromisoformat(due_date.replace("Z", "+00:00"))
    except Exception:
        logger.warning(
            "Could not parse due_date %r; leaving it unchanged", due_date, exc_info=True
        )


def _not_found() -> HTTPException:
    """Build the 404 response raised when a task id does not exist."""
    return HTTPException(
        status_code=http_status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )


def _server_error(message: str, exc: Exception) -> HTTPException:
    """Build a 500 response whose detail is ``"<message>: <exc>"``."""
    return HTTPException(
        status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"{message}: {exc}",
    )


def _create_task_sqlite(task_in: Any) -> _TaskResult:
    """Insert *task_in* directly into SQLite and return the stored row."""
    task_data = _model_to_dict(task_in)
    now = _utc_now_iso()
    params = (
        task_data.get("title", "Untitled Task"),
        task_data.get("description", ""),
        _to_sqlite_value(task_data.get("priority", "medium")),
        _to_sqlite_value(task_data.get("status", "todo")),
        _to_sqlite_value(task_data.get("due_date")),
        1 if task_data.get("completed") else 0,
        now,
        now,
    )
    with closing(_connect()) as conn:
        # IF NOT EXISTS makes this a no-op when the table is already there.
        conn.execute(_CREATE_TASK_TABLE_SQL)
        cursor = conn.execute(_INSERT_TASK_SQL, params)
        conn.commit()
        row = _fetch_row(conn, cursor.lastrowid)
    if row is None:
        raise RuntimeError("Task was created but could not be retrieved")
    return _row_to_task(row)


def _update_task_sqlite(task_id: int, task_in: Any) -> _TaskResult:
    """Apply the explicitly set, non-null fields of *task_in* directly in SQLite.

    Raises:
        HTTPException: 404 when no task has the given id.
        ValueError: when a field does not correspond to a table column.
    """
    with closing(_connect()) as conn:
        row = _fetch_row(conn, task_id)
        if row is None:
            raise _not_found()

        updates = {
            key: _to_sqlite_value(value)
            for key, value in _model_to_dict(task_in, exclude_unset=True).items()
            if value is not None
        }
        if not updates:
            # Nothing to change: return the task as stored.
            return _row_to_task(row)

        updates["updated_at"] = _utc_now_iso()

        # Column names are interpolated into the statement below, so accept
        # only names that really are columns of the table.
        unknown = sorted(set(updates) - set(row.keys()))
        if unknown:
            raise ValueError(f"Unknown task column(s): {', '.join(unknown)}")

        set_clause = ", ".join(f"{column} = ?" for column in updates)
        conn.execute(
            f"UPDATE task SET {set_clause} WHERE id = ?",
            [*updates.values(), task_id],
        )
        conn.commit()
        updated_row = _fetch_row(conn, task_id)

    if updated_row is None:
        raise RuntimeError("Task was updated but could not be retrieved")
    return _row_to_task(updated_row)


# NOTE: endpoint docstrings are kept to their original one-line summaries
# because FastAPI publishes them as the OpenAPI operation description;
# implementation details live in comments instead.


@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
) -> Any:
    """
    Retrieve tasks.
    """
    # Optional ``status`` filters the result; ``skip``/``limit`` paginate.
    # Responds with 500 when both the SQLAlchemy and the SQLite path fail.
    logger.info("Getting tasks with status: %s, skip: %s, limit: %s", status, skip, limit)

    try:
        if status is not None:
            return crud.task.get_by_status(db, status=status)
        return crud.task.get_multi(db, skip=skip, limit=limit)
    except Exception:
        logger.exception("Error getting tasks with SQLAlchemy; falling back to SQLite")

    try:
        with closing(_connect()) as conn:
            if status is not None:
                cursor = conn.execute(
                    "SELECT * FROM task WHERE status = ? LIMIT ? OFFSET ?",
                    (status.value, limit, skip),
                )
            else:
                cursor = conn.execute(
                    "SELECT * FROM task LIMIT ? OFFSET ?",
                    (limit, skip),
                )
            return [_row_to_task(row) for row in cursor.fetchall()]
    except Exception as exc:
        logger.exception("Error getting tasks with direct SQLite")
        raise _server_error("Error retrieving tasks", exc) from exc


@router.post("/", response_model=Task, status_code=http_status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
) -> Any:
    """
    Create new task.
    """
    # Responds with 500 when both the SQLAlchemy and the SQLite path fail.
    logger.info("Creating task: %s", task_in)
    _coerce_due_date(task_in)

    try:
        task = crud.task.create(db, obj_in=task_in)
    except Exception:
        logger.exception("SQLAlchemy task creation failed; falling back to SQLite")
    else:
        # Returned from ``else`` so that nothing after a successful insert can
        # trigger the fallback and create a duplicate row.
        return task

    try:
        return _create_task_sqlite(task_in)
    except Exception as exc:
        logger.exception("Direct SQLite task creation failed")
        raise _server_error(
            "All task creation methods failed. Last error", exc
        ) from exc


@router.get("/{task_id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Get task by ID.
    """
    # Responds with 404 when the task is in neither lookup path, 500 on errors.
    logger.info("Getting task with ID: %s", task_id)

    try:
        task = crud.task.get(db, id=task_id)
        if task:
            return task
        # Not found via SQLAlchemy: let the SQLite lookup decide on the 404.
    except Exception:
        logger.exception("Error getting task with SQLAlchemy; falling back to SQLite")

    try:
        with closing(_connect()) as conn:
            row = _fetch_row(conn, task_id)
        if row is None:
            raise _not_found()
        return _row_to_task(row)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Error getting task with direct SQLite")
        raise _server_error("Error retrieving task", exc) from exc


@router.put("/{task_id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate,
) -> Any:
    """
    Update a task.
    """
    # Responds with 404 when the task does not exist, 500 when both paths fail.
    logger.info("Updating task with ID: %s, data: %s", task_id, task_in)
    _coerce_due_date(task_in)

    try:
        task = crud.task.get(db, id=task_id)
        if not task:
            raise _not_found()
        return crud.task.update(db, db_obj=task, obj_in=task_in)
    except HTTPException:
        raise
    except Exception:
        logger.exception("Error updating task with SQLAlchemy; falling back to SQLite")

    try:
        return _update_task_sqlite(task_id, task_in)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Error updating task with direct SQLite")
        raise _server_error("Error updating task", exc) from exc


@router.delete("/{task_id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Delete a task.
    """
    # Returns the deleted task. Responds with 404 when the task does not
    # exist, 500 when both paths fail.
    logger.info("Deleting task with ID: %s", task_id)

    try:
        if not crud.task.get(db, id=task_id):
            raise _not_found()
        return crud.task.remove(db, id=task_id)
    except HTTPException:
        raise
    except Exception:
        logger.exception("Error deleting task with SQLAlchemy; falling back to SQLite")

    try:
        with closing(_connect()) as conn:
            # Read the row first so it can be returned after deletion.
            row = _fetch_row(conn, task_id)
            if row is None:
                raise _not_found()
            conn.execute("DELETE FROM task WHERE id = ?", (task_id,))
            conn.commit()
        return _row_to_task(row)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Error deleting task with direct SQLite")
        raise _server_error("Error deleting task", exc) from exc


@router.post("/{task_id}/complete", response_model=Task)
def complete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Mark a task as completed.
    """
    # Responds with 404 when the task does not exist, 500 when both paths fail.
    logger.info("Marking task %s as completed", task_id)

    try:
        task = crud.task.mark_completed(db, task_id=task_id)
        if not task:
            raise _not_found()
        return task
    except HTTPException:
        # A 404 is a result, not a failure of the SQLAlchemy path.
        raise
    except Exception:
        logger.exception("Error completing task with SQLAlchemy; falling back to SQLite")

    try:
        with closing(_connect()) as conn:
            if _fetch_row(conn, task_id) is None:
                raise _not_found()
            conn.execute(
                "UPDATE task SET completed = ?, status = ?, updated_at = ? WHERE id = ?",
                (1, "done", _utc_now_iso(), task_id),
            )
            conn.commit()
            updated_row = _fetch_row(conn, task_id)
        if updated_row is None:
            raise RuntimeError("Task was completed but could not be retrieved")
        return _row_to_task(updated_row)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Error completing task with direct SQLite")
        raise _server_error("Error completing task", exc) from exc