
This API provides endpoints for product and inventory management, customer management, order processing, payment processing with Stripe integration, and user authentication. Generated with BackendIM (backend.im).
152 lines
5.0 KiB
Python
152 lines
5.0 KiB
Python
import json
|
|
from typing import Any
|
|
|
|
import stripe
|
|
from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app import crud, models, schemas
|
|
from app.api import deps
|
|
from app.core.config import settings
|
|
|
|
# Router that the payment endpoints in this module register on via @router.post.
router = APIRouter()
|
|
|
|
# Initialize Stripe
|
|
# Module-level side effect: runs at import time and sets the key on the stripe
# module itself, so every stripe.* call made in this process uses it.
stripe.api_key = settings.STRIPE_API_KEY
|
|
|
|
|
|
@router.post("/create-payment-intent/{order_id}", response_model=schemas.PaymentIntentResponse)
async def create_payment_intent(
    *,
    db: Session = Depends(deps.get_db),
    order_id: int = Path(..., description="The ID of the order to create payment for"),
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Create a Stripe payment intent for an order.

    The order's customer is linked to a Stripe customer (created on first use
    and stored on the customer record), a PaymentIntent is created for the
    order total in USD, and the intent ID is recorded on the order with
    payment status "unpaid".

    Returns:
        A dict with "client_secret" and "payment_intent_id" of the new intent.

    Raises:
        HTTPException 404: the order, or its customer, does not exist.
        HTTPException 403: the caller is not a superuser and the order's
            customer is not linked to the caller's user account.
        HTTPException 400: the order is already paid, or Stripe rejected
            one of the API calls.
    """
    order = crud.order.get(db, id=order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")

    # Loaded once and reused for both the ownership check and the Stripe
    # customer lookup below.
    customer = crud.customer.get(db, id=order.customer_id)

    # Non-superusers may only pay for orders whose customer is linked to them.
    if not crud.user.is_superuser(current_user):
        if not customer or not customer.user_id or customer.user_id != current_user.id:
            raise HTTPException(
                status_code=403,
                detail="Not enough permissions to create payment for this order",
            )

    if order.payment_status == "paid":
        raise HTTPException(
            status_code=400,
            detail="Payment has already been processed for this order",
        )

    # Only reachable with a missing customer for superusers; everyone else
    # was already rejected by the ownership check above.
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")

    try:
        stripe_customer_id = customer.stripe_customer_id
        if not stripe_customer_id:
            # First payment for this customer: create the Stripe counterpart
            # and remember its ID so later payments reuse it.
            stripe_customer = stripe.Customer.create(
                email=customer.email,
                name=f"{customer.first_name} {customer.last_name}",
                metadata={"customer_id": customer.id},
            )
            stripe_customer_id = stripe_customer["id"]
            crud.customer.update(
                db, db_obj=customer, obj_in={"stripe_customer_id": stripe_customer_id}
            )

        intent = stripe.PaymentIntent.create(
            # Stripe expects the amount in cents. Round rather than truncate:
            # 19.99 * 100 is 1998.9999999999998 as a float, which int() alone
            # would turn into 1998.
            amount=int(round(order.total_amount * 100)),
            currency="usd",
            customer=stripe_customer_id,
            metadata={
                "order_id": order.id,
                "order_number": order.order_number,
            },
        )
    except stripe.error.StripeError as e:
        # Covers both the customer and the intent creation, so any Stripe
        # failure is reported to the client as a 400 instead of a 500.
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Record which intent belongs to the order; it stays "unpaid" until the
    # webhook reports the outcome.
    crud.order.update_payment_status(
        db, order_id=order.id, payment_intent_id=intent["id"], payment_status="unpaid"
    )

    return {
        "client_secret": intent["client_secret"],
        "payment_intent_id": intent["id"],
    }
|
|
|
|
|
|
@router.post("/webhook", status_code=200)
async def stripe_webhook(request: Request, response: Response) -> Any:
    """
    Handle Stripe webhook events.

    The raw request body is verified against the "stripe-signature" header
    using settings.STRIPE_WEBHOOK_SECRET. Two event types are acted on, using
    the "order_id" stored in the payment intent's metadata:

    - "payment_intent.succeeded": the order's payment status is set to "paid".
    - "payment_intent.payment_failed": the order's status is set to
      "payment_failed".

    Any other event, or an event that cannot be matched to an order, is
    acknowledged without changes.

    Returns:
        {"status": "success"} with HTTP 200 when the event was accepted, or
        {"error": ...} with HTTP 400 when the payload or signature is invalid.
    """
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")

    if sig_header is None:
        # Without the header there is nothing to verify against; reject with
        # the same response as a bad signature.
        response.status_code = 400
        return {"error": "Invalid signature"}

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        # Invalid payload
        response.status_code = 400
        return {"error": "Invalid payload"}
    except stripe.error.SignatureVerificationError:
        # Invalid signature
        response.status_code = 400
        return {"error": "Invalid signature"}

    event_type = event["type"]
    # You can handle more events here as needed
    if event_type not in ("payment_intent.succeeded", "payment_intent.payment_failed"):
        return {"status": "success"}

    payment_intent = event["data"]["object"]
    raw_order_id = payment_intent["metadata"].get("order_id")
    if not raw_order_id:
        return {"status": "success"}

    try:
        order_id = int(raw_order_id)
    except (TypeError, ValueError):
        # The intent carries an order_id that is not one of ours. Acknowledge
        # it: an error response would only make Stripe retry the delivery.
        return {"status": "success"}

    # This handler obtains its session by driving deps.get_db() by hand, so it
    # must also finish the generator itself or the dependency's cleanup only
    # runs at garbage collection.
    # NOTE(review): assumes deps.get_db is a generator function (yield-style
    # dependency) - confirm.
    db_gen = deps.get_db()
    db = next(db_gen)
    try:
        order = crud.order.get(db, id=order_id)
        if order:
            if event_type == "payment_intent.succeeded":
                crud.order.update_payment_status(
                    db,
                    order_id=order.id,
                    payment_intent_id=payment_intent["id"],
                    payment_status="paid",
                )
            else:
                crud.order.update(
                    db,
                    db_obj=order,
                    obj_in={"status": "payment_failed"},
                )
    finally:
        db_gen.close()

    return {"status": "success"}