
- Set up FastAPI project structure with best practices - Implemented SQLAlchemy models with SQLite database - Added Alembic for database migrations - Created CRUD API endpoints for items - Added health check endpoint - Updated documentation generated with BackendIM... (backend.im)
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import get_db
|
|
from app.models.item import Item
|
|
from app.schemas.item import Item as ItemSchema, ItemCreate, ItemUpdate
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/items/", response_model=ItemSchema, status_code=status.HTTP_201_CREATED, tags=["Items"])
|
|
def create_item(item: ItemCreate, db: Session = Depends(get_db)):
|
|
"""
|
|
Create a new item.
|
|
"""
|
|
db_item = Item(name=item.name, description=item.description)
|
|
db.add(db_item)
|
|
db.commit()
|
|
db.refresh(db_item)
|
|
return db_item
|
|
|
|
@router.get("/items/", response_model=List[ItemSchema], tags=["Items"])
|
|
def read_items(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
name: Optional[str] = Query(None, description="Filter by name"),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Retrieve items with optional filtering.
|
|
"""
|
|
query = db.query(Item)
|
|
|
|
if name:
|
|
query = query.filter(Item.name.contains(name))
|
|
|
|
items = query.offset(skip).limit(limit).all()
|
|
return items
|
|
|
|
@router.get("/items/{item_id}", response_model=ItemSchema, tags=["Items"])
|
|
def read_item(item_id: int, db: Session = Depends(get_db)):
|
|
"""
|
|
Get a specific item by ID.
|
|
"""
|
|
item = db.query(Item).filter(Item.id == item_id).first()
|
|
if item is None:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
return item
|
|
|
|
@router.put("/items/{item_id}", response_model=ItemSchema, tags=["Items"])
|
|
def update_item(item_id: int, item: ItemUpdate, db: Session = Depends(get_db)):
|
|
"""
|
|
Update an item.
|
|
"""
|
|
db_item = db.query(Item).filter(Item.id == item_id).first()
|
|
if db_item is None:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
|
|
update_data = item.dict(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
setattr(db_item, key, value)
|
|
|
|
db.add(db_item)
|
|
db.commit()
|
|
db.refresh(db_item)
|
|
return db_item
|
|
|
|
@router.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Items"])
|
|
def delete_item(item_id: int, db: Session = Depends(get_db)):
|
|
"""
|
|
Delete an item.
|
|
"""
|
|
db_item = db.query(Item).filter(Item.id == item_id).first()
|
|
if db_item is None:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
|
|
db.delete(db_item)
|
|
db.commit()
|
|
return None |