701 lines
25 KiB
Python

from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app import crud
from app.api.deps import get_db
from app.models.task import TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate
# Router instance on which the task endpoints below are registered through
# their @router.get / @router.post / @router.put / @router.delete decorators.
router = APIRouter()
@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
) -> Any:
    """
    Retrieve tasks, optionally filtered by status.

    The SQLAlchemy CRUD layer is tried first. If it raises, the same query is
    executed directly against the SQLite database file as a fallback.

    Args:
        db: SQLAlchemy session injected by FastAPI.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.
        status: When given, only tasks with this status are returned.

    Returns:
        A list of task objects: whatever the CRUD layer returns, or
        attribute-style row objects built from the SQLite fallback.

    Raises:
        HTTPException: 500 when both the SQLAlchemy and the SQLite path fail.
    """
    import sqlite3
    import traceback
    from contextlib import closing
    from types import SimpleNamespace

    # The ``status`` query parameter shadows the module-level ``fastapi.status``
    # import inside this function, so the HTTP status constants are imported
    # again under a different local name.
    from fastapi import status as http_status

    try:
        from app.db.session import db_file

        print(f"Getting tasks with status: {status}, skip: {skip}, limit: {limit}")

        # Try the normal SQLAlchemy approach first
        try:
            if status:
                # NOTE(review): get_by_status is called without skip/limit,
                # while the SQLite fallback below applies them - confirm that
                # this difference is intended.
                return crud.task.get_by_status(db, status=status)
            return crud.task.get_multi(db, skip=skip, limit=limit)
        except Exception as e:
            print(f"Error getting tasks with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            # closing() guarantees the connection is released even when the
            # query raises.
            with closing(sqlite3.connect(str(db_file))) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()
                if status:
                    cursor.execute(
                        "SELECT * FROM task WHERE status = ? LIMIT ? OFFSET ?",
                        (status.value, limit, skip),
                    )
                else:
                    cursor.execute(
                        "SELECT * FROM task LIMIT ? OFFSET ?", (limit, skip)
                    )
                rows = cursor.fetchall()

            # Convert rows to objects with attribute access
            tasks = []
            for row in rows:
                task_dict = dict(row)
                # SQLite stores booleans as integers
                if 'completed' in task_dict:
                    task_dict['completed'] = bool(task_dict['completed'])
                tasks.append(SimpleNamespace(**task_dict))
            return tasks
        except Exception as e:
            print(f"Error getting tasks with direct SQLite: {e}")
            print(traceback.format_exc())
            raise
    except Exception as e:
        print(f"Global error in read_tasks: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error retrieving tasks: {str(e)}"
        ) from e
@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
) -> Any:
    """
    Create new task - using direct SQLite approach for reliability.

    The task is inserted directly into the SQLite file, retrying up to three
    times when the database is locked. If that path fails, the SQLAlchemy
    CRUD layer is used as a final fallback.

    Args:
        db: SQLAlchemy session injected by FastAPI (used only by the fallback).
        task_in: Payload describing the task to create.

    Returns:
        An attribute-style object holding the inserted row, or the object
        returned by ``crud.task.create`` when the fallback is used.

    Raises:
        HTTPException: 500 when both the SQLite and the SQLAlchemy path fail.
    """
    import sqlite3
    import sys
    import time
    import traceback
    from contextlib import closing
    from datetime import datetime
    from types import SimpleNamespace

    from app.db.session import db_file

    max_attempts = 3

    # Log creation attempt
    print(f"[{datetime.now().isoformat()}] Task creation requested", file=sys.stdout)

    # Use direct SQLite for maximum reliability
    try:
        # Extract task data regardless of Pydantic version
        try:
            if hasattr(task_in, 'model_dump'):
                task_data = task_in.model_dump()
            elif hasattr(task_in, 'dict'):
                task_data = task_in.dict()
            else:
                # Fallback for any case
                task_data = {
                    'title': getattr(task_in, 'title', 'Untitled Task'),
                    'description': getattr(task_in, 'description', ''),
                    'priority': getattr(task_in, 'priority', 'medium'),
                    'status': getattr(task_in, 'status', 'todo'),
                    'due_date': getattr(task_in, 'due_date', None),
                    'completed': getattr(task_in, 'completed', False)
                }
            print(f"Task data: {task_data}")
        except Exception as e:
            print(f"Error extracting task data: {e}")
            # Fallback to minimal data
            task_data = {
                'title': str(getattr(task_in, 'title', 'Unknown Title')),
                'description': str(getattr(task_in, 'description', '')),
                'priority': 'medium',
                'status': 'todo',
                'completed': False
            }

        # Format due_date if present
        if task_data.get('due_date'):
            try:
                if isinstance(task_data['due_date'], datetime):
                    task_data['due_date'] = task_data['due_date'].isoformat()
                elif isinstance(task_data['due_date'], str):
                    # Standardize format by parsing and reformatting
                    parsed_date = datetime.fromisoformat(
                        task_data['due_date'].replace('Z', '+00:00')
                    )
                    task_data['due_date'] = parsed_date.isoformat()
            except Exception as e:
                print(f"Warning: Could not parse due_date: {e}")
                # Keep as-is or set to None if invalid
                if not isinstance(task_data['due_date'], str):
                    task_data['due_date'] = None

        # Naive UTC timestamp used for both created_at and updated_at
        now = datetime.utcnow().isoformat()

        # Insert with retry logic
        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            try:
                # closing() releases the connection on every exit path.
                # Leaving the block without commit() discards the pending
                # INSERT, so a failed attempt never leaves a row behind.
                with closing(sqlite3.connect(str(db_file), timeout=30)) as conn:
                    conn.row_factory = sqlite3.Row
                    cursor = conn.cursor()

                    # Create the task table if it doesn't exist - using minimal schema
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS task (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            title TEXT NOT NULL,
                            description TEXT,
                            priority TEXT DEFAULT 'medium',
                            status TEXT DEFAULT 'todo',
                            due_date TEXT,
                            completed INTEGER DEFAULT 0,
                            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                        )
                    """)

                    # Insert the task - provide defaults for all fields
                    cursor.execute(
                        """
                        INSERT INTO task (
                            title, description, priority, status,
                            due_date, completed, created_at, updated_at
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            task_data.get('title', 'Untitled'),
                            task_data.get('description', ''),
                            task_data.get('priority', 'medium'),
                            task_data.get('status', 'todo'),
                            task_data.get('due_date'),
                            1 if task_data.get('completed') else 0,
                            now,
                            now
                        )
                    )

                    # Get the ID of the inserted task
                    task_id = cursor.lastrowid
                    print(f"Task inserted with ID: {task_id}")

                    # Read the row back BEFORE committing: if the read-back
                    # fails, nothing has been persisted yet, so neither a
                    # retry nor the SQLAlchemy fallback can create a duplicate.
                    cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                    row = cursor.fetchone()
                    if row is None:
                        raise Exception(f"Task creation succeeded but retrieval failed for ID: {task_id}")
                    task_dict = dict(row)

                    # Commit the transaction
                    conn.commit()
            except sqlite3.OperationalError as e:
                # Check if retry is appropriate
                if "database is locked" in str(e) and not is_last_attempt:
                    wait_time = (attempt + 1) * 1.5  # Linear backoff: 1.5s, then 3.0s
                    print(f"Database locked, retrying in {wait_time}s (attempt {attempt+1}/3)")
                    time.sleep(wait_time)
                    continue
                print(f"SQLite operational error: {e}")
                raise
            except Exception as e:
                print(f"Error in SQLite task creation: {e}")
                print(traceback.format_exc())
                # Only retry on specific transient errors
                if not is_last_attempt and ("locked" in str(e).lower() or "busy" in str(e).lower()):
                    time.sleep(1)
                    continue
                raise
            else:
                # SQLite stores booleans as integers
                if 'completed' in task_dict:
                    task_dict['completed'] = bool(task_dict['completed'])
                print(f"Task created successfully: ID={task_id}")
                return SimpleNamespace(**task_dict)

        # Defensive guard: every iteration above returns, retries or raises
        raise Exception("Failed to create task after multiple attempts")
    except Exception as sqlite_error:
        # Final fallback: try SQLAlchemy approach
        try:
            print(f"Direct SQLite approach failed: {sqlite_error}")
            print("Trying SQLAlchemy as fallback...")
            task = crud.task.create(db, obj_in=task_in)
            print(f"Task created with SQLAlchemy fallback: ID={task.id}")
            return task
        except Exception as alch_error:
            print(f"SQLAlchemy fallback also failed: {alch_error}")
            print(traceback.format_exc())
            # Provide detailed error information
            error_detail = f"Task creation failed. Primary error: {str(sqlite_error)}. Fallback error: {str(alch_error)}"
            print(f"Final error: {error_detail}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=error_detail
            ) from alch_error
@router.get("/{task_id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Get task by ID.

    The SQLAlchemy CRUD layer is queried first. When it raises, or returns
    nothing, the task is looked up directly in the SQLite database file.

    Args:
        db: SQLAlchemy session injected by FastAPI.
        task_id: Primary key of the task to fetch.

    Returns:
        The task object from the CRUD layer, or an attribute-style object
        built from the SQLite row.

    Raises:
        HTTPException: 404 when the task is not found by the SQLite lookup,
            500 when the lookup itself fails.
    """
    import sqlite3
    import traceback
    from contextlib import closing
    from types import SimpleNamespace

    try:
        from app.db.session import db_file

        print(f"Getting task with ID: {task_id}")

        # Try the normal SQLAlchemy approach first
        try:
            task = crud.task.get(db, id=task_id)
            if task:
                return task
            # Fall through to direct SQLite if task not found
        except Exception as e:
            print(f"Error getting task with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            # closing() releases the connection on the 404 and error paths too
            with closing(sqlite3.connect(str(db_file))) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                row = cursor.fetchone()

            if row is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found",
                )

            task_dict = dict(row)
            # SQLite stores booleans as integers
            if 'completed' in task_dict:
                task_dict['completed'] = bool(task_dict['completed'])
            # Convert to object with attributes
            return SimpleNamespace(**task_dict)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error getting task with direct SQLite: {e}")
            print(traceback.format_exc())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error retrieving task: {str(e)}"
            ) from e
    except HTTPException:
        raise  # Re-raise any HTTP exceptions
    except Exception as e:
        print(f"Global error in read_task: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error retrieving task: {str(e)}"
        ) from e
@router.put("/{task_id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate,
) -> Any:
    """
    Update a task.

    The SQLAlchemy CRUD layer is tried first. If it raises anything other
    than the 404, the update is applied directly to the SQLite database file.
    In the SQLite path only fields that were set and are not ``None`` are
    written, and ``updated_at`` is refreshed.

    Args:
        db: SQLAlchemy session injected by FastAPI.
        task_id: Primary key of the task to update.
        task_in: Payload with the fields to change.

    Returns:
        The updated task object from the CRUD layer, or an attribute-style
        object built from the SQLite row (unchanged row when the payload
        contains no effective updates).

    Raises:
        HTTPException: 404 when the task does not exist, 500 when the update
            fails.
    """
    import sqlite3
    import traceback
    from contextlib import closing
    from datetime import datetime
    from types import SimpleNamespace

    try:
        from app.db.session import db_file

        print(f"Updating task with ID: {task_id}, data: {task_in}")

        # Handle datetime conversion for due_date if present
        if hasattr(task_in, "due_date") and task_in.due_date is not None:
            if isinstance(task_in.due_date, str):
                try:
                    task_in.due_date = datetime.fromisoformat(task_in.due_date.replace('Z', '+00:00'))
                except Exception as e:
                    print(f"Error parsing due_date: {e}")

        # Try the normal SQLAlchemy approach first
        try:
            task = crud.task.get(db, id=task_id)
            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found",
                )
            return crud.task.update(db, db_obj=task, obj_in=task_in)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error updating task with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            # closing() releases the connection on the 404 and error paths too
            with closing(sqlite3.connect(str(db_file))) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # First check if task exists
                cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                row = cursor.fetchone()
                if row is None:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail="Task not found",
                    )

                # Convert Pydantic model to dict, excluding unset values
                model_data = task_in.model_dump(exclude_unset=True) if hasattr(task_in, "model_dump") else task_in.dict(exclude_unset=True)
                # Only include fields that were provided; None values are skipped
                updates = {key: value for key, value in model_data.items() if value is not None}

                if not updates:
                    # No updates provided: return the unchanged task
                    result_row = row
                else:
                    # Column names are interpolated into the SQL text below,
                    # so only names of columns that really exist are accepted.
                    known_columns = set(row.keys())
                    unknown = sorted(key for key in updates if key not in known_columns)
                    if unknown:
                        raise ValueError(f"Unknown task fields: {unknown}")

                    # Format datetime objects
                    if isinstance(updates.get('due_date'), datetime):
                        updates['due_date'] = updates['due_date'].isoformat()

                    # Add updated_at timestamp
                    updates['updated_at'] = datetime.utcnow().isoformat()

                    # Build the SQL update statement
                    set_clause = ", ".join(f"{key} = ?" for key in updates)
                    params = [*updates.values(), task_id]  # task_id feeds the WHERE clause
                    cursor.execute(f"UPDATE task SET {set_clause} WHERE id = ?", params)
                    conn.commit()

                    # Read back the updated task
                    cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                    result_row = cursor.fetchone()
                    if result_row is None:
                        raise Exception("Task was updated but could not be retrieved")

            task_dict = dict(result_row)
            # SQLite stores booleans as integers
            if 'completed' in task_dict:
                task_dict['completed'] = bool(task_dict['completed'])
            # Convert to object with attributes
            return SimpleNamespace(**task_dict)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error updating task with direct SQLite: {e}")
            print(traceback.format_exc())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error updating task: {str(e)}"
            ) from e
    except HTTPException:
        raise  # Re-raise any HTTP exceptions
    except Exception as e:
        print(f"Global error in update_task: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error updating task: {str(e)}"
        ) from e
@router.delete("/{task_id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Delete a task.

    The SQLAlchemy CRUD layer is tried first. If it raises anything other
    than the 404, the row is deleted directly in the SQLite database file.

    Args:
        db: SQLAlchemy session injected by FastAPI.
        task_id: Primary key of the task to delete.

    Returns:
        The value returned by ``crud.task.remove``, or an attribute-style
        object holding the row as it was read just before deletion.

    Raises:
        HTTPException: 404 when the task does not exist, 500 when the
            deletion fails.
    """
    import sqlite3
    import traceback
    from contextlib import closing
    from types import SimpleNamespace

    try:
        from app.db.session import db_file

        print(f"Deleting task with ID: {task_id}")

        # Try the normal SQLAlchemy approach first
        try:
            task = crud.task.get(db, id=task_id)
            if not task:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Task not found",
                )
            return crud.task.remove(db, id=task_id)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error deleting task with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            # closing() releases the connection on the 404 and error paths too
            with closing(sqlite3.connect(str(db_file))) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # First save the task data for the return value
                cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                row = cursor.fetchone()
                if row is None:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail="Task not found",
                    )

                # Delete the task
                cursor.execute("DELETE FROM task WHERE id = ?", (task_id,))
                conn.commit()

            task_dict = dict(row)
            # SQLite stores booleans as integers
            if 'completed' in task_dict:
                task_dict['completed'] = bool(task_dict['completed'])
            # Convert to object with attributes
            return SimpleNamespace(**task_dict)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error deleting task with direct SQLite: {e}")
            print(traceback.format_exc())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error deleting task: {str(e)}"
            ) from e
    except HTTPException:
        raise  # Re-raise any HTTP exceptions
    except Exception as e:
        print(f"Global error in delete_task: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting task: {str(e)}"
        ) from e
@router.post("/{task_id}/complete", response_model=Task)
def complete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Mark a task as completed.

    The SQLAlchemy CRUD layer is tried first. When it raises, or returns
    nothing, the task is updated directly in the SQLite database file, which
    sets ``completed`` to 1, ``status`` to "done" and refreshes ``updated_at``.

    Args:
        db: SQLAlchemy session injected by FastAPI.
        task_id: Primary key of the task to complete.

    Returns:
        The task object from ``crud.task.mark_completed``, or an
        attribute-style object built from the updated SQLite row.

    Raises:
        HTTPException: 404 when the task is not found by the SQLite lookup,
            500 when the update fails.
    """
    import sqlite3
    import traceback
    from contextlib import closing
    from datetime import datetime
    from types import SimpleNamespace

    try:
        from app.db.session import db_file

        print(f"Marking task {task_id} as completed")

        # Try the normal SQLAlchemy approach first
        try:
            task = crud.task.mark_completed(db, task_id=task_id)
            if task:
                return task
            # Not found through SQLAlchemy: fall through to the SQLite lookup,
            # which produces the 404. Raising the 404 here would only be
            # swallowed by the handler below and logged as a spurious error.
        except Exception as e:
            print(f"Error completing task with SQLAlchemy: {e}")
            print(traceback.format_exc())
            # Continue to fallback

        # Fallback to direct SQLite approach
        try:
            # closing() releases the connection on the 404 and error paths too
            with closing(sqlite3.connect(str(db_file))) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # First check if task exists
                cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                if cursor.fetchone() is None:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail="Task not found",
                    )

                # Update task to completed status
                now = datetime.utcnow().isoformat()
                cursor.execute(
                    "UPDATE task SET completed = ?, status = ?, updated_at = ? WHERE id = ?",
                    (1, "done", now, task_id)
                )
                conn.commit()

                # Get the updated task
                cursor.execute("SELECT * FROM task WHERE id = ?", (task_id,))
                updated_row = cursor.fetchone()

            if updated_row is None:
                raise Exception("Task was completed but could not be retrieved")

            task_dict = dict(updated_row)
            # SQLite stores booleans as integers
            if 'completed' in task_dict:
                task_dict['completed'] = bool(task_dict['completed'])
            # Convert to object with attributes
            return SimpleNamespace(**task_dict)
        except HTTPException:
            raise  # Re-raise the 404 exception
        except Exception as e:
            print(f"Error completing task with direct SQLite: {e}")
            print(traceback.format_exc())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error completing task: {str(e)}"
            ) from e
    except HTTPException:
        raise  # Re-raise any HTTP exceptions
    except Exception as e:
        print(f"Global error in complete_task: {e}")
        print(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error completing task: {str(e)}"
        ) from e