- Replace Socket.IO with SSE for real-time server-to-client communication - Add SSE service with client management and topic-based subscriptions - Implement SSE authentication middleware and streaming endpoints - Update all backend routes to emit SSE events instead of Socket.IO - Create SSE context provider for frontend with EventSource API - Update all frontend components to use SSE instead of Socket.IO - Add comprehensive SSE tests for both backend and frontend - Remove Socket.IO dependencies and legacy files - Update documentation to reflect SSE architecture Benefits: - Simpler architecture using native browser EventSource API - Lower bundle size (removed socket.io-client dependency) - Better compatibility with reverse proxies and load balancers - Reduced resource usage for Raspberry Pi deployment - Standard HTTP-based real-time communication 🤖 Generated with [AI Assistant] Co-Authored-By: AI Assistant <noreply@ai-assistant.com>
217 lines
5.5 KiB
JavaScript
217 lines
5.5 KiB
JavaScript
const logger = require("../utils/logger");
|
|
|
|
/**
 * SSE Service for Server-Sent Events
 *
 * Keeps an in-memory registry of connected clients (one response stream per
 * user ID) and of topic subscriptions, and writes SSE-framed messages
 * (`event: <type>\ndata: <json>\n\n`) to those streams.
 */
class SSEService {
  constructor() {
    // Map of userId -> {res: Response, topics: Set<string>}
    this.clients = new Map();

    // Map of topicName -> Set<userId>
    this.topics = new Map();
  }

  /**
   * Add a client connection.
   *
   * Only one connection is tracked per user: registering again replaces the
   * previous entry and drops its topic subscriptions.
   * NOTE(review): the replaced response is not ended here - confirm that the
   * route layer is responsible for closing it.
   *
   * @param {string} userId - User ID
   * @param {Response} res - Express response object
   */
  addClient(userId, res) {
    // Remove existing client if any (unconditionally, whatever its stream)
    this.removeClient(userId);

    this.clients.set(userId, { res, topics: new Set() });

    logger.info(`SSE client added`, { userId, totalClients: this.clients.size });
  }

  /**
   * Remove a client connection and all of its topic subscriptions.
   *
   * @param {string} userId - User ID
   * @param {Response} [res] - When given, the client is removed only if this
   *   is the response currently registered for the user. This lets the close
   *   handler of an older, already replaced connection run without evicting
   *   the connection that replaced it. Omit to remove unconditionally.
   */
  removeClient(userId, res) {
    const client = this.clients.get(userId);
    if (!client) {
      return;
    }

    if (res !== undefined && client.res !== res) {
      logger.debug(`Ignoring removal of stale SSE connection`, { userId });
      return;
    }

    // Unsubscribe from all topics
    client.topics.forEach((topic) => this._detachFromTopic(userId, topic));

    this.clients.delete(userId);
    logger.info(`SSE client removed`, { userId, totalClients: this.clients.size });
  }

  /**
   * Subscribe a user to topics.
   *
   * @param {string} userId - User ID
   * @param {string[]|string} topicList - Topic names; a single name or a
   *   nullish value (treated as "no topics") is also accepted
   * @returns {boolean} false when the user has no registered connection,
   *   true otherwise
   */
  subscribe(userId, topicList) {
    const client = this.clients.get(userId);
    if (!client) {
      logger.warn(`Cannot subscribe: client not found`, { userId });
      return false;
    }

    const topics = this._toTopicArray(topicList);
    for (const topic of topics) {
      // Add to client's topics
      client.topics.add(topic);

      // Add to topic's subscribers, creating the topic on first use
      let subscribers = this.topics.get(topic);
      if (!subscribers) {
        subscribers = new Set();
        this.topics.set(topic, subscribers);
      }
      subscribers.add(userId);
    }

    logger.info(`User subscribed to topics`, { userId, topics });
    return true;
  }

  /**
   * Unsubscribe a user from topics.
   *
   * @param {string} userId - User ID
   * @param {string[]|string} topicList - Topic names; a single name or a
   *   nullish value (treated as "no topics") is also accepted
   * @returns {boolean} false when the user has no registered connection,
   *   true otherwise
   */
  unsubscribe(userId, topicList) {
    const client = this.clients.get(userId);
    if (!client) {
      logger.warn(`Cannot unsubscribe: client not found`, { userId });
      return false;
    }

    const topics = this._toTopicArray(topicList);
    for (const topic of topics) {
      // Remove from client's topics, then from the topic's subscribers
      client.topics.delete(topic);
      this._detachFromTopic(userId, topic);
    }

    logger.info(`User unsubscribed from topics`, { userId, topics });
    return true;
  }

  /**
   * Broadcast an event to all connected clients.
   * Clients whose stream cannot be written to are removed.
   *
   * @param {string} eventType - Event type
   * @param {object} data - Event data (JSON-serialised)
   */
  broadcast(eventType, data) {
    const message = this._formatMessage(eventType, data);
    let sentCount = 0;

    // Iterate over a snapshot: failed deliveries remove entries from the map.
    for (const [userId, client] of Array.from(this.clients)) {
      if (this._deliver(userId, client, message)) {
        sentCount++;
      }
    }

    logger.debug(`Broadcast event`, { eventType, recipients: sentCount });
  }

  /**
   * Broadcast an event to all subscribers of a topic.
   * Subscribers whose stream cannot be written to are removed.
   *
   * @param {string} topic - Topic name
   * @param {string} eventType - Event type
   * @param {object} data - Event data (JSON-serialised)
   */
  broadcastToTopic(topic, eventType, data) {
    const subscribers = this.topics.get(topic);
    if (!subscribers || subscribers.size === 0) {
      logger.debug(`No subscribers for topic`, { topic });
      return;
    }

    const message = this._formatMessage(eventType, data);
    let sentCount = 0;

    // Iterate over a snapshot: failed deliveries remove users from the set.
    for (const userId of Array.from(subscribers)) {
      const client = this.clients.get(userId);
      if (client && this._deliver(userId, client, message)) {
        sentCount++;
      }
    }

    logger.debug(`Broadcast to topic`, { topic, eventType, recipients: sentCount });
  }

  /**
   * Send an event to a specific user.
   *
   * @param {string} userId - User ID
   * @param {string} eventType - Event type
   * @param {object} data - Event data (JSON-serialised)
   * @returns {boolean} true when the message was written, false when the
   *   user is not connected or the write failed (the client is then removed)
   */
  sendToUser(userId, eventType, data) {
    const client = this.clients.get(userId);
    if (!client) {
      logger.debug(`User not connected`, { userId });
      return false;
    }

    const message = this._formatMessage(eventType, data);
    const delivered = this._deliver(userId, client, message);
    if (delivered) {
      logger.debug(`Sent event to user`, { userId, eventType });
    }
    return delivered;
  }

  /**
   * Get service statistics.
   *
   * @returns {object} `{ totalClients, totalTopics, topics }` where `topics`
   *   maps each topic name to its subscriber count
   */
  getStats() {
    const topicStats = {};
    this.topics.forEach((subscribers, topic) => {
      topicStats[topic] = subscribers.size;
    });

    return {
      totalClients: this.clients.size,
      totalTopics: this.topics.size,
      topics: topicStats,
    };
  }

  /**
   * Normalise a topic argument into an array.
   * Arrays are returned as-is so callers and log output see the same value.
   *
   * @private
   * @param {string[]|string|Iterable<string>|null|undefined} topicList
   * @returns {string[]} Array of topic names
   */
  _toTopicArray(topicList) {
    if (Array.isArray(topicList)) {
      return topicList;
    }
    if (topicList == null) {
      return [];
    }
    if (typeof topicList === "string") {
      return [topicList];
    }
    return Array.from(topicList);
  }

  /**
   * Remove a user from one topic's subscriber set, dropping the topic
   * entirely once nobody is subscribed to it.
   *
   * @private
   * @param {string} userId - User ID
   * @param {string} topic - Topic name
   */
  _detachFromTopic(userId, topic) {
    const subscribers = this.topics.get(topic);
    if (!subscribers) {
      return;
    }

    subscribers.delete(userId);
    if (subscribers.size === 0) {
      this.topics.delete(topic);
    }
  }

  /**
   * Build one SSE frame for an event.
   *
   * @private
   * @param {string} eventType - Event type
   * @param {*} data - Event data
   * @returns {string} The `event:` / `data:` frame terminated by a blank line
   */
  _formatMessage(eventType, data) {
    const json = JSON.stringify(data);
    // JSON.stringify returns undefined for undefined/functions/symbols, which
    // would otherwise be sent as the non-JSON text "undefined".
    const payload = json === undefined ? "null" : json;
    return `event: ${eventType}\ndata: ${payload}\n\n`;
  }

  /**
   * Write a prepared frame to one client, removing the client on failure.
   *
   * @private
   * @param {string} userId - User ID
   * @param {{res: Response, topics: Set<string>}} client - Registry entry
   * @param {string} message - SSE frame to write
   * @returns {boolean} true when the frame was handed to the stream
   */
  _deliver(userId, client, message) {
    const { res } = client;
    let failure = null;

    try {
      // Writes to an ended or destroyed stream are typically reported
      // asynchronously rather than thrown, so check the stream state first.
      if (res.writableEnded || res.destroyed) {
        failure = "stream already closed";
      } else {
        res.write(message);
      }
    } catch (error) {
      failure = error.message;
    }

    if (failure === null) {
      return true;
    }

    logger.error(`Failed to send SSE to user`, { userId, error: failure });
    // Pass the stream so that only this exact connection is removed.
    this.removeClient(userId, res);
    return false;
  }
}
|
|
|
|
// Export singleton instance
|
|
// The instance is created when this module is first loaded; its client and
// topic registries live in this process's memory only.
module.exports = new SSEService();
|