# 2025-05-26 19:04:54 +00:00

# 104 lines
# 2.5 KiB
# Python

from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.schemas.item import ItemCreate, Item as ItemSchema, ItemUpdate
from app.schemas.response import DataResponse
from app.services.item import (
create_item,
get_item,
get_items,
update_item,
delete_item,
)
# Router instance that the item endpoints in this module register on via
# the @router.<method> decorators below.
router = APIRouter()
@router.post(
    "/",
    response_model=DataResponse[ItemSchema],
    status_code=status.HTTP_201_CREATED,
)
def create_new_item(
    *,
    item_in: ItemCreate,
    owner_id: int,
    db: Session = Depends(get_db),
) -> Any:
    """
    Create an item for the given owner.

    Hands the payload and ``owner_id`` to the ``create_item`` service and
    wraps whatever it returns in a ``DataResponse`` with a success message.
    The route answers with HTTP 201 on success.
    """
    # NOTE(review): owner_id is a plain parameter rather than something
    # derived from an auth dependency, so it is presumably caller-supplied.
    # Confirm that clients are meant to choose the owner themselves.
    created = create_item(db, obj_in=item_in, owner_id=owner_id)
    return DataResponse(data=created, message="Item created successfully")
@router.get("/", response_model=DataResponse[List[ItemSchema]])
def read_items(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
) -> Any:
    """
    Retrieve a page of items.

    Args:
        skip: Number of items to skip before the page starts; must be >= 0.
        limit: Maximum number of items to return; must be >= 0.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``DataResponse`` wrapping the result of ``get_items``.

    Raises:
        HTTPException: 400 if ``skip`` or ``limit`` is negative.
    """
    # Reject negative paging values at the API boundary instead of passing
    # them to the service layer. NOTE(review): depending on the database
    # backend a negative LIMIT/OFFSET presumably either errors out or
    # disables the limit entirely; neither is a useful response to a client.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )
    items = get_items(db, skip=skip, limit=limit)
    return DataResponse(data=items, message="Items retrieved successfully")
@router.get("/{item_id}", response_model=DataResponse[ItemSchema])
def read_item(
    item_id: int,
    db: Session = Depends(get_db),
) -> Any:
    """
    Return a single item looked up by its ID.

    Responds with the item wrapped in a ``DataResponse`` when ``get_item``
    yields a truthy result, and with HTTP 404 ("Item not found") otherwise.
    """
    found = get_item(db, id=item_id)
    if found:
        return DataResponse(data=found, message="Item retrieved successfully")

    # Lookup came back empty: report the missing resource to the client.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Item not found",
    )
@router.put("/{item_id}", response_model=DataResponse[ItemSchema])
def update_item_info(
    *,
    item_id: int,
    item_in: ItemUpdate,
    db: Session = Depends(get_db),
) -> Any:
    """
    Apply an update payload to an existing item.

    The item is first loaded with ``get_item``; if nothing is found the
    request fails with HTTP 404 ("Item not found"). Otherwise the loaded
    object and the payload are passed to ``update_item`` and its result is
    returned inside a ``DataResponse``.
    """
    existing = get_item(db, id=item_id)
    if not existing:
        # Nothing to update: bail out before touching the service layer.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Item not found",
        )

    updated = update_item(db, db_obj=existing, obj_in=item_in)
    return DataResponse(data=updated, message="Item updated successfully")
@router.delete(
    "/{item_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_model=None,
)
def delete_item_by_id(
    item_id: int,
    db: Session = Depends(get_db),
) -> None:
    """
    Delete the item with the given ID.

    Checks that the item exists via ``get_item`` and fails with HTTP 404
    ("Item not found") when it does not. Otherwise calls ``delete_item``
    and returns nothing; the route is declared with HTTP 204.
    """
    # Existence check first, so a missing item yields 404 rather than
    # reaching the delete call.
    if not get_item(db, id=item_id):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Item not found",
        )
    delete_item(db, id=item_id)