const express = require("express"); const router = express.Router(); const sseAuth = require("../middleware/sseAuth"); const sseService = require("../services/sseService"); const logger = require("../utils/logger"); /** * @route GET /api/sse/stream * @desc SSE stream endpoint * @access Private */ router.get("/stream", sseAuth, (req, res) => { const userId = req.user.id; // Set SSE headers res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering // Send initial connection success message res.write(`event: connected\ndata: ${JSON.stringify({ userId, timestamp: new Date().toISOString() })}\n\n`); // Register client sseService.addClient(userId, res); // Send heartbeat every 30 seconds to keep connection alive const heartbeatInterval = setInterval(() => { try { res.write(`:heartbeat\n\n`); } catch (error) { logger.error(`Heartbeat failed for user`, { userId, error: error.message }); clearInterval(heartbeatInterval); sseService.removeClient(userId); } }, 30000); // Handle client disconnect req.on("close", () => { clearInterval(heartbeatInterval); sseService.removeClient(userId); logger.info(`SSE stream closed`, { userId }); }); // Handle connection errors req.on("error", (error) => { logger.error(`SSE stream error`, { userId, error: error.message }); clearInterval(heartbeatInterval); sseService.removeClient(userId); }); }); /** * @route POST /api/sse/subscribe * @desc Subscribe to SSE topics * @access Private */ router.post("/subscribe", sseAuth, (req, res) => { try { const userId = req.user.id; const { topics } = req.body; // Validate topics if (!topics || !Array.isArray(topics) || topics.length === 0) { return res.status(400).json({ success: false, msg: "Topics must be a non-empty array" }); } // Subscribe to topics const success = sseService.subscribe(userId, topics); if (!success) { return res.status(400).json({ success: false, msg: "User not connected to SSE stream" }); } res.json({ success: true, msg: "Subscribed to topics", topics }); } catch (error) { logger.error(`Subscribe error`, { error: error.message }); res.status(500).json({ success: false, msg: "Server error" }); } }); /** * @route POST /api/sse/unsubscribe * @desc Unsubscribe from SSE topics * @access Private */ router.post("/unsubscribe", sseAuth, (req, res) => { try { const userId = req.user.id; const { topics } = req.body; // Validate topics if (!topics || !Array.isArray(topics) || topics.length === 0) { return res.status(400).json({ success: false, msg: "Topics must be a non-empty array" }); } // Unsubscribe from topics const success = sseService.unsubscribe(userId, topics); if (!success) { return res.status(400).json({ success: false, msg: "User not connected to SSE stream" }); } res.json({ success: true, msg: "Unsubscribed from topics", topics }); } catch (error) { logger.error(`Unsubscribe error`, { error: error.message }); res.status(500).json({ success: false, msg: "Server error" }); } }); module.exports = router;