
- Set up FastAPI project structure with SQLite and SQLAlchemy - Create models for users, books, authors, categories, and orders - Implement JWT authentication and authorization - Add CRUD endpoints for all resources - Set up Alembic for database migrations - Add health check endpoint - Add proper error handling and validation - Create comprehensive documentation
245 lines
7.5 KiB
Python
245 lines
7.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional
|
|
|
|
from app.db.database import get_db
|
|
from app.models.order import Order, OrderItem, OrderStatus
|
|
from app.models.book import Book
|
|
from app.models.user import User
|
|
from app.api.schemas.order import (
|
|
Order as OrderSchema,
|
|
OrderCreate,
|
|
OrderUpdate,
|
|
OrderList,
|
|
)
|
|
from app.auth.auth import get_current_active_user, get_current_admin_user
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("", response_model=OrderSchema, status_code=status.HTTP_201_CREATED)
|
|
async def create_order(
|
|
order: OrderCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
):
|
|
"""
|
|
Create a new order
|
|
"""
|
|
# Validate that all books exist and have enough stock
|
|
total_amount = 0
|
|
order_items = []
|
|
|
|
for item in order.items:
|
|
book = db.query(Book).filter(Book.id == item.book_id).first()
|
|
if not book:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Book with ID {item.book_id} not found",
|
|
)
|
|
|
|
if book.stock_quantity < item.quantity:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Not enough stock for book '{book.title}'. Available: {book.stock_quantity}",
|
|
)
|
|
|
|
# Calculate item total
|
|
item_total = item.unit_price * item.quantity
|
|
total_amount += item_total
|
|
|
|
# Create order item
|
|
order_items.append(
|
|
OrderItem(
|
|
book_id=item.book_id,
|
|
quantity=item.quantity,
|
|
unit_price=item.unit_price,
|
|
)
|
|
)
|
|
|
|
# Update book stock
|
|
book.stock_quantity -= item.quantity
|
|
|
|
# Create the order
|
|
db_order = Order(
|
|
user_id=current_user.id,
|
|
total_amount=total_amount,
|
|
status=OrderStatus.PENDING,
|
|
shipping_address=order.shipping_address,
|
|
items=order_items,
|
|
)
|
|
|
|
db.add(db_order)
|
|
db.commit()
|
|
db.refresh(db_order)
|
|
return db_order
|
|
|
|
|
|
@router.get("", response_model=OrderList)
|
|
async def get_orders(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
status: Optional[OrderStatus] = None,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
):
|
|
"""
|
|
Get a list of user's orders with optional status filter
|
|
"""
|
|
query = db.query(Order).filter(Order.user_id == current_user.id)
|
|
|
|
# Apply status filter if provided
|
|
if status:
|
|
query = query.filter(Order.status == status)
|
|
|
|
total = query.count()
|
|
orders = query.order_by(Order.created_at.desc()).offset(skip).limit(limit).all()
|
|
|
|
return {"total": total, "items": orders}
|
|
|
|
|
|
@router.get("/admin", response_model=OrderList)
|
|
async def get_all_orders(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
status: Optional[OrderStatus] = None,
|
|
user_id: Optional[int] = None,
|
|
db: Session = Depends(get_db),
|
|
_: User = Depends(get_current_admin_user),
|
|
):
|
|
"""
|
|
Get a list of all orders with optional filters (admin only)
|
|
"""
|
|
query = db.query(Order)
|
|
|
|
# Apply filters if provided
|
|
if status:
|
|
query = query.filter(Order.status == status)
|
|
if user_id:
|
|
query = query.filter(Order.user_id == user_id)
|
|
|
|
total = query.count()
|
|
orders = query.order_by(Order.created_at.desc()).offset(skip).limit(limit).all()
|
|
|
|
return {"total": total, "items": orders}
|
|
|
|
|
|
@router.get("/{order_id}", response_model=OrderSchema)
|
|
async def get_order(
|
|
order_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
):
|
|
"""
|
|
Get an order by ID
|
|
"""
|
|
order = db.query(Order).filter(Order.id == order_id).first()
|
|
if order is None:
|
|
raise HTTPException(status_code=404, detail="Order not found")
|
|
|
|
# Check if the order belongs to the current user or if the user is an admin
|
|
if order.user_id != current_user.id and not current_user.is_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not enough permissions to access this order",
|
|
)
|
|
|
|
return order
|
|
|
|
|
|
@router.put("/{order_id}", response_model=OrderSchema)
|
|
async def update_order(
|
|
order_id: int,
|
|
order_update: OrderUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
):
|
|
"""
|
|
Update an order. Users can only update their own orders' shipping address
|
|
and only if the order is still in 'pending' state.
|
|
Admins can update status and shipping address of any order.
|
|
"""
|
|
db_order = db.query(Order).filter(Order.id == order_id).first()
|
|
if db_order is None:
|
|
raise HTTPException(status_code=404, detail="Order not found")
|
|
|
|
# Check permissions
|
|
if current_user.is_admin:
|
|
# Admin can update any order
|
|
update_data = order_update.model_dump(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
setattr(db_order, key, value)
|
|
else:
|
|
# Regular users can only update their own orders if they're still pending
|
|
if db_order.user_id != current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not enough permissions to update this order",
|
|
)
|
|
|
|
if db_order.status != OrderStatus.PENDING:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot update order that is not in 'pending' state",
|
|
)
|
|
|
|
# Regular users can only update shipping address
|
|
if order_update.shipping_address:
|
|
db_order.shipping_address = order_update.shipping_address
|
|
|
|
db.commit()
|
|
db.refresh(db_order)
|
|
return db_order
|
|
|
|
|
|
@router.delete(
|
|
"/{order_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None
|
|
)
|
|
async def cancel_order(
|
|
order_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
):
|
|
"""
|
|
Cancel an order. Users can only cancel their own orders
|
|
and only if the order is still in 'pending' state.
|
|
Admins can cancel any order that hasn't been shipped yet.
|
|
"""
|
|
db_order = db.query(Order).filter(Order.id == order_id).first()
|
|
if db_order is None:
|
|
raise HTTPException(status_code=404, detail="Order not found")
|
|
|
|
# Check permissions
|
|
if current_user.is_admin:
|
|
# Admin can cancel any order that hasn't been shipped
|
|
if db_order.status in [OrderStatus.SHIPPED, OrderStatus.DELIVERED]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot cancel order that has been shipped or delivered",
|
|
)
|
|
else:
|
|
# Regular users can only cancel their own orders if they're still pending
|
|
if db_order.user_id != current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not enough permissions to cancel this order",
|
|
)
|
|
|
|
if db_order.status != OrderStatus.PENDING:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot cancel order that is not in 'pending' state",
|
|
)
|
|
|
|
# Return items to inventory
|
|
for item in db_order.items:
|
|
book = db.query(Book).filter(Book.id == item.book_id).first()
|
|
if book:
|
|
book.stock_quantity += item.quantity
|
|
|
|
# Update order status
|
|
db_order.status = OrderStatus.CANCELLED
|
|
|
|
db.commit()
|
|
return None
|