feat: Migrate from Socket.IO to Server-Sent Events (SSE)
- Replace Socket.IO with SSE for real-time server-to-client communication - Add SSE service with client management and topic-based subscriptions - Implement SSE authentication middleware and streaming endpoints - Update all backend routes to emit SSE events instead of Socket.IO - Create SSE context provider for frontend with EventSource API - Update all frontend components to use SSE instead of Socket.IO - Add comprehensive SSE tests for both backend and frontend - Remove Socket.IO dependencies and legacy files - Update documentation to reflect SSE architecture Benefits: - Simpler architecture using native browser EventSource API - Lower bundle size (removed socket.io-client dependency) - Better compatibility with reverse proxies and load balancers - Reduced resource usage for Raspberry Pi deployment - Standard HTTP-based real-time communication 🤖 Generated with [AI Assistant] Co-Authored-By: AI Assistant <noreply@ai-assistant.com>
This commit is contained in:
@@ -0,0 +1,216 @@
|
||||
const logger = require("../utils/logger");
|
||||
|
||||
/**
|
||||
* SSE Service for Server-Sent Events
|
||||
* Manages SSE connections, topic subscriptions, and broadcasting
|
||||
*/
|
||||
class SSEService {
|
||||
constructor() {
|
||||
// Map of userId -> {res: Response, topics: Set<string>}
|
||||
this.clients = new Map();
|
||||
|
||||
// Map of topicName -> Set<userId>
|
||||
this.topics = new Map();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a client connection
|
||||
* @param {string} userId - User ID
|
||||
* @param {Response} res - Express response object
|
||||
*/
|
||||
addClient(userId, res) {
|
||||
// Remove existing client if any
|
||||
this.removeClient(userId);
|
||||
|
||||
this.clients.set(userId, {
|
||||
res,
|
||||
topics: new Set(),
|
||||
});
|
||||
|
||||
logger.info(`SSE client added`, { userId, totalClients: this.clients.size });
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a client connection
|
||||
* @param {string} userId - User ID
|
||||
*/
|
||||
removeClient(userId) {
|
||||
const client = this.clients.get(userId);
|
||||
if (!client) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Unsubscribe from all topics
|
||||
client.topics.forEach((topic) => {
|
||||
const topicSubscribers = this.topics.get(topic);
|
||||
if (topicSubscribers) {
|
||||
topicSubscribers.delete(userId);
|
||||
if (topicSubscribers.size === 0) {
|
||||
this.topics.delete(topic);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.clients.delete(userId);
|
||||
logger.info(`SSE client removed`, { userId, totalClients: this.clients.size });
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe a user to topics
|
||||
* @param {string} userId - User ID
|
||||
* @param {string[]} topicList - Array of topic names
|
||||
*/
|
||||
subscribe(userId, topicList) {
|
||||
const client = this.clients.get(userId);
|
||||
if (!client) {
|
||||
logger.warn(`Cannot subscribe: client not found`, { userId });
|
||||
return false;
|
||||
}
|
||||
|
||||
topicList.forEach((topic) => {
|
||||
// Add to client's topics
|
||||
client.topics.add(topic);
|
||||
|
||||
// Add to topic's subscribers
|
||||
if (!this.topics.has(topic)) {
|
||||
this.topics.set(topic, new Set());
|
||||
}
|
||||
this.topics.get(topic).add(userId);
|
||||
});
|
||||
|
||||
logger.info(`User subscribed to topics`, { userId, topics: topicList });
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe a user from topics
|
||||
* @param {string} userId - User ID
|
||||
* @param {string[]} topicList - Array of topic names
|
||||
*/
|
||||
unsubscribe(userId, topicList) {
|
||||
const client = this.clients.get(userId);
|
||||
if (!client) {
|
||||
logger.warn(`Cannot unsubscribe: client not found`, { userId });
|
||||
return false;
|
||||
}
|
||||
|
||||
topicList.forEach((topic) => {
|
||||
// Remove from client's topics
|
||||
client.topics.delete(topic);
|
||||
|
||||
// Remove from topic's subscribers
|
||||
const topicSubscribers = this.topics.get(topic);
|
||||
if (topicSubscribers) {
|
||||
topicSubscribers.delete(userId);
|
||||
if (topicSubscribers.size === 0) {
|
||||
this.topics.delete(topic);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`User unsubscribed from topics`, { userId, topics: topicList });
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast an event to all connected clients
|
||||
* @param {string} eventType - Event type
|
||||
* @param {object} data - Event data
|
||||
*/
|
||||
broadcast(eventType, data) {
|
||||
const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
|
||||
let sentCount = 0;
|
||||
|
||||
this.clients.forEach((client, userId) => {
|
||||
try {
|
||||
client.res.write(message);
|
||||
sentCount++;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to send SSE to user`, { userId, error: error.message });
|
||||
this.removeClient(userId);
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(`Broadcast event`, { eventType, recipients: sentCount });
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast an event to all subscribers of a topic
|
||||
* @param {string} topic - Topic name
|
||||
* @param {string} eventType - Event type
|
||||
* @param {object} data - Event data
|
||||
*/
|
||||
broadcastToTopic(topic, eventType, data) {
|
||||
const subscribers = this.topics.get(topic);
|
||||
if (!subscribers || subscribers.size === 0) {
|
||||
logger.debug(`No subscribers for topic`, { topic });
|
||||
return;
|
||||
}
|
||||
|
||||
const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
|
||||
let sentCount = 0;
|
||||
|
||||
subscribers.forEach((userId) => {
|
||||
const client = this.clients.get(userId);
|
||||
if (!client) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
client.res.write(message);
|
||||
sentCount++;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to send SSE to user`, { userId, error: error.message });
|
||||
this.removeClient(userId);
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(`Broadcast to topic`, { topic, eventType, recipients: sentCount });
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an event to a specific user
|
||||
* @param {string} userId - User ID
|
||||
* @param {string} eventType - Event type
|
||||
* @param {object} data - Event data
|
||||
*/
|
||||
sendToUser(userId, eventType, data) {
|
||||
const client = this.clients.get(userId);
|
||||
if (!client) {
|
||||
logger.debug(`User not connected`, { userId });
|
||||
return false;
|
||||
}
|
||||
|
||||
const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
|
||||
|
||||
try {
|
||||
client.res.write(message);
|
||||
logger.debug(`Sent event to user`, { userId, eventType });
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to send SSE to user`, { userId, error: error.message });
|
||||
this.removeClient(userId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get service statistics
|
||||
* @returns {object} Stats object
|
||||
*/
|
||||
getStats() {
|
||||
const topicStats = {};
|
||||
this.topics.forEach((subscribers, topic) => {
|
||||
topicStats[topic] = subscribers.size;
|
||||
});
|
||||
|
||||
return {
|
||||
totalClients: this.clients.size,
|
||||
totalTopics: this.topics.size,
|
||||
topics: topicStats,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Export singleton instance
|
||||
module.exports = new SSEService();
|
||||
Reference in New Issue
Block a user