feat(gateway): add configurable lane queue mode, cap, and overflow
This commit is contained in:
@@ -191,4 +191,61 @@ describe('LaneQueue', () => {
|
||||
const r2 = await queue.enqueue('lane-a', async () => 'second');
|
||||
expect(r2).toBe('second');
|
||||
});
|
||||
|
||||
it('steer mode keeps only the most recent pending request', async () => {
|
||||
const queue = new LaneQueue({ mode: 'steer' });
|
||||
let resolveFirst!: () => void;
|
||||
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
||||
|
||||
const p1 = queue.enqueue('lane-a', async () => {
|
||||
await firstBlocks;
|
||||
return 'active';
|
||||
});
|
||||
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
|
||||
const p3 = queue.enqueue('lane-a', async () => 'latest-pending');
|
||||
|
||||
await expect(p2).rejects.toThrow('Superseded by newer request');
|
||||
resolveFirst();
|
||||
|
||||
await expect(p1).resolves.toBe('active');
|
||||
await expect(p3).resolves.toBe('latest-pending');
|
||||
});
|
||||
|
||||
it('drop_new overflow rejects newest request when cap is reached', async () => {
|
||||
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
|
||||
let resolveFirst!: () => void;
|
||||
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
||||
|
||||
const p1 = queue.enqueue('lane-a', async () => {
|
||||
await firstBlocks;
|
||||
return 'active';
|
||||
});
|
||||
const p2 = queue.enqueue('lane-a', async () => 'pending-1');
|
||||
const p3 = queue.enqueue('lane-a', async () => 'pending-2');
|
||||
|
||||
await expect(p3).rejects.toThrow('Lane queue full (drop_new)');
|
||||
resolveFirst();
|
||||
|
||||
await expect(p1).resolves.toBe('active');
|
||||
await expect(p2).resolves.toBe('pending-1');
|
||||
});
|
||||
|
||||
it('drop_old overflow evicts oldest pending request when cap is reached', async () => {
|
||||
const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' });
|
||||
let resolveFirst!: () => void;
|
||||
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
||||
|
||||
const p1 = queue.enqueue('lane-a', async () => {
|
||||
await firstBlocks;
|
||||
return 'active';
|
||||
});
|
||||
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
|
||||
const p3 = queue.enqueue('lane-a', async () => 'new-pending');
|
||||
|
||||
await expect(p2).rejects.toThrow('Lane queue overflow (drop_old)');
|
||||
resolveFirst();
|
||||
|
||||
await expect(p1).resolves.toBe('active');
|
||||
await expect(p3).resolves.toBe('new-pending');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -21,8 +21,26 @@ interface Lane {
|
||||
queue: QueueEntry[];
|
||||
}
|
||||
|
||||
export type LaneQueueMode = 'collect' | 'steer' | 'interrupt';
|
||||
export type LaneQueueOverflow = 'drop_old' | 'drop_new';
|
||||
|
||||
export interface LaneQueueConfig {
|
||||
mode: LaneQueueMode;
|
||||
cap: number;
|
||||
overflow: LaneQueueOverflow;
|
||||
}
|
||||
|
||||
export class LaneQueue {
|
||||
private lanes: Map<string, Lane> = new Map();
|
||||
private config: LaneQueueConfig;
|
||||
|
||||
constructor(config?: Partial<LaneQueueConfig>) {
|
||||
this.config = {
|
||||
mode: config?.mode ?? 'collect',
|
||||
cap: Math.max(1, config?.cap ?? 50),
|
||||
overflow: config?.overflow ?? 'drop_old',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue a unit of work for the given lane.
|
||||
@@ -47,6 +65,19 @@ export class LaneQueue {
|
||||
}
|
||||
}
|
||||
|
||||
if (this.config.mode === 'steer' || this.config.mode === 'interrupt') {
|
||||
this.rejectPending(lane, 'Superseded by newer request');
|
||||
}
|
||||
|
||||
if (lane.queue.length >= this.config.cap) {
|
||||
if (this.config.overflow === 'drop_new') {
|
||||
return Promise.reject(new Error('Lane queue full (drop_new)'));
|
||||
}
|
||||
// drop_old
|
||||
const dropped = lane.queue.shift();
|
||||
dropped?.reject(new Error('Lane queue overflow (drop_old)'));
|
||||
}
|
||||
|
||||
// Otherwise, queue the work and return a deferred promise
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
lane.queue.push({
|
||||
@@ -85,10 +116,7 @@ export class LaneQueue {
|
||||
const lane = this.lanes.get(laneId);
|
||||
if (!lane) {return;}
|
||||
|
||||
const pending = lane.queue.splice(0);
|
||||
for (const entry of pending) {
|
||||
entry.reject(new Error('Lane cancelled'));
|
||||
}
|
||||
this.rejectPending(lane, 'Lane cancelled');
|
||||
|
||||
// Clean up empty idle lanes
|
||||
if (!lane.active && lane.queue.length === 0) {
|
||||
@@ -96,6 +124,13 @@ export class LaneQueue {
|
||||
}
|
||||
}
|
||||
|
||||
private rejectPending(lane: Lane, reason: string): void {
|
||||
const pending = lane.queue.splice(0);
|
||||
for (const entry of pending) {
|
||||
entry.reject(new Error(reason));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the next queued entry for a lane (called after current work finishes).
|
||||
* Runs asynchronously so the caller's finally block completes first.
|
||||
|
||||
@@ -6,6 +6,7 @@ import { serveStatic } from './static.js';
|
||||
import { SessionBridge } from './session-bridge.js';
|
||||
import type { SessionBridgeConfig } from './session-bridge.js';
|
||||
import { LaneQueue } from './lane-queue.js';
|
||||
import type { LaneQueueConfig } from './lane-queue.js';
|
||||
import { MetricsCollector } from './metrics.js';
|
||||
import { authenticateRequest } from './auth.js';
|
||||
import type { AuthConfig } from './auth.js';
|
||||
@@ -84,6 +85,7 @@ export interface GatewayServerConfig {
|
||||
maxViolations?: number;
|
||||
violationWindowMs?: number;
|
||||
};
|
||||
queue?: Partial<LaneQueueConfig>;
|
||||
/** Optional pairing manager for DM pairing code management via gateway. */
|
||||
pairingManager?: PairingManager;
|
||||
memoryStore?: MemoryStore;
|
||||
@@ -143,7 +145,7 @@ export class GatewayServer {
|
||||
memoryStore: config.memoryStore,
|
||||
});
|
||||
|
||||
this.laneQueue = new LaneQueue();
|
||||
this.laneQueue = new LaneQueue(config.queue);
|
||||
this.metrics = new MetricsCollector({
|
||||
getQueueDepth: () => this.laneQueue.totalPending(),
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user