Fix database connection issues and improve error handling

This commit is contained in:
Automated Action 2025-06-02 17:45:50 +00:00
parent 9c5b74200b
commit 4abac2b250
10 changed files with 106 additions and 42 deletions

View File

@ -54,7 +54,8 @@ version_path_separator = os # Use os.pathsep. Default configuration used to be
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
# DB path is set programmatically in env.py
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]

View File

@ -1 +1 @@
# App package initialization
# App package initialization

View File

@ -1 +1 @@
# Database package initialization
# Database package initialization

View File

@ -2,9 +2,13 @@ from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
import os
# Create the database directory if it doesn't exist
DB_DIR = Path("/app/storage/db")
# Get project root directory and create a storage directory
PROJECT_ROOT = Path(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
DB_DIR = PROJECT_ROOT / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
# SQLite database URL
@ -12,8 +16,7 @@ SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
# Create the SQLAlchemy engine
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# Create a SessionLocal class
@ -22,10 +25,28 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create a Base class for declarative models
Base = declarative_base()
# Create tables (important for first run)
def create_tables():
Base.metadata.create_all(bind=engine)
# Dependency to get a database session
def get_db():
db = SessionLocal()
try:
# Test the connection
db.execute("SELECT 1")
yield db
except Exception as e:
# Log the error (in a real-world application)
print(f"Database connection error: {e}")
# Provide a user-friendly error
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Database connection error. Please try again later.",
)
finally:
db.close()
db.close()

View File

@ -2,6 +2,7 @@ from sqlalchemy import Boolean, Column, Integer, String, DateTime
from sqlalchemy.sql import func
from .config import Base
class Todo(Base):
__tablename__ = "todos"
@ -10,4 +11,4 @@ class Todo(Base):
description = Column(String, nullable=True)
completed = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

64
main.py
View File

@ -7,12 +7,13 @@ from pydantic import BaseModel
from datetime import datetime
# Import database models and config
from app.database.config import get_db
from app.database.config import get_db, create_tables
from app.database.models import Todo as TodoModel
# Create tables if they don't exist
# In production, you should use Alembic migrations instead
# models.Base.metadata.create_all(bind=engine)
create_tables()
# Pydantic models for request and response
class TodoBase(BaseModel):
@ -20,16 +21,19 @@ class TodoBase(BaseModel):
description: Optional[str] = None
completed: bool = False
class TodoCreate(TodoBase):
pass
class TodoResponse(TodoBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
from_attributes = True # Updated for Pydantic v2 (replaces orm_mode)
# Create the FastAPI app
app = FastAPI(
@ -47,13 +51,29 @@ app.add_middleware(
allow_headers=["*"],
)
# Health endpoint
@app.get("/health", tags=["Health"])
async def health_check():
async def health_check(db: Session = Depends(get_db)):
"""
Health check endpoint to verify the API is running.
Health check endpoint to verify the API is running and database connection is working.
"""
return {"status": "healthy"}
try:
# Test database connection
db.execute("SELECT 1").first()
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {
"status": "unhealthy",
"database": "disconnected",
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
# Root endpoint
@app.get("/", tags=["Root"])
@ -63,18 +83,25 @@ async def root():
"""
return {"message": "Welcome to Todo List API. Visit /docs for documentation."}
# Todo API endpoints
@app.post("/todos", response_model=TodoResponse, status_code=status.HTTP_201_CREATED, tags=["Todos"])
@app.post(
"/todos",
response_model=TodoResponse,
status_code=status.HTTP_201_CREATED,
tags=["Todos"],
)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
"""
Create a new todo item.
"""
db_todo = TodoModel(**todo.dict())
db_todo = TodoModel(**todo.model_dump())
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
@app.get("/todos", response_model=List[TodoResponse], tags=["Todos"])
def read_todos(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""
@ -83,6 +110,7 @@ def read_todos(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
todos = db.query(TodoModel).offset(skip).limit(limit).all()
return todos
@app.get("/todos/{todo_id}", response_model=TodoResponse, tags=["Todos"])
def read_todo(todo_id: int, db: Session = Depends(get_db)):
"""
@ -93,6 +121,7 @@ def read_todo(todo_id: int, db: Session = Depends(get_db)):
raise HTTPException(status_code=404, detail="Todo not found")
return db_todo
@app.put("/todos/{todo_id}", response_model=TodoResponse, tags=["Todos"])
def update_todo(todo_id: int, todo: TodoCreate, db: Session = Depends(get_db)):
"""
@ -101,16 +130,22 @@ def update_todo(todo_id: int, todo: TodoCreate, db: Session = Depends(get_db)):
db_todo = db.query(TodoModel).filter(TodoModel.id == todo_id).first()
if db_todo is None:
raise HTTPException(status_code=404, detail="Todo not found")
# Update todo item fields
for key, value in todo.dict().items():
for key, value in todo.model_dump().items():
setattr(db_todo, key, value)
db.commit()
db.refresh(db_todo)
return db_todo
@app.delete("/todos/{todo_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None, tags=["Todos"])
@app.delete(
"/todos/{todo_id}",
status_code=status.HTTP_204_NO_CONTENT,
response_model=None,
tags=["Todos"],
)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
"""
Delete a todo item.
@ -118,11 +153,12 @@ def delete_todo(todo_id: int, db: Session = Depends(get_db)):
db_todo = db.query(TodoModel).filter(TodoModel.id == todo_id).first()
if db_todo is None:
raise HTTPException(status_code=404, detail="Todo not found")
db.delete(db_todo)
db.commit()
return None
# For local development
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

View File

@ -1 +1 @@
# Migrations package initialization
# Migrations package initialization

View File

@ -71,7 +71,7 @@ def run_migrations_online() -> None:
with connectable.connect() as connection:
# Check if we're dealing with SQLite
is_sqlite = connection.dialect.name == "sqlite"
context.configure(
connection=connection,
target_metadata=target_metadata,
@ -85,4 +85,4 @@ def run_migrations_online() -> None:
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
run_migrations_online()

View File

@ -1 +1 @@
# Migrations versions package initialization
# Migrations versions package initialization

View File

@ -1,16 +1,17 @@
"""Initial migration
Revision ID: 001
Revises:
Revises:
Create Date: 2023-10-24
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
revision = "001"
down_revision = None
branch_labels = None
depends_on = None
@ -19,21 +20,25 @@ depends_on = None
def upgrade() -> None:
# Create todos table
op.create_table(
'todos',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('description', sa.String(), nullable=True),
sa.Column('completed', sa.Boolean(), nullable=False, default=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)')),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
"todos",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("completed", sa.Boolean(), nullable=False, default=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("(CURRENT_TIMESTAMP)"),
),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f('ix_todos_id'), 'todos', ['id'], unique=False)
op.create_index(op.f('ix_todos_title'), 'todos', ['title'], unique=False)
op.create_index(op.f("ix_todos_id"), "todos", ["id"], unique=False)
op.create_index(op.f("ix_todos_title"), "todos", ["title"], unique=False)
def downgrade() -> None:
# Drop todos table
op.drop_index(op.f('ix_todos_title'), table_name='todos')
op.drop_index(op.f('ix_todos_id'), table_name='todos')
op.drop_table('todos')
op.drop_index(op.f("ix_todos_title"), table_name="todos")
op.drop_index(op.f("ix_todos_id"), table_name="todos")
op.drop_table("todos")