
- Set up FastAPI project structure with SQLite and SQLAlchemy - Create models for users, books, authors, categories, and orders - Implement JWT authentication and authorization - Add CRUD endpoints for all resources - Set up Alembic for database migrations - Add health check endpoint - Add proper error handling and validation - Create comprehensive documentation
384 lines
11 KiB
Python
384 lines
11 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
|
|
from app.db.database import get_db
|
|
from app.models.book import Book, Author, Category
|
|
from app.models.user import User
|
|
from app.api.schemas.book import (
|
|
Book as BookSchema,
|
|
BookCreate,
|
|
BookUpdate,
|
|
BookList,
|
|
Author as AuthorSchema,
|
|
AuthorCreate,
|
|
AuthorUpdate,
|
|
Category as CategorySchema,
|
|
CategoryCreate,
|
|
CategoryUpdate,
|
|
)
|
|
from app.auth.auth import get_current_admin_user
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# Book routes
|
|
@router.get("", response_model=BookList)
async def get_books(
    skip: int = 0,
    limit: int = 100,
    title: Optional[str] = None,
    category_id: Optional[int] = None,
    author_id: Optional[int] = None,
    db: Session = Depends(get_db),
):
    """
    Get a page of books with optional filters

    Args:
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        title: Case-insensitive substring matched against ``Book.title``;
            ignored when missing or empty.
        category_id: Keep only books linked to a category with this ID.
        author_id: Keep only books linked to an author with this ID.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``total`` (number of books matching the filters, before
        pagination) and ``items`` (the requested page of books).
    """
    query = db.query(Book)

    # Apply filters if provided
    if title:
        query = query.filter(Book.title.ilike(f"%{title}%"))
    # Compare against None so an explicitly supplied ID of 0 is applied as a
    # filter instead of being silently dropped by a truthiness test.
    if category_id is not None:
        query = query.filter(Book.categories.any(Category.id == category_id))
    if author_id is not None:
        query = query.filter(Book.authors.any(Author.id == author_id))

    total = query.count()

    # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
    # consecutive pages could overlap or skip rows; sort by primary key.
    books = query.order_by(Book.id).offset(skip).limit(limit).all()

    return {"total": total, "items": books}
|
|
|
|
|
|
# The ":int" path converter restricts this route to purely numeric segments.
# Without it, "/{book_id}" also matches GET "/authors" and GET "/categories"
# (both are registered later on this router), and those requests fail
# integer validation instead of reaching their own handlers.
@router.get("/{book_id:int}", response_model=BookSchema)
async def get_book(book_id: int, db: Session = Depends(get_db)):
    """
    Get a book by ID

    Args:
        book_id: Primary key of the book to fetch.
        db: Database session supplied by ``get_db``.

    Returns:
        The matching ``Book`` row.

    Raises:
        HTTPException: 404 when no book has the given ID.
    """
    book = db.query(Book).filter(Book.id == book_id).first()
    if book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    return book
|
|
|
|
|
|
@router.post("", response_model=BookSchema, status_code=status.HTTP_201_CREATED)
async def create_book(
    book: BookCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Create a new book (admin only)

    Args:
        book: Payload describing the book, including the IDs of its authors
            and categories.
        db: Database session supplied by ``get_db``.
        _: Result of the ``get_current_admin_user`` dependency; unused, it
            only gates access to the endpoint.

    Returns:
        The newly persisted ``Book`` row.

    Raises:
        HTTPException: 400 when the ISBN is already used by another book, or
            when any referenced author or category ID does not exist.
    """
    # Check if book with ISBN already exists
    existing_book = db.query(Book).filter(Book.isbn == book.isbn).first()
    if existing_book:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Book with this ISBN already exists",
        )

    # Get authors. De-duplicate first: the IN query returns each row once, so
    # comparing its length against a list with repeated IDs would wrongly
    # report a missing author.
    author_ids = list(dict.fromkeys(book.author_ids))
    authors = db.query(Author).filter(Author.id.in_(author_ids)).all()
    if len(authors) != len(author_ids):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="One or more author IDs not found",
        )

    # Get categories (same de-duplication reasoning as for authors)
    category_ids = list(dict.fromkeys(book.category_ids))
    categories = db.query(Category).filter(Category.id.in_(category_ids)).all()
    if len(categories) != len(category_ids):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="One or more category IDs not found",
        )

    # Create new book
    db_book = Book(
        title=book.title,
        isbn=book.isbn,
        description=book.description,
        price=book.price,
        cover_image_url=book.cover_image_url,
        publication_date=book.publication_date,
        publisher=book.publisher,
        language=book.language,
        page_count=book.page_count,
        stock_quantity=book.stock_quantity,
        authors=authors,
        categories=categories,
    )

    db.add(db_book)
    db.commit()
    # Reload the row so the returned object carries its committed state.
    db.refresh(db_book)
    return db_book
|
|
|
|
|
|
@router.put("/{book_id}", response_model=BookSchema)
async def update_book(
    book_id: int,
    book: BookUpdate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Update a book (admin only)

    Only fields explicitly present in the request body are applied
    (``model_dump(exclude_unset=True)``). ``author_ids`` and ``category_ids``
    replace the book's relationships when they are present and non-empty; an
    empty or null value leaves the existing relationships untouched.

    Args:
        book_id: Primary key of the book to update.
        book: Partial payload with the fields to change.
        db: Database session supplied by ``get_db``.
        _: Result of the ``get_current_admin_user`` dependency; unused, it
            only gates access to the endpoint.

    Returns:
        The updated ``Book`` row.

    Raises:
        HTTPException: 404 when the book does not exist; 400 when the new
            ISBN belongs to another book or a referenced author/category ID
            does not exist.
    """
    db_book = db.query(Book).filter(Book.id == book_id).first()
    if db_book is None:
        raise HTTPException(status_code=404, detail="Book not found")

    # Update basic fields if provided
    update_data = book.model_dump(exclude_unset=True)

    # Mirror the uniqueness check done on creation: reject an ISBN change
    # that would collide with a different book. The lookup is skipped when
    # the payload carries no "isbn" key or leaves the value unchanged.
    new_isbn = update_data.get("isbn")
    if new_isbn is not None and new_isbn != db_book.isbn:
        if db.query(Book).filter(Book.isbn == new_isbn).first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Book with this ISBN already exists",
            )

    # Handle author_ids separately
    if "author_ids" in update_data:
        requested_author_ids = update_data.pop("author_ids")
        if requested_author_ids:
            # De-duplicate: the IN query yields each author once, so repeated
            # IDs would otherwise make the length check fail spuriously.
            author_ids = list(dict.fromkeys(requested_author_ids))
            authors = db.query(Author).filter(Author.id.in_(author_ids)).all()
            if len(authors) != len(author_ids):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="One or more author IDs not found",
                )
            db_book.authors = authors

    # Handle category_ids separately
    if "category_ids" in update_data:
        requested_category_ids = update_data.pop("category_ids")
        if requested_category_ids:
            category_ids = list(dict.fromkeys(requested_category_ids))
            categories = (
                db.query(Category).filter(Category.id.in_(category_ids)).all()
            )
            if len(categories) != len(category_ids):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="One or more category IDs not found",
                )
            db_book.categories = categories

    # Update the book with the remaining fields
    for key, value in update_data.items():
        setattr(db_book, key, value)

    db.commit()
    db.refresh(db_book)
    return db_book
|
|
|
|
|
|
@router.delete(
    "/{book_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_model=None,
)
async def delete_book(
    book_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Remove the book with the given ID (admin only).

    Responds with 404 "Book not found" when no such row exists; otherwise
    deletes the row, commits, and returns nothing.
    """
    lookup = db.query(Book).filter(Book.id == book_id)
    doomed_book = lookup.first()

    if doomed_book is not None:
        db.delete(doomed_book)
        db.commit()
        return None

    raise HTTPException(status_code=404, detail="Book not found")
|
|
|
|
|
|
# Author routes
|
|
@router.get("/authors", response_model=List[AuthorSchema])
async def get_authors(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
):
    """
    Get a page of authors

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        db: Database session supplied by ``get_db``.

    Returns:
        The requested page of ``Author`` rows, ordered by ID.
    """
    # OFFSET/LIMIT without ORDER BY gives no guaranteed row order; sort by
    # primary key so consecutive pages neither overlap nor skip rows.
    authors = db.query(Author).order_by(Author.id).offset(skip).limit(limit).all()
    return authors
|
|
|
|
|
|
@router.get("/authors/{author_id}", response_model=AuthorSchema)
async def get_author(author_id: int, db: Session = Depends(get_db)):
    """
    Fetch a single author by primary key.

    Responds with 404 "Author not found" when no row matches.
    """
    found = db.query(Author).filter(Author.id == author_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Author not found")
|
|
|
|
|
|
@router.post(
    "/authors",
    response_model=AuthorSchema,
    status_code=status.HTTP_201_CREATED,
)
async def create_author(
    author: AuthorCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Persist a new author built from the request payload (admin only).

    Returns the stored row after it has been committed and refreshed.
    """
    fields = author.model_dump()
    new_author = Author(**fields)

    db.add(new_author)
    db.commit()
    db.refresh(new_author)

    return new_author
|
|
|
|
|
|
@router.put("/authors/{author_id}", response_model=AuthorSchema)
async def update_author(
    author_id: int,
    author: AuthorUpdate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Apply a partial update to an existing author (admin only).

    Only fields explicitly set in the payload are written. Responds with
    404 "Author not found" when the ID does not match any row.
    """
    target = db.query(Author).filter(Author.id == author_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Author not found")

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = author.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete(
    "/authors/{author_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_model=None,
)
async def delete_author(
    author_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Remove the author with the given ID (admin only).

    Responds with 404 "Author not found" when no such row exists; otherwise
    deletes the row, commits, and returns nothing.
    """
    lookup = db.query(Author).filter(Author.id == author_id)
    doomed_author = lookup.first()

    if doomed_author is not None:
        db.delete(doomed_author)
        db.commit()
        return None

    raise HTTPException(status_code=404, detail="Author not found")
|
|
|
|
|
|
# Category routes
|
|
@router.get("/categories", response_model=List[CategorySchema])
async def get_categories(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
):
    """
    Get a page of categories

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        db: Database session supplied by ``get_db``.

    Returns:
        The requested page of ``Category`` rows, ordered by ID.
    """
    # OFFSET/LIMIT without ORDER BY gives no guaranteed row order; sort by
    # primary key so consecutive pages neither overlap nor skip rows.
    categories = (
        db.query(Category).order_by(Category.id).offset(skip).limit(limit).all()
    )
    return categories
|
|
|
|
|
|
@router.get("/categories/{category_id}", response_model=CategorySchema)
async def get_category(category_id: int, db: Session = Depends(get_db)):
    """
    Fetch a single category by primary key.

    Responds with 404 "Category not found" when no row matches.
    """
    found = db.query(Category).filter(Category.id == category_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Category not found")
|
|
|
|
|
|
@router.post(
    "/categories",
    response_model=CategorySchema,
    status_code=status.HTTP_201_CREATED,
)
async def create_category(
    category: CategoryCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Persist a new category (admin only).

    Responds with 400 when a category with the same name is already stored;
    otherwise returns the new row after commit and refresh.
    """
    # Reject the request up front if the name is already taken.
    same_name = db.query(Category).filter(Category.name == category.name)
    if same_name.first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Category with this name already exists",
        )

    fields = category.model_dump()
    new_category = Category(**fields)

    db.add(new_category)
    db.commit()
    db.refresh(new_category)

    return new_category
|
|
|
|
|
|
@router.put("/categories/{category_id}", response_model=CategorySchema)
async def update_category(
    category_id: int,
    category: CategoryUpdate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Apply a partial update to an existing category (admin only).

    Only fields explicitly set in the payload are written. Responds with 404
    when the category does not exist, and with 400 when the payload changes
    the name to one that another stored category already has.
    """
    target = db.query(Category).filter(Category.id == category_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Category not found")

    changes = category.model_dump(exclude_unset=True)

    # The uniqueness lookup only runs when the name is actually changing.
    name_changed = "name" in changes and changes["name"] != target.name
    if name_changed:
        clash = db.query(Category).filter(Category.name == changes["name"]).first()
        if clash:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Category with this name already exists",
            )

    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete(
    "/categories/{category_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_model=None,
)
async def delete_category(
    category_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_admin_user),
):
    """
    Remove the category with the given ID (admin only).

    Responds with 404 "Category not found" when no such row exists;
    otherwise deletes the row, commits, and returns nothing.
    """
    lookup = db.query(Category).filter(Category.id == category_id)
    doomed_category = lookup.first()

    if doomed_category is not None:
        db.delete(doomed_category)
        db.commit()
        return None

    raise HTTPException(status_code=404, detail="Category not found")
|