from typing import Any, List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app import crud from app.api.deps import get_db from app.models.task import TaskStatus from app.schemas.task import Task, TaskCreate, TaskUpdate router = APIRouter() @router.get("/", response_model=List[Task]) def read_tasks( db: Session = Depends(get_db), skip: int = 0, limit: int = 100, status: Optional[TaskStatus] = None, ) -> Any: """ Retrieve tasks. """ try: import traceback import sqlite3 from sqlalchemy import text from app.db.session import db_file print(f"Getting tasks with status: {status}, skip: {skip}, limit: {limit}") # Try the normal SQLAlchemy approach first try: if status: tasks = crud.task.get_by_status(db, status=status) else: tasks = crud.task.get_multi(db, skip=skip, limit=limit) return tasks except Exception as e: print(f"Error getting tasks with SQLAlchemy: {e}") print(traceback.format_exc()) # Continue to fallback # Fallback to direct SQLite approach try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row cursor = conn.cursor() if status: cursor.execute("SELECT * FROM task WHERE status = ? LIMIT ? OFFSET ?", (status.value, limit, skip)) else: cursor.execute("SELECT * FROM task LIMIT ? OFFSET ?", (limit, skip)) rows = cursor.fetchall() # Convert to Task objects tasks = [] for row in rows: task_dict = dict(row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Convert to object with attributes class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) tasks.append(TaskResult(**task_dict)) conn.close() return tasks except Exception as e: print(f"Error getting tasks with direct SQLite: {e}") print(traceback.format_exc()) raise except Exception as e: print(f"Global error in read_tasks: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error retrieving tasks: {str(e)}" ) @router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED) def create_task( *, db: Session = Depends(get_db), task_in: TaskCreate, ) -> Any: """ Create new task - using direct SQLite approach for reliability. """ import sqlite3 import time import sys import traceback from datetime import datetime from app.db.session import db_file # Log creation attempt print(f"[{datetime.now().isoformat()}] Task creation requested", file=sys.stdout) # Use direct SQLite for maximum reliability try: # Extract task data regardless of Pydantic version try: if hasattr(task_in, 'model_dump'): task_data = task_in.model_dump() elif hasattr(task_in, 'dict'): task_data = task_in.dict() else: # Fallback for any case task_data = { 'title': getattr(task_in, 'title', 'Untitled Task'), 'description': getattr(task_in, 'description', ''), 'priority': getattr(task_in, 'priority', 'medium'), 'status': getattr(task_in, 'status', 'todo'), 'due_date': getattr(task_in, 'due_date', None), 'completed': getattr(task_in, 'completed', False) } print(f"Task data: {task_data}") except Exception as e: print(f"Error extracting task data: {e}") # Fallback to minimal data task_data = { 'title': str(getattr(task_in, 'title', 'Unknown Title')), 'description': str(getattr(task_in, 'description', '')), 'priority': 'medium', 'status': 'todo', 'completed': False } # Format due_date if present if task_data.get('due_date'): try: if isinstance(task_data['due_date'], datetime): task_data['due_date'] = task_data['due_date'].isoformat() elif isinstance(task_data['due_date'], str): # Standardize format by parsing and reformatting parsed_date = datetime.fromisoformat( task_data['due_date'].replace('Z', '+00:00') ) task_data['due_date'] = parsed_date.isoformat() except Exception as e: print(f"Warning: Could not parse due_date: {e}") # Keep as-is or set to None if invalid if not isinstance(task_data['due_date'], str): task_data['due_date'] = None # Get current timestamp for created/updated fields now = datetime.utcnow().isoformat() # Connect to SQLite with retry logic for retry in range(3): conn = None try: # Try to connect to the database with a timeout conn = sqlite3.connect(str(db_file), timeout=30) cursor = conn.cursor() # Create the task table if it doesn't exist - using minimal schema cursor.execute(""" CREATE TABLE IF NOT EXISTS task ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, description TEXT, priority TEXT DEFAULT 'medium', status TEXT DEFAULT 'todo', due_date TEXT, completed INTEGER DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """) # Insert the task - provide defaults for all fields cursor.execute( """ INSERT INTO task ( title, description, priority, status, due_date, completed, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( task_data.get('title', 'Untitled'), task_data.get('description', ''), task_data.get('priority', 'medium'), task_data.get('status', 'todo'), task_data.get('due_date'), 1 if task_data.get('completed') else 0, now, now ) ) # Get the ID of the inserted task task_id = cursor.lastrowid print(f"Task inserted with ID: {task_id}") # Commit the transaction conn.commit() # Retrieve the created task to return it cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) row = cursor.fetchone() if row: # Get column names from cursor description column_names = [desc[0] for desc in cursor.description] # Create a dictionary from row values task_dict = dict(zip(column_names, row)) # Convert 'completed' to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Create an object that mimics the Task model class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) print(f"Task created successfully: ID={task_id}") # Close the connection and return the task conn.close() return TaskResult(**task_dict) else: conn.close() raise Exception(f"Task creation succeeded but retrieval failed for ID: {task_id}") except sqlite3.OperationalError as e: if conn: conn.close() # Check if retry is appropriate if "database is locked" in str(e) and retry < 2: wait_time = (retry + 1) * 1.5 # Exponential backoff print(f"Database locked, retrying in {wait_time}s (attempt {retry+1}/3)") time.sleep(wait_time) else: print(f"SQLite operational error: {e}") raise except Exception as e: if conn: # Try to rollback if connection is still open try: conn.rollback() except: pass conn.close() print(f"Error in SQLite task creation: {e}") print(traceback.format_exc()) # Only retry on specific transient errors if retry < 2 and ("locked" in str(e).lower() or "busy" in str(e).lower()): time.sleep(1) continue raise # If we reach here, the retry loop failed raise Exception("Failed to create task after multiple attempts") except Exception as sqlite_error: # Final fallback: try SQLAlchemy approach try: print(f"Direct SQLite approach failed: {sqlite_error}") print("Trying SQLAlchemy as fallback...") task = crud.task.create(db, obj_in=task_in) print(f"Task created with SQLAlchemy fallback: ID={task.id}") return task except Exception as alch_error: print(f"SQLAlchemy fallback also failed: {alch_error}") print(traceback.format_exc()) # Provide detailed error information error_detail = f"Task creation failed. Primary error: {str(sqlite_error)}. Fallback error: {str(alch_error)}" print(f"Final error: {error_detail}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_detail ) @router.get("/{task_id}", response_model=Task) def read_task( *, db: Session = Depends(get_db), task_id: int, ) -> Any: """ Get task by ID. """ try: import traceback import sqlite3 from app.db.session import db_file print(f"Getting task with ID: {task_id}") # Try the normal SQLAlchemy approach first try: task = crud.task.get(db, id=task_id) if task: return task # Fall through to direct SQLite if task not found except Exception as e: print(f"Error getting task with SQLAlchemy: {e}") print(traceback.format_exc()) # Continue to fallback # Fallback to direct SQLite approach try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) task_dict = dict(row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Convert to object with attributes class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) conn.close() return TaskResult(**task_dict) except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error getting task with direct SQLite: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error retrieving task: {str(e)}" ) except HTTPException: raise # Re-raise any HTTP exceptions except Exception as e: print(f"Global error in read_task: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error retrieving task: {str(e)}" ) @router.put("/{task_id}", response_model=Task) def update_task( *, db: Session = Depends(get_db), task_id: int, task_in: TaskUpdate, ) -> Any: """ Update a task. """ try: import traceback import sqlite3 import json from datetime import datetime from app.db.session import db_file print(f"Updating task with ID: {task_id}, data: {task_in}") # Handle datetime conversion for due_date if present if hasattr(task_in, "due_date") and task_in.due_date is not None: if isinstance(task_in.due_date, str): try: task_in.due_date = datetime.fromisoformat(task_in.due_date.replace('Z', '+00:00')) except Exception as e: print(f"Error parsing due_date: {e}") # Try the normal SQLAlchemy approach first try: task = crud.task.get(db, id=task_id) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) updated_task = crud.task.update(db, db_obj=task, obj_in=task_in) return updated_task except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error updating task with SQLAlchemy: {e}") print(traceback.format_exc()) # Continue to fallback # Fallback to direct SQLite approach try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # First check if task exists cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) # Convert Pydantic model to dict, excluding unset values updates = {} model_data = task_in.model_dump(exclude_unset=True) if hasattr(task_in, "model_dump") else task_in.dict(exclude_unset=True) # Only include fields that were provided in the update for key, value in model_data.items(): if value is not None: # Skip None values updates[key] = value if not updates: # No updates provided task_dict = dict(row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Return the unchanged task class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) conn.close() return TaskResult(**task_dict) # Format datetime objects if 'due_date' in updates and isinstance(updates['due_date'], datetime): updates['due_date'] = updates['due_date'].isoformat() # Add updated_at timestamp updates['updated_at'] = datetime.utcnow().isoformat() # Build the SQL update statement set_clause = ", ".join([f"{key} = ?" for key in updates.keys()]) params = list(updates.values()) params.append(task_id) # For the WHERE clause cursor.execute(f"UPDATE task SET {set_clause} WHERE id = ?", params) conn.commit() # Return the updated task cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) updated_row = cursor.fetchone() conn.close() if updated_row: task_dict = dict(updated_row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Convert to object with attributes class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) return TaskResult(**task_dict) else: raise Exception("Task was updated but could not be retrieved") except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error updating task with direct SQLite: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error updating task: {str(e)}" ) except HTTPException: raise # Re-raise any HTTP exceptions except Exception as e: print(f"Global error in update_task: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error updating task: {str(e)}" ) @router.delete("/{task_id}", response_model=Task) def delete_task( *, db: Session = Depends(get_db), task_id: int, ) -> Any: """ Delete a task. """ try: import traceback import sqlite3 from app.db.session import db_file print(f"Deleting task with ID: {task_id}") # First, get the task to return it later task_to_return = None # Try the normal SQLAlchemy approach first try: task = crud.task.get(db, id=task_id) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) task_to_return = task task = crud.task.remove(db, id=task_id) return task except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error deleting task with SQLAlchemy: {e}") print(traceback.format_exc()) # Continue to fallback # Fallback to direct SQLite approach try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # First save the task data for the return value cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) task_dict = dict(row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Delete the task cursor.execute("DELETE FROM task WHERE id = ?", (task_id,)) conn.commit() conn.close() # Convert to object with attributes class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) return TaskResult(**task_dict) except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error deleting task with direct SQLite: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting task: {str(e)}" ) except HTTPException: raise # Re-raise any HTTP exceptions except Exception as e: print(f"Global error in delete_task: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting task: {str(e)}" ) @router.post("/{task_id}/complete", response_model=Task) def complete_task( *, db: Session = Depends(get_db), task_id: int, ) -> Any: """ Mark a task as completed. """ try: import traceback import sqlite3 from datetime import datetime from app.db.session import db_file print(f"Marking task {task_id} as completed") # Try the normal SQLAlchemy approach first try: task = crud.task.mark_completed(db, task_id=task_id) if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) return task except Exception as e: print(f"Error completing task with SQLAlchemy: {e}") print(traceback.format_exc()) # Continue to fallback # Fallback to direct SQLite approach try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # First check if task exists cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Task not found", ) # Update task to completed status now = datetime.utcnow().isoformat() cursor.execute( "UPDATE task SET completed = ?, status = ?, updated_at = ? WHERE id = ?", (1, "done", now, task_id) ) conn.commit() # Get the updated task cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,)) updated_row = cursor.fetchone() conn.close() if updated_row: task_dict = dict(updated_row) # Convert completed to boolean if 'completed' in task_dict: task_dict['completed'] = bool(task_dict['completed']) # Convert to object with attributes class TaskResult: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) return TaskResult(**task_dict) else: raise Exception("Task was completed but could not be retrieved") except HTTPException: raise # Re-raise the 404 exception except Exception as e: print(f"Error completing task with direct SQLite: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error completing task: {str(e)}" ) except HTTPException: raise # Re-raise any HTTP exceptions except Exception as e: print(f"Global error in complete_task: {e}") print(traceback.format_exc()) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error completing task: {str(e)}" )