/**
 * LaneQueue — per-lane FIFO queue for serialising async work.
 *
 * Each "lane" (keyed by an arbitrary string ID, e.g. a session ID) processes
 * work items one at a time. If a lane is idle, work starts immediately. If it
 * is busy, the work is queued and the returned promise settles with that
 * work's outcome once every earlier item in the lane has finished.
 *
 * Independent lanes run in parallel — only items within the same lane
 * are serialised.
 */

/** A queued unit of work plus the settlers of the promise handed to its caller. */
interface QueueEntry<T = unknown> {
  work: () => Promise<T>;
  resolve: (value: T) => void;
  reject: (reason: unknown) => void;
}

/** Per-lane state: whether something is running, and what is waiting behind it. */
interface Lane {
  active: boolean;
  queue: QueueEntry[];
}

export class LaneQueue {
  private lanes: Map<string, Lane> = new Map();

  /**
   * Enqueue a unit of work for the given lane.
   *
   * @param laneId - Key of the lane the work is serialised on.
   * @param work - Function producing the promise to wait for.
   * @returns A promise that settles with the work's result (or failure) once
   *   it has been executed, which may be immediately if the lane is idle.
   *   Queued work that is cancelled rejects with `Error('Lane cancelled')`.
   */
  async enqueue<T>(laneId: string, work: () => Promise<T>): Promise<T> {
    const lane = this.getOrCreateLane(laneId);

    // If nothing is running on this lane, execute immediately.
    if (!lane.active) {
      lane.active = true;
      try {
        return await work();
      } finally {
        // Release the lane and hand it to the next queued entry, if any.
        lane.active = false;
        this.processNext(laneId);
      }
    }

    // Otherwise, park the work; processNext() settles this promise later.
    return new Promise<T>((resolve, reject) => {
      lane.queue.push({
        work,
        // Entries are stored type-erased; the value passed back is whatever
        // this very `work` produced, so the cast is sound.
        resolve: resolve as (value: unknown) => void,
        reject,
      });
    });
  }

  /** Check if a lane currently has active work executing. */
  isProcessing(laneId: string): boolean {
    return this.lanes.get(laneId)?.active ?? false;
  }

  /** Get the number of pending (not yet started) items in a lane. */
  queueLength(laneId: string): number {
    return this.lanes.get(laneId)?.queue.length ?? 0;
  }

  /** Get the total number of pending items across all lanes. */
  totalPending(): number {
    let total = 0;
    for (const lane of this.lanes.values()) {
      total += lane.queue.length;
    }
    return total;
  }

  /**
   * Cancel all pending entries in a lane.
   * Active work is NOT interrupted — only queued items are rejected.
   * Rejected promises receive an Error with message "Lane cancelled".
   * Calling this for an unknown lane is a no-op.
   */
  cancel(laneId: string): void {
    const lane = this.lanes.get(laneId);
    if (!lane) {
      return;
    }

    // splice(0) empties the queue in place and hands back the removed entries.
    const pending = lane.queue.splice(0);
    for (const entry of pending) {
      entry.reject(new Error('Lane cancelled'));
    }

    // Clean up empty idle lanes. An active lane is removed by processNext()
    // once its current work finishes.
    if (!lane.active && lane.queue.length === 0) {
      this.lanes.delete(laneId);
    }
  }

  /** Return the lane registered under `laneId`, creating an idle one if absent. */
  private getOrCreateLane(laneId: string): Lane {
    let lane = this.lanes.get(laneId);
    if (!lane) {
      lane = { active: false, queue: [] };
      this.lanes.set(laneId, lane);
    }
    return lane;
  }

  /**
   * Start the next queued entry for a lane (called after the current work
   * finishes). The entry's work function is invoked synchronously; when its
   * promise settles the entry is resolved/rejected and the following entry
   * is started. An empty lane is removed from the map.
   */
  private processNext(laneId: string): void {
    const lane = this.lanes.get(laneId);
    if (!lane) {
      return;
    }

    const entry = lane.queue.shift();
    if (!entry) {
      // Lane is empty — clean up.
      this.lanes.delete(laneId);
      return;
    }

    lane.active = true;

    // A work function may throw before it ever returns a promise. Convert
    // that into a rejection of *this* entry; letting it escape would leave
    // the lane marked active forever, strand every later entry, and surface
    // the error to whichever caller's completion triggered this call.
    let running: Promise<unknown>;
    try {
      running = entry.work();
    } catch (err) {
      running = Promise.reject(err);
    }

    running
      .then((value) => entry.resolve(value))
      .catch((err: unknown) => entry.reject(err))
      .finally(() => {
        lane.active = false;
        this.processNext(laneId);
      });
  }
}