"""Task API endpoints: list, create, read, update, delete and complete tasks."""

import logging
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from enum import Enum
from types import SimpleNamespace
from typing import Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session

from app import crud
from app.api.deps import get_db
from app.models.task import TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate

logger = logging.getLogger(__name__)

router = APIRouter()

# Columns of the fallback ``task`` table, in the order they are selected back.
_TASK_COLUMNS = (
    "id",
    "title",
    "description",
    "priority",
    "status",
    "due_date",
    "completed",
    "created_at",
    "updated_at",
)

_CREATE_TASK_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS task (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        priority TEXT,
        status TEXT,
        due_date TEXT,
        completed INTEGER,
        created_at TEXT,
        updated_at TEXT
    )
"""

_INSERT_TASK_SQL = """
    INSERT INTO task (
        title, description, priority, status, due_date,
        completed, created_at, updated_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""

# Select columns by name so the result never depends on the physical column
# order of an already-existing table.
_SELECT_TASK_SQL = f"SELECT {', '.join(_TASK_COLUMNS)} FROM task WHERE id = ?"


def _plain_value(value: Any) -> Any:
    """Return ``value.value`` for enum members, anything else unchanged.

    sqlite3 cannot bind enum members that are not also ``str``/``int``
    subclasses, so they are unwrapped before being used as SQL parameters.
    """
    return value.value if isinstance(value, Enum) else value


def _parse_due_date(task_in: TaskCreate) -> None:
    """Convert a string ``due_date`` on ``task_in`` to ``datetime`` in place.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Parsing is
    best-effort: on failure the original value is left untouched and the
    error is only logged.
    """
    due_date = task_in.due_date
    if not due_date or not isinstance(due_date, str):
        return
    try:
        task_in.due_date = datetime.fromisoformat(due_date.replace('Z', '+00:00'))
    except Exception:
        # Deliberately tolerant: keep the raw string and let later layers
        # decide what to do with it.
        logger.exception("Error parsing due_date %r", due_date)


def _create_task_via_sqlite(task_in: TaskCreate) -> SimpleNamespace:
    """Insert ``task_in`` through a direct sqlite3 connection.

    Creates the ``task`` table if it is missing, inserts one row and reads it
    back.

    Returns:
        An attribute-style object carrying the columns in ``_TASK_COLUMNS``,
        with ``completed`` converted to ``bool``.

    Raises:
        RuntimeError: if the inserted row cannot be read back.
        sqlite3.Error: on any database failure.
    """
    # Imported lazily, as the original handler did, so this module only
    # touches app.db.session when the fallback path actually runs.
    from app.db.session import db_file

    task_data = task_in.model_dump() if hasattr(task_in, 'model_dump') else task_in.dict()
    logger.info("Task data: %s", task_data)

    due_date = task_data.get('due_date')
    if isinstance(due_date, datetime):
        due_date = due_date.isoformat()

    # Naive UTC timestamp; same string format datetime.utcnow() produced,
    # without relying on the deprecated call.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    params = (
        task_data.get('title', 'Untitled Task'),
        task_data.get('description', ''),
        _plain_value(task_data.get('priority', 'medium')),
        _plain_value(task_data.get('status', 'todo')),
        due_date,
        1 if task_data.get('completed') else 0,
        now,
        now,
    )

    # closing() guarantees the connection is released even when a statement
    # raises part-way through.
    with closing(sqlite3.connect(str(db_file))) as conn:
        cursor = conn.cursor()
        cursor.execute(_CREATE_TASK_TABLE_SQL)
        cursor.execute(_INSERT_TASK_SQL, params)
        conn.commit()
        task_id = cursor.lastrowid
        cursor.execute(_SELECT_TASK_SQL, (task_id,))
        row = cursor.fetchone()

    if row is None:
        raise RuntimeError("Task was created but could not be retrieved")

    task_dict = dict(zip(_TASK_COLUMNS, row))
    task_dict['completed'] = bool(task_dict['completed'])
    logger.info("Created task with direct SQLite, ID: %s", task_id)
    return SimpleNamespace(**task_dict)


def _get_task_or_404(db: Session, task_id: int) -> Any:
    """Return the task with ``task_id`` or raise a 404 ``HTTPException``."""
    task = crud.task.get(db, id=task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
    return task


@router.get("/", response_model=List[Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    status: Optional[TaskStatus] = None,
) -> Any:
    """
    Retrieve tasks.

    When ``status`` is given, tasks are fetched with
    ``crud.task.get_by_status``; ``skip`` and ``limit`` are not forwarded on
    that path. Otherwise a page of tasks is fetched with
    ``crud.task.get_multi``.
    """
    # NOTE: the ``status`` parameter shadows fastapi's ``status`` module
    # inside this function only; the name is kept because it is the public
    # query-parameter name.
    if status is not None:
        return crud.task.get_by_status(db, status=status)
    return crud.task.get_multi(db, skip=skip, limit=limit)


@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: TaskCreate,
) -> Any:
    """
    Create new task.

    The task is created through ``crud.task.create``. If that fails, the
    session is rolled back and the task is inserted through a direct sqlite3
    connection instead.

    Raises:
        HTTPException: 500 when both creation paths fail, or on any other
            unexpected error.
    """
    logger.info("Creating task: %s", task_in)
    try:
        _parse_due_date(task_in)

        try:
            logger.info("Attempting to create task via SQLAlchemy...")
            task = crud.task.create(db, obj_in=task_in)
        except Exception:
            logger.exception("SQLAlchemy task creation failed")
            # A failed flush/commit leaves the session unusable and may keep
            # a transaction open on the database; roll back before
            # writing through a second connection.
            db.rollback()
        else:
            logger.info("Task created successfully with ID: %s", task.id)
            return task

        logger.info("Falling back to direct SQLite task creation...")
        try:
            return _create_task_via_sqlite(task_in)
        except Exception as e:
            logger.exception("Direct SQLite task creation failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"All task creation methods failed. Last error: {str(e)}",
            ) from e
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Unhandled error in create_task")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error creating task: {str(e)}",
        ) from e


@router.get("/{task_id}", response_model=Task)
def read_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Get task by ID.

    Raises:
        HTTPException: 404 when no task has this ID.
    """
    return _get_task_or_404(db, task_id)


@router.put("/{task_id}", response_model=Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
    task_in: TaskUpdate,
) -> Any:
    """
    Update a task.

    Raises:
        HTTPException: 404 when no task has this ID.
    """
    task = _get_task_or_404(db, task_id)
    return crud.task.update(db, db_obj=task, obj_in=task_in)


@router.delete("/{task_id}", response_model=Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Delete a task.

    Returns whatever ``crud.task.remove`` returns for the removed task.

    Raises:
        HTTPException: 404 when no task has this ID.
    """
    _get_task_or_404(db, task_id)
    return crud.task.remove(db, id=task_id)


@router.post("/{task_id}/complete", response_model=Task)
def complete_task(
    *,
    db: Session = Depends(get_db),
    task_id: int,
) -> Any:
    """
    Mark a task as completed.

    Raises:
        HTTPException: 404 when ``crud.task.mark_completed`` returns a falsy
            result.
    """
    task = crud.task.mark_completed(db, task_id=task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
    return task