"""Task endpoints for the authenticated user.

Every handler has two data-access paths: the SQLAlchemy ``crud.task`` layer
and raw ``sqlite3`` access to the application's database file.  All handlers
except ``create_task`` try SQLAlchemy first and fall back to SQLite;
``create_task`` does the reverse.
"""
import sqlite3
import sys
import time
import traceback
from contextlib import closing
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app import crud
from app.api.deps import get_db, get_current_active_user
from app.models.task import TaskStatus
from app.models.user import User
from app.schemas.task import Task, TaskCreate, TaskUpdate

router = APIRouter()

# ``read_tasks`` exposes a query parameter that must be called ``status``; it
# shadows the FastAPI ``status`` module inside that function.  This alias keeps
# the HTTP status constants reachable there.
http_status = status

_SELECT_OWNED_TASK = "SELECT * FROM task WHERE id = ? AND user_id = ?"

# Minimal schema used by ``create_task`` when the table does not exist yet.
_CREATE_TASK_TABLE = """
    CREATE TABLE IF NOT EXISTS task (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        priority TEXT DEFAULT 'medium',
        status TEXT DEFAULT 'todo',
        due_date TEXT,
        completed INTEGER DEFAULT 0,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
        user_id INTEGER
    )
"""

_INSERT_TASK = """
    INSERT INTO task (
        title, description, priority, status, due_date,
        completed, created_at, updated_at, user_id
    ) VALUES (
        :title, :description, :priority, :status, :due_date,
        :completed, :created_at, :updated_at, :user_id
    )
"""

_CREATE_ATTEMPTS = 3


class _TaskResult:
    """Plain attribute bag exposing a raw ``task`` row like an ORM object."""

    def __init__(self, **fields: Any) -> None:
        for key, value in fields.items():
            setattr(self, key, value)


def _row_to_task(row: sqlite3.Row) -> _TaskResult:
    """Convert a ``task`` row into a ``_TaskResult`` with a boolean ``completed``."""
    task_dict = dict(row)
    if "completed" in task_dict:
        # SQLite stores the flag as 0/1.
        task_dict["completed"] = bool(task_dict["completed"])
    return _TaskResult(**task_dict)


def _connect(timeout: float = 5.0) -> sqlite3.Connection:
    """Open the application's SQLite file with rows addressable by column name.

    The default ``timeout`` equals the ``sqlite3.connect`` default.
    """
    # Imported at call time so an import problem only affects the raw-SQLite
    # path and is reported by that path's error handling.
    from app.db.session import db_file

    conn = sqlite3.connect(str(db_file), timeout=timeout)
    conn.row_factory = sqlite3.Row
    return conn


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Same output format as ``datetime.utcnow().isoformat()`` without calling
    the deprecated ``utcnow``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _sql_value(value: Any) -> Any:
    """Unwrap enum members to their value so ``sqlite3`` can bind them."""
    return value.value if isinstance(value, Enum) else value


def _dump_model(model: Any, **kwargs: Any) -> Dict[str, Any]:
    """Dump a Pydantic model to a dict on either Pydantic v2 or v1."""
    if hasattr(model, "model_dump"):
        return model.model_dump(**kwargs)
    return model.dict(**kwargs)


def _extract_task_data(task_in: Any) -> Dict[str, Any]:
    """Best-effort conversion of the create payload into a plain dict."""
    try:
        if hasattr(task_in, "model_dump") or hasattr(task_in, "dict"):
            task_data = _dump_model(task_in)
        else:
            # Fallback for objects that are not Pydantic models.
            task_data = {
                "title": getattr(task_in, "title", "Untitled Task"),
                "description": getattr(task_in, "description", ""),
                "priority": getattr(task_in, "priority", "medium"),
                "status": getattr(task_in, "status", "todo"),
                "due_date": getattr(task_in, "due_date", None),
                "completed": getattr(task_in, "completed", False),
            }
        print(f"Task data: {task_data}")
        return task_data
    except Exception as e:
        print(f"Error extracting task data: {e}")
        # Deliberate best-effort: fall back to minimal data.
        return {
            "title": str(getattr(task_in, "title", "Unknown Title")),
            "description": str(getattr(task_in, "description", "")),
            "priority": "medium",
            "status": "todo",
            "completed": False,
        }


def _normalize_due_date(task_data: Dict[str, Any]) -> None:
    """Rewrite ``task_data['due_date']`` in place as an ISO-8601 string."""
    due_date = task_data.get("due_date")
    if not due_date:
        return
    if isinstance(due_date, datetime):
        task_data["due_date"] = due_date.isoformat()
    elif isinstance(due_date, str):
        try:
            # ``fromisoformat`` rejects a trailing "Z" on older Pythons.
            parsed = datetime.fromisoformat(due_date.replace("Z", "+00:00"))
        except ValueError as e:
            # The unparseable string is kept as-is.
            print(f"Warning: Could not parse due_date: {e}")
        else:
            task_data["due_date"] = parsed.isoformat()


def _insert_task(task_data: Dict[str, Any], user_id: Any, now: str) -> _TaskResult:
    """Insert one task through raw SQLite and return the stored row.

    Errors raised before the commit propagate to the caller, which may retry.
    Once the insert is committed this function never raises, because a
    failure at that point would make the caller insert the task a second time.
    """
    values = {
        "title": task_data.get("title", "Untitled"),
        "description": task_data.get("description", ""),
        "priority": _sql_value(task_data.get("priority", "medium")),
        "status": _sql_value(task_data.get("status", "todo")),
        "due_date": task_data.get("due_date"),
        "completed": 1 if task_data.get("completed") else 0,
        "created_at": now,
        "updated_at": now,
        "user_id": user_id,
    }
    # ``closing`` guarantees the connection is released on every path; closing
    # with an open transaction discards the uncommitted insert.
    with closing(_connect(timeout=30)) as conn:
        conn.execute(_CREATE_TASK_TABLE)
        task_id = conn.execute(_INSERT_TASK, values).lastrowid
        print(f"Task inserted with ID: {task_id}")
        conn.commit()
        try:
            row = conn.execute(_SELECT_OWNED_TASK, (task_id, user_id)).fetchone()
        except sqlite3.Error as e:
            print(f"Warning: could not re-read created task {task_id}: {e}")
            row = None

    print(f"Task created successfully: ID={task_id}")
    if row is not None:
        return _row_to_task(row)
    # The row is committed but could not be read back: answer with the values
    # that were written instead of failing.
    return _TaskResult(
        **{**values, "id": task_id, "completed": bool(values["completed"])}
    )


def _task_not_found() -> HTTPException:
    """Build the 404 error shared by the single-task handlers."""
    return HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )


@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Retrieve tasks for the current user.

    Tries the SQLAlchemy CRUD layer first and falls back to a direct SQLite
    query if that raises.

    Raises:
        HTTPException: 500 when both approaches fail.
    """
    try:
        print(f"Getting tasks with status: {status}, skip: {skip}, limit: {limit}")

        # Try the normal SQLAlchemy approach first
        try:
            if status:
                # NOTE(review): skip/limit are not passed on this path, while
                # the SQLite fallback below does paginate filtered results.
                return crud.task.get_by_status(
                    db, status=status, user_id=current_user.id
                )
            return crud.task.get_multi(
                db, skip=skip, limit=limit, user_id=current_user.id
            )
        except Exception as e:
            print(f"Error getting tasks with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            if status:
                query = (
                    "SELECT * FROM task WHERE status = ? AND user_id = ? "
                    "LIMIT ? OFFSET ?"
                )
                params = (status.value, current_user.id, limit, skip)
            else:
                query = "SELECT * FROM task WHERE user_id = ? LIMIT ? OFFSET ?"
                params = (current_user.id, limit, skip)
            with closing(_connect()) as conn:
                rows = conn.execute(query, params).fetchall()
            return [_row_to_task(row) for row in rows]
        except Exception as e:
            print(f"Error getting tasks with direct SQLite: {e}")
            print(traceback.format_exc())
            raise
    except Exception as e:
        print(f"Global error in read_tasks: {e}")
        print(traceback.format_exc())
        # ``status`` is the query parameter here, so use the module alias.
        raise HTTPException(
            status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error retrieving tasks: {str(e)}",
        )


@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Create new task for the current user - using direct SQLite approach for reliability.

    The insert is attempted up to three times when the database reports it
    is locked or busy.  If the SQLite approach fails, the SQLAlchemy CRUD
    layer is used as a final fallback.

    Raises:
        HTTPException: 500 when both approaches fail.
    """
    # Log creation attempt
    print(f"[{datetime.now().isoformat()}] Task creation requested", file=sys.stdout)

    try:
        task_data = _extract_task_data(task_in)
        _normalize_due_date(task_data)
        now = _utc_now_iso()

        for attempt in range(_CREATE_ATTEMPTS):
            is_last_attempt = attempt == _CREATE_ATTEMPTS - 1
            try:
                return _insert_task(task_data, current_user.id, now)
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e) and not is_last_attempt:
                    wait_time = (attempt + 1) * 1.5  # Linear backoff: 1.5s, 3.0s
                    print(
                        f"Database locked, retrying in {wait_time}s (attempt {attempt + 1}/3)"
                    )
                    time.sleep(wait_time)
                else:
                    print(f"SQLite operational error: {e}")
                    raise
            except Exception as e:
                print(f"Error in SQLite task creation: {e}")
                print(traceback.format_exc())
                # Only retry on specific transient errors
                message = str(e).lower()
                if not is_last_attempt and ("locked" in message or "busy" in message):
                    time.sleep(1)
                    continue
                raise
    except Exception as sqlite_error:
        # Final fallback: try SQLAlchemy approach
        try:
            print(f"Direct SQLite approach failed: {sqlite_error}")
            print("Trying SQLAlchemy as fallback...")
            task = crud.task.create_with_owner(
                db, obj_in=task_in, user_id=current_user.id
            )
            print(f"Task created with SQLAlchemy fallback: ID={task.id}")
            return task
        except Exception as alch_error:
            print(f"SQLAlchemy fallback also failed: {alch_error}")
            print(traceback.format_exc())

            # Provide detailed error information
            error_detail = f"Task creation failed. Primary error: {str(sqlite_error)}. Fallback error: {str(alch_error)}"
            print(f"Final error: {error_detail}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_detail
            )


@router.get("/{task_id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Get task by ID for the current user.

    Raises:
        HTTPException: 404 when the task is not found for this user by either
            approach; 500 when the SQLite fallback fails.
    """
    print(f"Getting task with ID: {task_id}")

    # Try the normal SQLAlchemy approach first
    try:
        task = crud.task.get_by_id_and_user(
            db, task_id=task_id, user_id=current_user.id
        )
        if task:
            return task
        # Not found through SQLAlchemy - check further with direct SQLite
    except Exception as e:
        print(f"Error getting task with SQLAlchemy: {e}")
        print(traceback.format_exc())
        # Continue to fallback

    # Fallback to direct SQLite approach
    try:
        with closing(_connect()) as conn:
            row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()
        if not row:
            raise _task_not_found()
        return _row_to_task(row)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error getting task with direct SQLite: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error retrieving task: {str(e)}",
        )


@router.put("/{task_id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Update a task for the current user.

    Only fields that were set in the payload and are not ``None`` are written
    by the SQLite fallback.

    Raises:
        HTTPException: 404 when the task is not found for this user; 500 when
            the SQLite fallback fails.
    """
    print(f"Updating task with ID: {task_id}, data: {task_in}")

    # Handle datetime conversion for due_date if present
    if isinstance(getattr(task_in, "due_date", None), str):
        try:
            task_in.due_date = datetime.fromisoformat(
                task_in.due_date.replace("Z", "+00:00")
            )
        except Exception as e:
            # Best-effort: an unparseable date is passed through unchanged.
            print(f"Error parsing due_date: {e}")

    # Try the normal SQLAlchemy approach first
    try:
        task = crud.task.get_by_id_and_user(
            db, task_id=task_id, user_id=current_user.id
        )
        if not task:
            raise _task_not_found()
        return crud.task.update(db, db_obj=task, obj_in=task_in)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error updating task with SQLAlchemy: {e}")
        print(traceback.format_exc())
        # Continue to fallback

    # Fallback to direct SQLite approach
    try:
        with closing(_connect()) as conn:
            # First check if task exists and belongs to current user
            row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()
            if not row:
                raise _task_not_found()

            # Column names are interpolated into the statement below, so only
            # keys that are real columns of the fetched row are accepted.
            columns = set(row.keys())
            updates = {
                key: value
                for key, value in _dump_model(task_in, exclude_unset=True).items()
                if value is not None and key in columns
            }
            if not updates:
                # No updates provided - return the unchanged task
                return _row_to_task(row)

            if isinstance(updates.get("due_date"), datetime):
                updates["due_date"] = updates["due_date"].isoformat()
            updates["updated_at"] = _utc_now_iso()

            set_clause = ", ".join(f"{key} = ?" for key in updates)
            params = [_sql_value(value) for value in updates.values()]
            # The write itself is also scoped to the owner.
            params += [task_id, current_user.id]
            conn.execute(
                f"UPDATE task SET {set_clause} WHERE id = ? AND user_id = ?",
                params,
            )
            conn.commit()

            updated_row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()

        if not updated_row:
            raise Exception("Task was updated but could not be retrieved")
        return _row_to_task(updated_row)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error updating task with direct SQLite: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error updating task: {str(e)}",
        )


@router.delete("/{task_id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Delete a task for the current user.

    Returns the task as it was before deletion.

    Raises:
        HTTPException: 404 when the task is not found for this user; 500 when
            the SQLite fallback fails.
    """
    print(f"Deleting task with ID: {task_id}")

    # Try the normal SQLAlchemy approach
    try:
        task = crud.task.get_by_id_and_user(
            db, task_id=task_id, user_id=current_user.id
        )
        if not task:
            raise _task_not_found()
        return crud.task.remove(db, id=task_id)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error deleting task with SQLAlchemy: {e}")
        print(traceback.format_exc())
        # Continue to fallback

    # Fallback to direct SQLite approach
    try:
        with closing(_connect()) as conn:
            # First save the task data for the return value
            row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()
            if not row:
                raise _task_not_found()

            conn.execute(
                "DELETE FROM task WHERE id = ? AND user_id = ?",
                (task_id, current_user.id),
            )
            conn.commit()
        return _row_to_task(row)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error deleting task with direct SQLite: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting task: {str(e)}",
        )


@router.post("/{task_id}/complete", response_model=Task)
def complete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Mark a task as completed for the current user.

    Raises:
        HTTPException: 404 when the task is not found for this user; 500 when
            the SQLite fallback fails.
    """
    print(f"Marking task {task_id} as completed")

    # Try the normal SQLAlchemy approach first
    try:
        task = crud.task.mark_completed(
            db, task_id=task_id, user_id=current_user.id
        )
        if not task:
            raise _task_not_found()
        return task
    except HTTPException:
        # A 404 is a result, not a failure of the SQLAlchemy path: report it
        # directly, as update_task and delete_task do.
        raise
    except Exception as e:
        print(f"Error completing task with SQLAlchemy: {e}")
        print(traceback.format_exc())
        # Continue to fallback

    # Fallback to direct SQLite approach
    try:
        with closing(_connect()) as conn:
            # First check if task exists and belongs to current user
            row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()
            if not row:
                raise _task_not_found()

            # Update task to completed status
            conn.execute(
                "UPDATE task SET completed = ?, status = ?, updated_at = ? WHERE id = ? AND user_id = ?",
                (1, "done", _utc_now_iso(), task_id, current_user.id),
            )
            conn.commit()

            updated_row = conn.execute(
                _SELECT_OWNED_TASK, (task_id, current_user.id)
            ).fetchone()

        if not updated_row:
            raise Exception("Task was completed but could not be retrieved")
        return _row_to_task(updated_row)
    except HTTPException:
        raise  # Re-raise the 404 exception
    except Exception as e:
        print(f"Error completing task with direct SQLite: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error completing task: {str(e)}",
        )