Fix database path and improve error handling for robust API functionality

This commit is contained in:
Automated Action 2025-05-16 06:50:37 +00:00
parent 0645953951
commit ca45718f42
4 changed files with 97 additions and 36 deletions

View File

@ -1,11 +1,4 @@
from typing import Generator from typing import Generator
from app.db.session import SessionLocal # Use the improved get_db function from session.py
from app.db.session import get_db
def get_db() -> Generator:
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@ -53,26 +53,69 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
db_obj: ModelType, db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]] obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType: ) -> ModelType:
obj_data = jsonable_encoder(db_obj) try:
if isinstance(obj_in, dict): # Log update operation
update_data = obj_in print(f"Updating {self.model.__name__} with id: {db_obj.id}")
else:
# Handle both Pydantic v1 and v2 # Get the existing data
if hasattr(obj_in, "model_dump"): obj_data = jsonable_encoder(db_obj)
update_data = obj_in.model_dump(exclude_unset=True)
# Process the update data
if isinstance(obj_in, dict):
update_data = obj_in
else: else:
update_data = obj_in.dict(exclude_unset=True) # Handle both Pydantic v1 and v2
for field in obj_data: if hasattr(obj_in, "model_dump"):
if field in update_data: update_data = obj_in.model_dump(exclude_unset=True)
setattr(db_obj, field, update_data[field]) else:
db.add(db_obj) update_data = obj_in.dict(exclude_unset=True)
db.commit()
db.refresh(db_obj) # Log the changes being made
return db_obj changes = {k: v for k, v in update_data.items() if k in obj_data}
print(f"Fields to update: {changes}")
# Apply the updates
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
# Save changes
db.add(db_obj)
db.commit()
db.refresh(db_obj)
print(f"Successfully updated {self.model.__name__} with id: {db_obj.id}")
return db_obj
except Exception as e:
db.rollback()
error_msg = f"Error updating {self.model.__name__}: {str(e)}"
print(error_msg)
import traceback
print(traceback.format_exc())
raise Exception(error_msg) from e
def remove(self, db: Session, *, id: int) -> ModelType: def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).filter(self.model.id == id).first() try:
if obj: # Get the object first
obj = db.query(self.model).filter(self.model.id == id).first()
if not obj:
print(f"{self.model.__name__} with id {id} not found for deletion")
return None
print(f"Deleting {self.model.__name__} with id: {id}")
# Delete the object
db.delete(obj) db.delete(obj)
db.commit() db.commit()
return obj
print(f"Successfully deleted {self.model.__name__} with id: {id}")
return obj
except Exception as e:
db.rollback()
error_msg = f"Error deleting {self.model.__name__}: {str(e)}"
print(error_msg)
import traceback
print(traceback.format_exc())
raise Exception(error_msg) from e

View File

@ -13,6 +13,12 @@ class TaskBase(BaseModel):
status: TaskStatus = TaskStatus.TODO status: TaskStatus = TaskStatus.TODO
due_date: Optional[datetime] = None due_date: Optional[datetime] = None
completed: bool = False completed: bool = False
model_config = {
"json_encoders": {
datetime: lambda dt: dt.isoformat(),
}
}
class TaskCreate(TaskBase): class TaskCreate(TaskBase):
@ -26,6 +32,24 @@ class TaskUpdate(BaseModel):
status: Optional[TaskStatus] = None status: Optional[TaskStatus] = None
due_date: Optional[datetime] = None due_date: Optional[datetime] = None
completed: Optional[bool] = None completed: Optional[bool] = None
model_config = {
"json_encoders": {
datetime: lambda dt: dt.isoformat(),
},
"populate_by_name": True,
"json_schema_extra": {
"examples": [
{
"title": "Updated Task Title",
"description": "Updated task description",
"priority": "high",
"status": "in_progress",
"completed": False
}
]
}
}
class TaskInDBBase(TaskBase): class TaskInDBBase(TaskBase):

19
main.py
View File

@ -17,15 +17,15 @@ from app.db import init_db
# Initialize the database on startup # Initialize the database on startup
print("Starting database initialization...") print("Starting database initialization...")
try: try:
# Get absolute path of the database file # Get absolute path of the database file from your config
from pathlib import Path from app.db.session import db_file
db_path = Path("/app/storage/db/db.sqlite").absolute() print(f"Database path: {db_file}")
print(f"Database path: {db_path}")
# Check directory permissions # Check directory permissions for the configured DB_DIR
db_dir = Path("/app/storage/db") from app.core.config import DB_DIR
print(f"Database directory exists: {db_dir.exists()}") print(f"Database directory: {DB_DIR}")
print(f"Database directory is writable: {os.access(db_dir, os.W_OK)}") print(f"Database directory exists: {DB_DIR.exists()}")
print(f"Database directory is writable: {os.access(DB_DIR, os.W_OK)}")
# Initialize the database and create test task # Initialize the database and create test task
init_db.init_db() init_db.init_db()
@ -168,6 +168,7 @@ def test_db_connection():
import os import os
import sqlite3 import sqlite3
import traceback import traceback
import datetime
from sqlalchemy import text from sqlalchemy import text
from app.db.session import engine, db_file from app.db.session import engine, db_file
from app.core.config import DB_DIR from app.core.config import DB_DIR
@ -262,7 +263,7 @@ def test_db_connection():
return { return {
"status": "ok", "status": "ok",
"timestamp": datetime.utcnow().isoformat(), "timestamp": datetime.datetime.utcnow().isoformat(),
"file_info": file_info, "file_info": file_info,
"sqlite_test": sqlite_test, "sqlite_test": sqlite_test,
"sqlalchemy_test": sqlalchemy_test, "sqlalchemy_test": sqlalchemy_test,