const logger = require("../utils/logger"); /** * SSE Service for Server-Sent Events * Manages SSE connections, topic subscriptions, and broadcasting */ class SSEService { constructor() { // Map of userId -> {res: Response, topics: Set} this.clients = new Map(); // Map of topicName -> Set this.topics = new Map(); } /** * Add a client connection * @param {string} userId - User ID * @param {Response} res - Express response object */ addClient(userId, res) { // Remove existing client if any this.removeClient(userId); this.clients.set(userId, { res, topics: new Set(), }); logger.info(`SSE client added`, { userId, totalClients: this.clients.size }); } /** * Remove a client connection * @param {string} userId - User ID */ removeClient(userId) { const client = this.clients.get(userId); if (!client) { return; } // Unsubscribe from all topics client.topics.forEach((topic) => { const topicSubscribers = this.topics.get(topic); if (topicSubscribers) { topicSubscribers.delete(userId); if (topicSubscribers.size === 0) { this.topics.delete(topic); } } }); this.clients.delete(userId); logger.info(`SSE client removed`, { userId, totalClients: this.clients.size }); } /** * Subscribe a user to topics * @param {string} userId - User ID * @param {string[]} topicList - Array of topic names */ subscribe(userId, topicList) { const client = this.clients.get(userId); if (!client) { logger.warn(`Cannot subscribe: client not found`, { userId }); return false; } topicList.forEach((topic) => { // Add to client's topics client.topics.add(topic); // Add to topic's subscribers if (!this.topics.has(topic)) { this.topics.set(topic, new Set()); } this.topics.get(topic).add(userId); }); logger.info(`User subscribed to topics`, { userId, topics: topicList }); return true; } /** * Unsubscribe a user from topics * @param {string} userId - User ID * @param {string[]} topicList - Array of topic names */ unsubscribe(userId, topicList) { const client = this.clients.get(userId); if (!client) { logger.warn(`Cannot unsubscribe: client not found`, { userId }); return false; } topicList.forEach((topic) => { // Remove from client's topics client.topics.delete(topic); // Remove from topic's subscribers const topicSubscribers = this.topics.get(topic); if (topicSubscribers) { topicSubscribers.delete(userId); if (topicSubscribers.size === 0) { this.topics.delete(topic); } } }); logger.info(`User unsubscribed from topics`, { userId, topics: topicList }); return true; } /** * Broadcast an event to all connected clients * @param {string} eventType - Event type * @param {object} data - Event data */ broadcast(eventType, data) { const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; let sentCount = 0; this.clients.forEach((client, userId) => { try { client.res.write(message); sentCount++; } catch (error) { logger.error(`Failed to send SSE to user`, { userId, error: error.message }); this.removeClient(userId); } }); logger.debug(`Broadcast event`, { eventType, recipients: sentCount }); } /** * Broadcast an event to all subscribers of a topic * @param {string} topic - Topic name * @param {string} eventType - Event type * @param {object} data - Event data */ broadcastToTopic(topic, eventType, data) { const subscribers = this.topics.get(topic); if (!subscribers || subscribers.size === 0) { logger.debug(`No subscribers for topic`, { topic }); return; } const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; let sentCount = 0; subscribers.forEach((userId) => { const client = this.clients.get(userId); if (!client) { return; } try { client.res.write(message); sentCount++; } catch (error) { logger.error(`Failed to send SSE to user`, { userId, error: error.message }); this.removeClient(userId); } }); logger.debug(`Broadcast to topic`, { topic, eventType, recipients: sentCount }); } /** * Send an event to a specific user * @param {string} userId - User ID * @param {string} eventType - Event type * @param {object} data - Event data */ sendToUser(userId, eventType, data) { const client = this.clients.get(userId); if (!client) { logger.debug(`User not connected`, { userId }); return false; } const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; try { client.res.write(message); logger.debug(`Sent event to user`, { userId, eventType }); return true; } catch (error) { logger.error(`Failed to send SSE to user`, { userId, error: error.message }); this.removeClient(userId); return false; } } /** * Get service statistics * @returns {object} Stats object */ getStats() { const topicStats = {}; this.topics.forEach((subscribers, topic) => { topicStats[topic] = subscribers.size; }); return { totalClients: this.clients.size, totalTopics: this.topics.size, topics: topicStats, }; } } // Export singleton instance module.exports = new SSEService();