Automated Action f2b2889ea1 Implement complete Stripe payment integration service
- Created FastAPI application with Stripe payment processing
- Added payment intent creation, retrieval, and webhook handling
- Implemented SQLite database with payments and payment_methods tables
- Set up Alembic for database migrations
- Added comprehensive API endpoints for payment management
- Configured CORS middleware for cross-origin requests
- Added health check and service information endpoints
- Created complete project documentation with setup instructions
- Integrated Ruff for code formatting and linting
2025-06-20 12:23:34 +00:00

157 lines
5.0 KiB
Python

import json
import logging
from typing import Optional, Dict, Any

import stripe
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.models.payment import Payment
from app.services.stripe_service import StripeService
# Router that groups every endpoint declared in this module under the
# "/payments" URL prefix and the "payments" tag.
router = APIRouter(prefix="/payments", tags=["payments"])
# Request body for POST /payments/create-payment-intent.
#
# Only `amount` is mandatory; every other field falls back to the default
# declared below when the client omits it.
class CreatePaymentIntentRequest(BaseModel):
    # Charge amount, expressed in cents.
    amount: int
    # Currency code; "usd" when not supplied.
    currency: str = "usd"

    # Optional customer details. The e-mail is validated as an address
    # by the EmailStr type.
    customer_email: Optional[EmailStr] = None
    customer_name: Optional[str] = None

    # Optional free-form description of the payment.
    description: Optional[str] = None
    # Optional string-keyed mapping of arbitrary extra values.
    metadata: Optional[Dict[str, Any]] = None
# Response payload describing a Stripe payment intent.
#
# The payment-intent endpoints in this module fill each field from the
# attribute of the same meaning on the Stripe payment intent object.
class PaymentIntentResponse(BaseModel):
    # The intent's `client_secret` value.
    client_secret: str
    # The intent's `id` value.
    payment_intent_id: str

    amount: int
    currency: str
    status: str
# Response payload for a payment record read from the local database.
class PaymentResponse(BaseModel):
    # Primary key of the stored payment record.
    id: int
    # Identifier of the Stripe payment intent the record belongs to.
    stripe_payment_intent_id: str

    # NOTE(review): declared as float here, whereas the payment-intent
    # models use integer cents — confirm which unit the payments table stores.
    amount: float
    currency: str
    status: str

    # Nullable customer / descriptive fields copied from the stored record.
    customer_email: Optional[str]
    customer_name: Optional[str]
    description: Optional[str]

    # Creation timestamp as a string; the get_payment endpoint sends the
    # ISO-8601 form, or "" when the record has no creation time.
    created_at: str
# POST /payments/create-payment-intent
#
# Creates a payment intent through StripeService, records it with
# StripeService.save_payment_to_db, and returns the intent's client secret,
# id, amount, currency and status.
#
# Parameters:
#   request: validated request body (amount, currency, customer details,
#            description, metadata).
#   db:      database session supplied by the get_db dependency.
#
# Raises:
#   HTTPException(400): Stripe rejected the request; the detail carries
#                       Stripe's own error message.
#   HTTPException(500): any other failure. The traceback is logged on the
#                       server and the client receives only a generic detail,
#                       so internal exception text is never sent in the
#                       response body.
#
# NOTE(review): if save_payment_to_db fails, the Stripe payment intent has
# already been created at that point — confirm whether it should be cancelled.
@router.post("/create-payment-intent", response_model=PaymentIntentResponse)
async def create_payment_intent(
    request: CreatePaymentIntentRequest, db: Session = Depends(get_db)
):
    """Create a new payment intent"""
    try:
        stripe_service = StripeService()

        # Create payment intent with Stripe
        payment_intent = stripe_service.create_payment_intent(
            amount=request.amount,
            currency=request.currency,
            customer_email=request.customer_email,
            customer_name=request.customer_name,
            description=request.description,
            metadata=request.metadata,
        )

        # Save to database
        stripe_service.save_payment_to_db(
            db=db,
            payment_intent=payment_intent,
            customer_email=request.customer_email,
            customer_name=request.customer_name,
        )

        return PaymentIntentResponse(
            client_secret=payment_intent.client_secret,
            payment_intent_id=payment_intent.id,
            amount=payment_intent.amount,
            currency=payment_intent.currency,
            status=payment_intent.status,
        )
    except stripe.error.StripeError as e:
        # Chain the original error so it stays visible in server tracebacks.
        raise HTTPException(status_code=400, detail=f"Stripe error: {str(e)}") from e
    except Exception as e:
        # Keep the diagnostic detail in the server log only.
        logging.getLogger(__name__).exception(
            "Unexpected error while creating payment intent"
        )
        raise HTTPException(status_code=500, detail="Internal server error") from e
# GET /payments/payment-intent/{payment_intent_id}
#
# Looks the payment intent up through StripeService.retrieve_payment_intent
# and returns its client secret, id, amount, currency and status.
#
# Parameters:
#   payment_intent_id: identifier taken from the URL path.
#
# Raises:
#   HTTPException(400): Stripe raised an error; the detail carries Stripe's
#                       error message.
@router.get("/payment-intent/{payment_intent_id}", response_model=PaymentIntentResponse)
async def get_payment_intent(payment_intent_id: str):
    """Retrieve a payment intent"""
    try:
        service = StripeService()
        intent = service.retrieve_payment_intent(payment_intent_id)
        response = PaymentIntentResponse(
            client_secret=intent.client_secret,
            payment_intent_id=intent.id,
            amount=intent.amount,
            currency=intent.currency,
            status=intent.status,
        )
    except stripe.error.StripeError as exc:
        message = f"Stripe error: {str(exc)}"
        raise HTTPException(status_code=400, detail=message)
    return response
# GET /payments/payment/{payment_intent_id}
#
# Reads the first Payment record whose stripe_payment_intent_id equals the
# path parameter and returns its fields.
#
# Parameters:
#   payment_intent_id: Stripe payment intent identifier from the URL path.
#   db:                database session supplied by the get_db dependency.
#
# Raises:
#   HTTPException(404): no matching record exists.
@router.get("/payment/{payment_intent_id}", response_model=PaymentResponse)
async def get_payment(payment_intent_id: str, db: Session = Depends(get_db)):
    """Get payment details from database"""
    matching = db.query(Payment).filter(
        Payment.stripe_payment_intent_id == payment_intent_id
    )
    record = matching.first()
    if not record:
        raise HTTPException(status_code=404, detail="Payment not found")

    # A record without a creation time is reported as an empty string.
    if record.created_at:
        created_at = record.created_at.isoformat()
    else:
        created_at = ""

    return PaymentResponse(
        id=record.id,
        stripe_payment_intent_id=record.stripe_payment_intent_id,
        amount=record.amount,
        currency=record.currency,
        status=record.status,
        customer_email=record.customer_email,
        customer_name=record.customer_name,
        description=record.description,
        created_at=created_at,
    )
# POST /payments/webhook
#
# Receives a Stripe webhook call: checks the "stripe-signature" header with
# StripeService.verify_webhook_signature, parses the raw body as JSON and
# passes the event to StripeService.handle_webhook_event.
#
# Parameters:
#   request: incoming request; the raw body and headers are read from it.
#   db:      database session supplied by the get_db dependency.
#
# Returns:
#   {"status": "success"} when the handler returns a truthy value,
#   {"status": "unhandled_event_type"} otherwise.
#
# Raises:
#   HTTPException(400): the signature header is missing, the signature does
#                       not verify, or the body is not valid JSON.
#   HTTPException(500): the event handler raised. The traceback is logged on
#                       the server and only a generic detail is returned.
@router.post("/webhook")
async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
    """Handle Stripe webhooks"""
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        raise HTTPException(status_code=400, detail="Missing Stripe signature")

    stripe_service = StripeService()

    # Verify webhook signature
    if not stripe_service.verify_webhook_signature(payload, sig_header):
        raise HTTPException(status_code=400, detail="Invalid webhook signature")

    # Parse on its own so that only payload problems map to a 400.
    # ValueError covers json.JSONDecodeError and also the UnicodeDecodeError
    # that json.loads raises for bytes it cannot decode.
    try:
        event = json.loads(payload)
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from e

    # Handle the event
    try:
        handled = stripe_service.handle_webhook_event(db, event)
    except Exception as e:
        # Keep the diagnostic detail in the server log only.
        logging.getLogger(__name__).exception("Webhook processing failed")
        raise HTTPException(status_code=500, detail="Webhook processing error") from e

    if handled:
        return {"status": "success"}
    return {"status": "unhandled_event_type"}