import asyncio
import logging
import time
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Optional

from sqlalchemy.orm import Session

from app.models.integration import ExternalIntegration, IntegrationHealth, IntegrationType
from app.integrations.external_apis.client import (
    UserServiceClient,
    PaymentServiceClient,
    CommunicationServiceClient
)
from app.services.audit import AuditService

logger = logging.getLogger(__name__)


class IntegrationService:
    """Manage external integrations: CRUD, data synchronisation and health checks.

    Every instance owns one client per external service (user, payment,
    communication) and an ``AuditService`` bound to the same database session.
    """

    def __init__(self, db: Session):
        """Bind the service to ``db`` and construct the external service clients."""
        self.db = db
        self.audit_service = AuditService(db)

        # Initialize external service clients
        self.user_client = UserServiceClient()
        self.payment_client = PaymentServiceClient()
        self.communication_client = CommunicationServiceClient()

    def get_integrations(self, organization_id: int) -> List[ExternalIntegration]:
        """Return every integration whose ``organization_id`` matches."""
        return self.db.query(ExternalIntegration).filter(
            ExternalIntegration.organization_id == organization_id
        ).all()

    def create_integration(
        self,
        organization_id: int,
        name: str,
        integration_type: IntegrationType,
        endpoint_url: str,
        api_key: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ) -> ExternalIntegration:
        """Create, persist and audit-log a new external integration.

        Args:
            organization_id: Owning organization.
            name: Display name of the integration.
            integration_type: Kind of external service.
            endpoint_url: Base URL of the external service.
            api_key: Optional credential stored on the record.
            config: Optional settings; stored as ``str(config)``. A falsy
                value (``None`` or an empty dict) is stored as ``None``.

        Returns:
            The refreshed ``ExternalIntegration`` row.
        """
        # NOTE(review): str(dict) yields a Python repr, not JSON. Left as-is
        # because readers of this column are not visible from this module;
        # consider json.dumps once they are confirmed.
        integration = ExternalIntegration(
            organization_id=organization_id,
            name=name,
            type=integration_type,
            endpoint_url=endpoint_url,
            api_key=api_key,
            config=str(config) if config else None
        )

        self.db.add(integration)
        self.db.commit()
        self.db.refresh(integration)

        # Log integration creation (the API key is deliberately not included).
        self.audit_service.log_action(
            organization_id=organization_id,
            action="create",
            resource_type="external_integration",
            resource_id=str(integration.id),
            details={
                "integration_name": name,
                "integration_type": integration_type.value,
                "endpoint_url": endpoint_url
            }
        )

        return integration

    async def _run_sync(
        self,
        organization_id: int,
        label: str,
        resource_type: str,
        count_key: str,
        sync_records: Callable[[], Awaitable[int]],
    ) -> Dict[str, Any]:
        """Run one sync job and translate its outcome into a status dict.

        Shared implementation behind the three public ``sync_*_data`` methods.

        Args:
            organization_id: Organization being synchronised.
            label: Lower-case noun used in log messages (e.g. ``"user"``).
            resource_type: ``resource_type`` written to the audit log.
            count_key: Key under which the record count is reported.
            sync_records: Zero-argument coroutine function that performs the
                fetch/processing and returns the number of records handled.

        Returns:
            ``{"status": "success", count_key: n, "timestamp": ...}`` or
            ``{"status": "failed", "error": msg, "timestamp": ...}``.
        """
        try:
            logger.info("Starting %s data sync for organization %s", label, organization_id)

            synced_count = await sync_records()

            # Log sync activity
            self.audit_service.log_action(
                organization_id=organization_id,
                action="update",
                resource_type=resource_type,
                details={
                    count_key: synced_count,
                    "sync_timestamp": datetime.utcnow().isoformat()
                }
            )

            return {
                "status": "success",
                count_key: synced_count,
                "timestamp": datetime.utcnow()
            }
        except Exception as exc:
            error = str(exc)
            logger.exception(
                "%s sync failed for organization %s: %s",
                label.capitalize(), organization_id, error,
            )

            # Recording the failure is best-effort: if the audit call itself
            # raises, the caller must still receive the "failed" status dict
            # instead of an unrelated secondary exception.
            try:
                self.audit_service.log_action(
                    organization_id=organization_id,
                    action="update",
                    resource_type=resource_type,
                    details={
                        "status": "failed",
                        "error": error,
                        "sync_timestamp": datetime.utcnow().isoformat()
                    }
                )
            except Exception:
                logger.exception(
                    "Could not write audit entry for failed %s sync (organization %s)",
                    label, organization_id,
                )

            return {
                "status": "failed",
                "error": error,
                "timestamp": datetime.utcnow()
            }

    async def sync_user_data(self, organization_id: int) -> Dict[str, Any]:
        """Sync user data from the external user service.

        Returns:
            Status dict with ``synced_users`` on success or ``error`` on failure.
        """
        async def sync_users() -> int:
            external_users = await self.user_client.sync_users(organization_id)
            # Process and sync users (simplified for demo): the real sync
            # logic would go here; for now the records are only counted.
            return sum(1 for _ in external_users)

        return await self._run_sync(
            organization_id, "user", "user_sync", "synced_users", sync_users
        )

    async def sync_payment_data(self, organization_id: int) -> Dict[str, Any]:
        """Sync payment data from the external payment service.

        Returns:
            Status dict with ``synced_payments`` on success or ``error`` on failure.
        """
        async def sync_payments() -> int:
            billing_history = await self.payment_client.get_billing_history(organization_id)
            # Process and sync payment data (simplified for demo)
            return len(billing_history)

        return await self._run_sync(
            organization_id, "payment", "payment_sync", "synced_payments", sync_payments
        )

    async def sync_communication_data(self, organization_id: int) -> Dict[str, Any]:
        """Sync communication data from the external communication service.

        Returns:
            Status dict with ``synced_communications`` on success or ``error``
            on failure.
        """
        async def sync_communications() -> int:
            comm_history = await self.communication_client.get_communication_history(
                organization_id
            )
            # Process and sync communication data (simplified for demo)
            return len(comm_history)

        return await self._run_sync(
            organization_id,
            "communication",
            "communication_sync",
            "synced_communications",
            sync_communications,
        )

    async def full_sync(self, organization_id: int) -> Dict[str, Any]:
        """Run the user, payment and communication syncs concurrently.

        Returns:
            Dict with ``status`` ``"completed"``, the three per-service result
            dicts and a ``timestamp``.
        """
        logger.info("Starting full sync for organization %s", organization_id)

        # gather() schedules all three at once and, unlike awaiting separately
        # created tasks one by one, does not leave sibling tasks un-awaited if
        # one of them raises.
        user_result, payment_result, comm_result = await asyncio.gather(
            self.sync_user_data(organization_id),
            self.sync_payment_data(organization_id),
            self.sync_communication_data(organization_id),
        )

        return {
            "status": "completed",
            "user_sync": user_result,
            "payment_sync": payment_result,
            "communication_sync": comm_result,
            "timestamp": datetime.utcnow()
        }

    async def check_integration_health(self, integration: ExternalIntegration) -> Dict[str, Any]:
        """Probe one integration's ``/health`` endpoint and record the outcome.

        An ``IntegrationHealth`` row is written for every call. An integration
        whose type has no associated client is reported as unhealthy, because
        no probe can be performed for it.

        Args:
            integration: The integration to probe.

        Returns:
            ``{"status": "healthy", "response_time": ms, "timestamp": ...}`` or
            ``{"status": "unhealthy", "error": msg, "response_time": ms,
            "timestamp": ...}``.

        Raises:
            Exception: Whatever the session raises if the health record cannot
                be committed; the session is rolled back first.
        """
        # perf_counter() is monotonic, so the measured latency cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()
        error: Optional[str] = None

        try:
            # Equality comparisons (not a dict lookup) so that values which
            # compare equal to an enum member without hashing equal still match.
            if integration.type == IntegrationType.USER_MANAGEMENT:
                await self.user_client.get("/health")
            elif integration.type == IntegrationType.PAYMENT:
                await self.payment_client.get("/health")
            elif integration.type == IntegrationType.COMMUNICATION:
                await self.communication_client.get("/health")
            else:
                raise ValueError(
                    f"No health check available for integration type {integration.type!r}"
                )
        except Exception as exc:
            error = str(exc)

        response_time = int((time.perf_counter() - started) * 1000)

        # Persistence happens outside the probe's try-block so that a database
        # error is never misreported as an integration failure.
        if error is None:
            health_record = IntegrationHealth(
                integration_id=integration.id,
                status="healthy",
                response_time=response_time
            )
        else:
            health_record = IntegrationHealth(
                integration_id=integration.id,
                status="unhealthy",
                response_time=response_time,
                error_message=error
            )

        self.db.add(health_record)
        try:
            self.db.commit()
        except Exception:
            # Leave the shared session usable for the caller.
            self.db.rollback()
            raise

        if error is None:
            return {
                "status": "healthy",
                "response_time": response_time,
                "timestamp": datetime.utcnow()
            }

        return {
            "status": "unhealthy",
            "error": error,
            "response_time": response_time,
            "timestamp": datetime.utcnow()
        }

    async def check_all_integrations_health(self, organization_id: int) -> Dict[str, Any]:
        """Check the health of every active integration of an organization.

        Integrations are probed one after another; inactive ones are skipped.

        Returns:
            Dict with ``organization_id``, ``integrations`` (health result per
            integration name) and ``timestamp``.
        """
        health_results: Dict[str, Any] = {}

        for integration in self.get_integrations(organization_id):
            if not integration.is_active:
                continue
            # NOTE(review): results are keyed by name, so two active
            # integrations sharing a name would overwrite each other.
            health_results[integration.name] = await self.check_integration_health(integration)

        return {
            "organization_id": organization_id,
            "integrations": health_results,
            "timestamp": datetime.utcnow()
        }