
- Implemented comprehensive multi-tenant data isolation using database-level security - Built JWT authentication system with role-based access control (Super Admin, Org Admin, User, Viewer) - Created RESTful API endpoints for user and organization operations - Added complete audit logging for all data modifications with IP tracking - Implemented API rate limiting and input validation with security middleware - Built webhook processing engine with async event handling and retry logic - Created external API call handlers with circuit breaker pattern and error handling - Implemented data synchronization between external services and internal data - Added integration health monitoring and status tracking - Created three mock external services (User Management, Payment, Communication) - Implemented idempotency for webhook processing to handle duplicates gracefully - Added comprehensive security headers and XSS/CSRF protection - Set up Alembic database migrations with proper SQLite configuration - Included extensive documentation and API examples Architecture features: - Multi-tenant isolation at database level - Circuit breaker pattern for external API resilience - Async background task processing - Complete audit trail with user context - Role-based permission system - Webhook signature verification - Request validation and sanitization - Health monitoring endpoints Co-Authored-By: Claude <noreply@anthropic.com>
117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
from fastapi import APIRouter, Depends, BackgroundTasks
|
|
from sqlalchemy.orm import Session
|
|
from app.core.deps import get_db, require_roles
|
|
from app.models.user import User, UserRole
|
|
from app.services.integration import IntegrationService
|
|
from app.middleware.rate_limit import limiter
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/")
|
|
async def get_integrations(
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Get all integrations for current organization"""
|
|
integration_service = IntegrationService(db)
|
|
integrations = integration_service.get_integrations(current_user.organization_id)
|
|
|
|
return [
|
|
{
|
|
"id": integration.id,
|
|
"name": integration.name,
|
|
"type": integration.type.value,
|
|
"endpoint_url": integration.endpoint_url,
|
|
"is_active": integration.is_active,
|
|
"last_sync": integration.last_sync,
|
|
"created_at": integration.created_at
|
|
}
|
|
for integration in integrations
|
|
]
|
|
|
|
|
|
@router.post("/sync/users")
|
|
@limiter.limit("5/minute")
|
|
async def sync_user_data(
|
|
background_tasks: BackgroundTasks,
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Trigger user data synchronization"""
|
|
integration_service = IntegrationService(db)
|
|
|
|
# Run sync in background
|
|
background_tasks.add_task(
|
|
integration_service.sync_user_data,
|
|
current_user.organization_id
|
|
)
|
|
|
|
return {"message": "User data sync initiated", "status": "started"}
|
|
|
|
|
|
@router.post("/sync/payments")
|
|
@limiter.limit("5/minute")
|
|
async def sync_payment_data(
|
|
background_tasks: BackgroundTasks,
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Trigger payment data synchronization"""
|
|
integration_service = IntegrationService(db)
|
|
|
|
# Run sync in background
|
|
background_tasks.add_task(
|
|
integration_service.sync_payment_data,
|
|
current_user.organization_id
|
|
)
|
|
|
|
return {"message": "Payment data sync initiated", "status": "started"}
|
|
|
|
|
|
@router.post("/sync/communications")
|
|
@limiter.limit("5/minute")
|
|
async def sync_communication_data(
|
|
background_tasks: BackgroundTasks,
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Trigger communication data synchronization"""
|
|
integration_service = IntegrationService(db)
|
|
|
|
# Run sync in background
|
|
background_tasks.add_task(
|
|
integration_service.sync_communication_data,
|
|
current_user.organization_id
|
|
)
|
|
|
|
return {"message": "Communication data sync initiated", "status": "started"}
|
|
|
|
|
|
@router.post("/sync/all")
|
|
@limiter.limit("2/minute")
|
|
async def sync_all_data(
|
|
background_tasks: BackgroundTasks,
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Trigger full data synchronization from all external services"""
|
|
integration_service = IntegrationService(db)
|
|
|
|
# Run full sync in background
|
|
background_tasks.add_task(
|
|
integration_service.full_sync,
|
|
current_user.organization_id
|
|
)
|
|
|
|
return {"message": "Full data sync initiated", "status": "started"}
|
|
|
|
|
|
@router.get("/health")
|
|
async def check_integrations_health(
|
|
current_user: User = Depends(require_roles([UserRole.ORG_ADMIN, UserRole.SUPER_ADMIN])),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Check health status of all integrations"""
|
|
integration_service = IntegrationService(db)
|
|
return await integration_service.check_all_integrations_health(current_user.organization_id) |