Automated Action 2adbcd0535 Complete multi-tenant SaaS platform with external integrations
- Implemented comprehensive multi-tenant data isolation using database-level security
- Built JWT authentication system with role-based access control (Super Admin, Org Admin, User, Viewer)
- Created RESTful API endpoints for user and organization operations
- Added complete audit logging for all data modifications with IP tracking
- Implemented API rate limiting and input validation with security middleware
- Built webhook processing engine with async event handling and retry logic
- Created external API call handlers with circuit breaker pattern and error handling
- Implemented data synchronization between external services and internal data
- Added integration health monitoring and status tracking
- Created three mock external services (User Management, Payment, Communication)
- Implemented idempotency for webhook processing to handle duplicates gracefully
- Added comprehensive security headers and XSS/CSRF protection
- Set up Alembic database migrations with proper SQLite configuration
- Included extensive documentation and API examples

Architecture features:
- Multi-tenant isolation at database level
- Circuit breaker pattern for external API resilience
- Async background task processing
- Complete audit trail with user context
- Role-based permission system
- Webhook signature verification
- Request validation and sanitization
- Health monitoring endpoints

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-27 21:14:30 +00:00

206 lines
6.5 KiB
Python

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
from enum import Enum
app = FastAPI(title="Mock Communication Service", version="1.0.0")
class MessageType(str, Enum):
EMAIL = "email"
SMS = "sms"
PUSH_NOTIFICATION = "push_notification"
class MessageStatus(str, Enum):
PENDING = "pending"
SENT = "sent"
DELIVERED = "delivered"
BOUNCED = "bounced"
FAILED = "failed"
class EmailMessage(BaseModel):
id: Optional[str] = None
organization_id: int
to: str
from_email: str
subject: str
body: str
status: MessageStatus = MessageStatus.PENDING
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class SMSMessage(BaseModel):
id: Optional[str] = None
organization_id: int
to: str
message: str
status: MessageStatus = MessageStatus.PENDING
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class PushNotification(BaseModel):
id: Optional[str] = None
organization_id: int
user_id: str
title: str
body: str
status: MessageStatus = MessageStatus.PENDING
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
# Mock data storage
messages_db: Dict[str, Dict[str, Any]] = {}
organization_communications: Dict[int, List[str]] = {}
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "communication", "timestamp": datetime.utcnow()}
@app.post("/emails", response_model=EmailMessage)
async def send_email(email: EmailMessage):
message_id = str(uuid.uuid4())
email.id = message_id
email.created_at = datetime.utcnow()
# Simulate email sending
import random
delivery_rate = 0.92 # 92% delivery rate
if random.random() < delivery_rate:
email.status = MessageStatus.DELIVERED
event_type = "email.delivered"
else:
email.status = MessageStatus.BOUNCED
event_type = "email.bounced"
email.updated_at = datetime.utcnow()
messages_db[message_id] = {**email.dict(), "type": MessageType.EMAIL}
# Add to organization communications
if email.organization_id not in organization_communications:
organization_communications[email.organization_id] = []
organization_communications[email.organization_id].append(message_id)
# Send webhook (simulated)
await send_webhook(event_type, email.dict())
return email
@app.post("/sms", response_model=SMSMessage)
async def send_sms(sms: SMSMessage):
message_id = str(uuid.uuid4())
sms.id = message_id
sms.created_at = datetime.utcnow()
# Simulate SMS sending
import random
delivery_rate = 0.95 # 95% delivery rate
if random.random() < delivery_rate:
sms.status = MessageStatus.DELIVERED
event_type = "sms.delivered"
else:
sms.status = MessageStatus.FAILED
event_type = "sms.failed"
sms.updated_at = datetime.utcnow()
messages_db[message_id] = {**sms.dict(), "type": MessageType.SMS}
# Add to organization communications
if sms.organization_id not in organization_communications:
organization_communications[sms.organization_id] = []
organization_communications[sms.organization_id].append(message_id)
# Send webhook (simulated)
await send_webhook(event_type, sms.dict())
return sms
@app.post("/notifications", response_model=PushNotification)
async def send_notification(notification: PushNotification):
message_id = str(uuid.uuid4())
notification.id = message_id
notification.created_at = datetime.utcnow()
# Simulate push notification sending
import random
delivery_rate = 0.88 # 88% delivery rate
if random.random() < delivery_rate:
notification.status = MessageStatus.DELIVERED
event_type = "notification.delivered"
else:
notification.status = MessageStatus.FAILED
event_type = "notification.failed"
notification.updated_at = datetime.utcnow()
messages_db[message_id] = {**notification.dict(), "type": MessageType.PUSH_NOTIFICATION}
# Add to organization communications
if notification.organization_id not in organization_communications:
organization_communications[notification.organization_id] = []
organization_communications[notification.organization_id].append(message_id)
# Send webhook (simulated)
await send_webhook(event_type, notification.dict())
return notification
@app.get("/messages/{message_id}/status")
async def get_delivery_status(message_id: str):
if message_id not in messages_db:
raise HTTPException(status_code=404, detail="Message not found")
message = messages_db[message_id]
return {
"message_id": message_id,
"status": message["status"],
"type": message["type"],
"updated_at": message["updated_at"]
}
@app.get("/organizations/{organization_id}/communications")
async def get_communication_history(organization_id: int):
if organization_id not in organization_communications:
return {"messages": [], "total_messages": 0}
org_messages = []
for message_id in organization_communications[organization_id]:
if message_id in messages_db:
org_messages.append(messages_db[message_id])
# Group by type
emails = [m for m in org_messages if m.get("type") == MessageType.EMAIL]
sms_messages = [m for m in org_messages if m.get("type") == MessageType.SMS]
notifications = [m for m in org_messages if m.get("type") == MessageType.PUSH_NOTIFICATION]
return {
"emails": emails,
"sms_messages": sms_messages,
"notifications": notifications,
"total_messages": len(org_messages),
"summary": {
"emails_sent": len(emails),
"sms_sent": len(sms_messages),
"notifications_sent": len(notifications)
}
}
async def send_webhook(event_type: str, data: Dict[str, Any]):
"""Simulate sending webhook to main service"""
webhook_data = {
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"data": data
}
# In a real implementation, this would send HTTP request to main service
print(f"Webhook sent: {event_type} - {webhook_data['event_id']}")
return webhook_data
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8003)