import logging
import re
from typing import Any, Dict, List, Optional

from fastapi import WebSocket
from sqlalchemy.orm import Session

from app.websocket.connection_manager import connection_manager
from app.models import Message, Chat, User, ChatMember, Mention, MessageType, MessageStatus
from app.services.notification_service import notification_service
from app.db.session import SessionLocal

logger = logging.getLogger(__name__)

# Matches "@username" tokens; compiled once instead of on every message.
_MENTION_RE = re.compile(r'@(\w+)')


class ChatHandler:
    """Route incoming WebSocket chat events to their handlers.

    Every event is a dict whose ``"type"`` key selects the handler. Handlers
    that read from or broadcast to a chat first verify that the acting user
    has a ``ChatMember`` row for that chat; events that fail validation are
    dropped silently, mirroring the original non-member behaviour.
    """

    # Event type -> handler method name. Resolved with getattr at call time
    # so subclasses can override individual handlers.
    _HANDLERS = {
        "send_message": "_handle_send_message",
        "typing_start": "_handle_typing_start",
        "typing_stop": "_handle_typing_stop",
        "message_read": "_handle_message_read",
        "join_chat": "_handle_join_chat",
        "leave_chat": "_handle_leave_chat",
    }

    async def handle_message(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handle incoming chat messages.

        Args:
            websocket: Connection the event arrived on; used to look up the
                user id in ``connection_manager.connection_user_map``.
            data: Decoded event payload. ``data["type"]`` selects the handler.

        Events from connections with no mapped user, and events whose type is
        missing, not a string, or unknown, are ignored without opening a
        database session.
        """
        user_id = connection_manager.connection_user_map.get(websocket)
        if not user_id:
            return

        event_type = data.get("type")
        # A non-string type (e.g. a JSON list) must not reach the dict lookup,
        # where an unhashable value would raise TypeError.
        if not isinstance(event_type, str):
            return
        handler_name = self._HANDLERS.get(event_type)
        if handler_name is None:
            return

        db = SessionLocal()
        try:
            await getattr(self, handler_name)(user_id, data, db)
        finally:
            db.close()

    def _is_chat_member(self, user_id: int, chat_id: Any, db: Session) -> bool:
        """Return True when a ChatMember row links ``user_id`` to ``chat_id``."""
        if chat_id is None:
            return False
        membership = db.query(ChatMember).filter(
            ChatMember.chat_id == chat_id,
            ChatMember.user_id == user_id
        ).first()
        return membership is not None

    async def _handle_send_message(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle sending a new message.

        Validates the payload, persists the message together with its mention
        rows in one transaction, broadcasts it to the chat (excluding the
        sender), confirms delivery to the sender, and sends a mention
        notification to each mentioned user.

        Reads ``chat_id``, ``content``, ``reply_to_id`` and ``message_type``
        from ``data``. The event is dropped when the content or message type
        is not a string, the message type does not name an attribute of
        ``MessageType``, the user is not a member of the chat, or the chat or
        sender row cannot be found.
        """
        chat_id = data.get("chat_id")
        # "or" also covers an explicit JSON null, which .get's default does not.
        content = data.get("content") or ""
        reply_to_id = data.get("reply_to_id")
        message_type = data.get("message_type") or "text"

        if not isinstance(content, str) or not isinstance(message_type, str):
            logger.debug("Dropping send_message from user %s: malformed payload", user_id)
            return

        # Client-controlled value: resolve with a default instead of letting
        # an unknown name raise AttributeError.
        content_type = getattr(MessageType, message_type.upper(), None)
        if content_type is None:
            logger.debug(
                "Dropping send_message from user %s: unknown message_type %r",
                user_id, message_type,
            )
            return

        # Verify user is member of chat
        if not self._is_chat_member(user_id, chat_id, db):
            return

        # Get chat and sender info
        chat = db.query(Chat).filter(Chat.id == chat_id).first()
        sender = db.query(User).filter(User.id == user_id).first()
        if not chat or not sender:
            return

        # Process mentions (restricted to members of this chat)
        mentions = self._extract_mentions(content, db, chat_id)

        # Read what the broadcast needs before committing: with SQLAlchemy's
        # default expire_on_commit=True, each attribute access afterwards
        # would trigger a reload query.
        sender_username = sender.username
        sender_avatar = sender.avatar_url
        mention_payload = [{"id": u.id, "username": u.username} for u in mentions]

        # Encrypt message content if recipients have public keys
        encrypted_content = await self._encrypt_for_recipients(content, chat_id, db)

        # Create message and mention records atomically: flush assigns
        # message.id so the mentions can reference it, then a single commit
        # persists both or neither.
        message = Message(
            chat_id=chat_id,
            sender_id=user_id,
            reply_to_id=reply_to_id,
            content=encrypted_content,
            content_type=content_type,
            status=MessageStatus.SENT
        )
        db.add(message)
        db.flush()
        db.add_all([
            Mention(message_id=message.id, mentioned_user_id=entry["id"])
            for entry in mention_payload
        ])
        db.commit()
        db.refresh(message)

        # Prepare message data for broadcast
        message_data = {
            "type": "new_message",
            "message": {
                "id": message.id,
                "chat_id": chat_id,
                "sender_id": user_id,
                "sender_username": sender_username,
                "sender_avatar": sender_avatar,
                "content": content,  # Send unencrypted for real-time display
                "content_type": message_type,
                "reply_to_id": reply_to_id,
                "created_at": message.created_at.isoformat(),
                "mentions": mention_payload
            }
        }

        # Broadcast to chat members
        await connection_manager.send_to_chat(message_data, chat_id, exclude_user_id=user_id)

        # Send delivery confirmation to sender
        await connection_manager.send_personal_message({
            "type": "message_sent",
            "message_id": message.id,
            "chat_id": chat_id,
            "status": "sent"
        }, user_id)

        # Send push notifications for mentions
        for entry in mention_payload:
            await notification_service.send_mention_notification(
                entry["id"],
                sender_username,
                content[:100],
                chat_id,
                message.id
            )

    async def _handle_typing_start(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle typing indicator start.

        Broadcasts a ``typing_start`` event carrying the user's username (or
        ``"Unknown"`` when the user row is missing) to the chat, excluding the
        typing user. Ignored when the user is not a member of the chat.
        """
        chat_id = data.get("chat_id")
        if not self._is_chat_member(user_id, chat_id, db):
            return

        user = db.query(User).filter(User.id == user_id).first()

        typing_data = {
            "type": "typing_start",
            "chat_id": chat_id,
            "user_id": user_id,
            "username": user.username if user else "Unknown"
        }

        await connection_manager.send_to_chat(typing_data, chat_id, exclude_user_id=user_id)

    async def _handle_typing_stop(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle typing indicator stop.

        Broadcasts a ``typing_stop`` event to the chat, excluding the typing
        user. Ignored when the user is not a member of the chat.
        """
        chat_id = data.get("chat_id")
        if not self._is_chat_member(user_id, chat_id, db):
            return

        typing_data = {
            "type": "typing_stop",
            "chat_id": chat_id,
            "user_id": user_id
        }

        await connection_manager.send_to_chat(typing_data, chat_id, exclude_user_id=user_id)

    async def _handle_message_read(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle message read receipt.

        Marks the message identified by ``data["message_id"]`` as READ and
        sends a ``message_read`` receipt to its sender. Nothing happens when
        the message does not exist, when the reader is the sender, or when the
        reader is not a member of the message's chat.
        """
        message_id = data.get("message_id")

        message = db.query(Message).filter(Message.id == message_id).first()
        if message is None or message.sender_id == user_id:
            return

        # Use the stored chat id rather than the client-supplied one, both for
        # the membership check and for the receipt.
        chat_id = message.chat_id
        if not self._is_chat_member(user_id, chat_id, db):
            return

        # Captured before commit so the receipt needs no reload of the row.
        sender_id = message.sender_id

        # Update message status
        message.status = MessageStatus.READ
        db.commit()

        # Notify sender
        read_receipt = {
            "type": "message_read",
            "message_id": message_id,
            "chat_id": chat_id,
            "read_by_user_id": user_id
        }

        await connection_manager.send_personal_message(read_receipt, sender_id)

    async def _handle_join_chat(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle user joining chat room (for real-time updates).

        Registers the user with ``connection_manager.add_user_to_chat`` only
        when they are a member of the chat, so a client cannot subscribe to an
        arbitrary chat id.
        """
        chat_id = data.get("chat_id")
        if not self._is_chat_member(user_id, chat_id, db):
            return
        connection_manager.add_user_to_chat(user_id, chat_id)

    async def _handle_leave_chat(self, user_id: int, data: Dict[str, Any], db: Session):
        """Handle user leaving chat room."""
        chat_id = data.get("chat_id")
        connection_manager.remove_user_from_chat(user_id, chat_id)

    def _extract_mentions(self, content: str, db: Session, chat_id: Optional[int] = None) -> list:
        """Extract mentioned users from message content.

        Args:
            content: Message text to scan for ``@username`` tokens.
            db: Session used to resolve usernames to ``User`` rows.
            chat_id: When given, only users with a ``ChatMember`` row for this
                chat are returned. When omitted, any matching user is
                returned.

        Returns:
            Matching ``User`` objects, one per distinct username, in order of
            first appearance in ``content``.
        """
        if not content:
            return []

        # dict.fromkeys de-duplicates while keeping first-occurrence order.
        usernames = list(dict.fromkeys(_MENTION_RE.findall(content)))
        if not usernames:
            return []

        # One IN query instead of one query per mentioned name.
        query = db.query(User).filter(User.username.in_(usernames))
        if chat_id is not None:
            query = query.join(ChatMember, ChatMember.user_id == User.id).filter(
                ChatMember.chat_id == chat_id
            )

        users_by_name: Dict[str, Any] = {}
        for user in query.all():
            # Keep the first row seen for each username.
            users_by_name.setdefault(user.username, user)

        found: List[Any] = [users_by_name[name] for name in usernames if name in users_by_name]
        return found

    async def _encrypt_for_recipients(self, content: str, chat_id: int, db: Session) -> str:
        """Encrypt message for chat recipients (simplified E2E).

        Currently a placeholder: returns ``content`` unchanged and does not
        use ``chat_id`` or ``db``.
        """
        # For now, return content as-is
        # In a full implementation, you'd encrypt for each recipient's public key
        return content


# Global chat handler instance
chat_handler = ChatHandler()