
- Implemented comprehensive multi-tenant data isolation using database-level security - Built JWT authentication system with role-based access control (Super Admin, Org Admin, User, Viewer) - Created RESTful API endpoints for user and organization operations - Added complete audit logging for all data modifications with IP tracking - Implemented API rate limiting and input validation with security middleware - Built webhook processing engine with async event handling and retry logic - Created external API call handlers with circuit breaker pattern and error handling - Implemented data synchronization between external services and internal data - Added integration health monitoring and status tracking - Created three mock external services (User Management, Payment, Communication) - Implemented idempotency for webhook processing to handle duplicates gracefully - Added comprehensive security headers and XSS/CSRF protection - Set up Alembic database migrations with proper SQLite configuration - Included extensive documentation and API examples Architecture features: - Multi-tenant isolation at database level - Circuit breaker pattern for external API resilience - Async background task processing - Complete audit trail with user context - Role-based permission system - Webhook signature verification - Request validation and sanitization - Health monitoring endpoints Co-Authored-By: Claude <noreply@anthropic.com>
206 lines
6.5 KiB
Python
206 lines
6.5 KiB
Python
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
import uuid
|
|
from enum import Enum
|
|
|
|
app = FastAPI(title="Mock Communication Service", version="1.0.0")
|
|
|
|
class MessageType(str, Enum):
|
|
EMAIL = "email"
|
|
SMS = "sms"
|
|
PUSH_NOTIFICATION = "push_notification"
|
|
|
|
class MessageStatus(str, Enum):
|
|
PENDING = "pending"
|
|
SENT = "sent"
|
|
DELIVERED = "delivered"
|
|
BOUNCED = "bounced"
|
|
FAILED = "failed"
|
|
|
|
class EmailMessage(BaseModel):
|
|
id: Optional[str] = None
|
|
organization_id: int
|
|
to: str
|
|
from_email: str
|
|
subject: str
|
|
body: str
|
|
status: MessageStatus = MessageStatus.PENDING
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
class SMSMessage(BaseModel):
|
|
id: Optional[str] = None
|
|
organization_id: int
|
|
to: str
|
|
message: str
|
|
status: MessageStatus = MessageStatus.PENDING
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
class PushNotification(BaseModel):
|
|
id: Optional[str] = None
|
|
organization_id: int
|
|
user_id: str
|
|
title: str
|
|
body: str
|
|
status: MessageStatus = MessageStatus.PENDING
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
# Mock data storage
|
|
messages_db: Dict[str, Dict[str, Any]] = {}
|
|
organization_communications: Dict[int, List[str]] = {}
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {"status": "healthy", "service": "communication", "timestamp": datetime.utcnow()}
|
|
|
|
@app.post("/emails", response_model=EmailMessage)
|
|
async def send_email(email: EmailMessage):
|
|
message_id = str(uuid.uuid4())
|
|
email.id = message_id
|
|
email.created_at = datetime.utcnow()
|
|
|
|
# Simulate email sending
|
|
import random
|
|
delivery_rate = 0.92 # 92% delivery rate
|
|
|
|
if random.random() < delivery_rate:
|
|
email.status = MessageStatus.DELIVERED
|
|
event_type = "email.delivered"
|
|
else:
|
|
email.status = MessageStatus.BOUNCED
|
|
event_type = "email.bounced"
|
|
|
|
email.updated_at = datetime.utcnow()
|
|
messages_db[message_id] = {**email.dict(), "type": MessageType.EMAIL}
|
|
|
|
# Add to organization communications
|
|
if email.organization_id not in organization_communications:
|
|
organization_communications[email.organization_id] = []
|
|
organization_communications[email.organization_id].append(message_id)
|
|
|
|
# Send webhook (simulated)
|
|
await send_webhook(event_type, email.dict())
|
|
|
|
return email
|
|
|
|
@app.post("/sms", response_model=SMSMessage)
|
|
async def send_sms(sms: SMSMessage):
|
|
message_id = str(uuid.uuid4())
|
|
sms.id = message_id
|
|
sms.created_at = datetime.utcnow()
|
|
|
|
# Simulate SMS sending
|
|
import random
|
|
delivery_rate = 0.95 # 95% delivery rate
|
|
|
|
if random.random() < delivery_rate:
|
|
sms.status = MessageStatus.DELIVERED
|
|
event_type = "sms.delivered"
|
|
else:
|
|
sms.status = MessageStatus.FAILED
|
|
event_type = "sms.failed"
|
|
|
|
sms.updated_at = datetime.utcnow()
|
|
messages_db[message_id] = {**sms.dict(), "type": MessageType.SMS}
|
|
|
|
# Add to organization communications
|
|
if sms.organization_id not in organization_communications:
|
|
organization_communications[sms.organization_id] = []
|
|
organization_communications[sms.organization_id].append(message_id)
|
|
|
|
# Send webhook (simulated)
|
|
await send_webhook(event_type, sms.dict())
|
|
|
|
return sms
|
|
|
|
@app.post("/notifications", response_model=PushNotification)
|
|
async def send_notification(notification: PushNotification):
|
|
message_id = str(uuid.uuid4())
|
|
notification.id = message_id
|
|
notification.created_at = datetime.utcnow()
|
|
|
|
# Simulate push notification sending
|
|
import random
|
|
delivery_rate = 0.88 # 88% delivery rate
|
|
|
|
if random.random() < delivery_rate:
|
|
notification.status = MessageStatus.DELIVERED
|
|
event_type = "notification.delivered"
|
|
else:
|
|
notification.status = MessageStatus.FAILED
|
|
event_type = "notification.failed"
|
|
|
|
notification.updated_at = datetime.utcnow()
|
|
messages_db[message_id] = {**notification.dict(), "type": MessageType.PUSH_NOTIFICATION}
|
|
|
|
# Add to organization communications
|
|
if notification.organization_id not in organization_communications:
|
|
organization_communications[notification.organization_id] = []
|
|
organization_communications[notification.organization_id].append(message_id)
|
|
|
|
# Send webhook (simulated)
|
|
await send_webhook(event_type, notification.dict())
|
|
|
|
return notification
|
|
|
|
@app.get("/messages/{message_id}/status")
|
|
async def get_delivery_status(message_id: str):
|
|
if message_id not in messages_db:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
|
|
message = messages_db[message_id]
|
|
return {
|
|
"message_id": message_id,
|
|
"status": message["status"],
|
|
"type": message["type"],
|
|
"updated_at": message["updated_at"]
|
|
}
|
|
|
|
@app.get("/organizations/{organization_id}/communications")
|
|
async def get_communication_history(organization_id: int):
|
|
if organization_id not in organization_communications:
|
|
return {"messages": [], "total_messages": 0}
|
|
|
|
org_messages = []
|
|
for message_id in organization_communications[organization_id]:
|
|
if message_id in messages_db:
|
|
org_messages.append(messages_db[message_id])
|
|
|
|
# Group by type
|
|
emails = [m for m in org_messages if m.get("type") == MessageType.EMAIL]
|
|
sms_messages = [m for m in org_messages if m.get("type") == MessageType.SMS]
|
|
notifications = [m for m in org_messages if m.get("type") == MessageType.PUSH_NOTIFICATION]
|
|
|
|
return {
|
|
"emails": emails,
|
|
"sms_messages": sms_messages,
|
|
"notifications": notifications,
|
|
"total_messages": len(org_messages),
|
|
"summary": {
|
|
"emails_sent": len(emails),
|
|
"sms_sent": len(sms_messages),
|
|
"notifications_sent": len(notifications)
|
|
}
|
|
}
|
|
|
|
async def send_webhook(event_type: str, data: Dict[str, Any]):
|
|
"""Simulate sending webhook to main service"""
|
|
webhook_data = {
|
|
"event_id": str(uuid.uuid4()),
|
|
"event_type": event_type,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"data": data
|
|
}
|
|
|
|
# In a real implementation, this would send HTTP request to main service
|
|
print(f"Webhook sent: {event_type} - {webhook_data['event_id']}")
|
|
|
|
return webhook_data
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8003) |