Automated Action f1c2b73ade Implement online bookstore backend API
- Set up FastAPI project structure with SQLite and SQLAlchemy
- Create models for users, books, authors, categories, and orders
- Implement JWT authentication and authorization
- Add CRUD endpoints for all resources
- Set up Alembic for database migrations
- Add health check endpoint
- Add proper error handling and validation
- Create comprehensive documentation
2025-05-20 12:04:27 +00:00

245 lines
7.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Optional
from app.db.database import get_db
from app.models.order import Order, OrderItem, OrderStatus
from app.models.book import Book
from app.models.user import User
from app.api.schemas.order import (
Order as OrderSchema,
OrderCreate,
OrderUpdate,
OrderList,
)
from app.auth.auth import get_current_active_user, get_current_admin_user
router = APIRouter()
@router.post("", response_model=OrderSchema, status_code=status.HTTP_201_CREATED)
async def create_order(
order: OrderCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""
Create a new order
"""
# Validate that all books exist and have enough stock
total_amount = 0
order_items = []
for item in order.items:
book = db.query(Book).filter(Book.id == item.book_id).first()
if not book:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Book with ID {item.book_id} not found",
)
if book.stock_quantity < item.quantity:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Not enough stock for book '{book.title}'. Available: {book.stock_quantity}",
)
# Calculate item total
item_total = item.unit_price * item.quantity
total_amount += item_total
# Create order item
order_items.append(
OrderItem(
book_id=item.book_id,
quantity=item.quantity,
unit_price=item.unit_price,
)
)
# Update book stock
book.stock_quantity -= item.quantity
# Create the order
db_order = Order(
user_id=current_user.id,
total_amount=total_amount,
status=OrderStatus.PENDING,
shipping_address=order.shipping_address,
items=order_items,
)
db.add(db_order)
db.commit()
db.refresh(db_order)
return db_order
@router.get("", response_model=OrderList)
async def get_orders(
skip: int = 0,
limit: int = 100,
status: Optional[OrderStatus] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""
Get a list of user's orders with optional status filter
"""
query = db.query(Order).filter(Order.user_id == current_user.id)
# Apply status filter if provided
if status:
query = query.filter(Order.status == status)
total = query.count()
orders = query.order_by(Order.created_at.desc()).offset(skip).limit(limit).all()
return {"total": total, "items": orders}
@router.get("/admin", response_model=OrderList)
async def get_all_orders(
skip: int = 0,
limit: int = 100,
status: Optional[OrderStatus] = None,
user_id: Optional[int] = None,
db: Session = Depends(get_db),
_: User = Depends(get_current_admin_user),
):
"""
Get a list of all orders with optional filters (admin only)
"""
query = db.query(Order)
# Apply filters if provided
if status:
query = query.filter(Order.status == status)
if user_id:
query = query.filter(Order.user_id == user_id)
total = query.count()
orders = query.order_by(Order.created_at.desc()).offset(skip).limit(limit).all()
return {"total": total, "items": orders}
@router.get("/{order_id}", response_model=OrderSchema)
async def get_order(
order_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""
Get an order by ID
"""
order = db.query(Order).filter(Order.id == order_id).first()
if order is None:
raise HTTPException(status_code=404, detail="Order not found")
# Check if the order belongs to the current user or if the user is an admin
if order.user_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to access this order",
)
return order
@router.put("/{order_id}", response_model=OrderSchema)
async def update_order(
order_id: int,
order_update: OrderUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""
Update an order. Users can only update their own orders' shipping address
and only if the order is still in 'pending' state.
Admins can update status and shipping address of any order.
"""
db_order = db.query(Order).filter(Order.id == order_id).first()
if db_order is None:
raise HTTPException(status_code=404, detail="Order not found")
# Check permissions
if current_user.is_admin:
# Admin can update any order
update_data = order_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_order, key, value)
else:
# Regular users can only update their own orders if they're still pending
if db_order.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to update this order",
)
if db_order.status != OrderStatus.PENDING:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot update order that is not in 'pending' state",
)
# Regular users can only update shipping address
if order_update.shipping_address:
db_order.shipping_address = order_update.shipping_address
db.commit()
db.refresh(db_order)
return db_order
@router.delete(
"/{order_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None
)
async def cancel_order(
order_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""
Cancel an order. Users can only cancel their own orders
and only if the order is still in 'pending' state.
Admins can cancel any order that hasn't been shipped yet.
"""
db_order = db.query(Order).filter(Order.id == order_id).first()
if db_order is None:
raise HTTPException(status_code=404, detail="Order not found")
# Check permissions
if current_user.is_admin:
# Admin can cancel any order that hasn't been shipped
if db_order.status in [OrderStatus.SHIPPED, OrderStatus.DELIVERED]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot cancel order that has been shipped or delivered",
)
else:
# Regular users can only cancel their own orders if they're still pending
if db_order.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to cancel this order",
)
if db_order.status != OrderStatus.PENDING:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot cancel order that is not in 'pending' state",
)
# Return items to inventory
for item in db_order.items:
book = db.query(Book).filter(Book.id == item.book_id).first()
if book:
book.stock_quantity += item.quantity
# Update order status
db_order.status = OrderStatus.CANCELLED
db.commit()
return None