feat(gateway): complete openclaw phase1 queue parity v2

This commit is contained in:
William Valentin
2026-02-16 12:04:33 -08:00
parent 78da226542
commit 813a0dc5c5
19 changed files with 678 additions and 53 deletions
+80 -3
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { GatewayEvent, GatewayRequest, OutboundMessage } from '../protocol.js';
import { LaneQueue } from '../lane-queue.js';
import { LaneQueue, LaneQueueRejectedError } from '../lane-queue.js';
import { createAgentHandlers } from './agent.js';
import type { AgentHandlerDeps } from './agent.js';
import { CommandRegistry, registerBuiltinCommands } from '../../commands/index.js';
@@ -28,6 +28,7 @@ describe('createAgentHandlers command fast-path', () => {
};
const sessionManager = {
getSessionConfig: vi.fn(),
setSessionConfig: vi.fn(),
deleteSessionConfig: vi.fn(),
};
@@ -123,6 +124,26 @@ describe('createAgentHandlers command fast-path', () => {
expect((sent[0] as GatewayEvent).event).toBe('done');
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response');
});
it('handles /queue command via fast-path and persists queue session config', async () => {
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
const req: GatewayRequest = {
id: 5,
method: 'agent.send',
params: {
message: '/queue set mode followup',
connectionId: 'conn-1',
metadata: { isCommand: true, command: 'queue', commandArgs: 'set mode followup' },
},
};
await handlers['agent.send'](req, send);
expect(sessionManager.setSessionConfig).toHaveBeenCalledWith('ws', 'ws:conn-1', 'queue.mode', 'followup');
expect(mockAgent.process).not.toHaveBeenCalled();
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Set queue.mode=followup');
});
});
describe('createAgentHandlers queue policy resolution', () => {
@@ -154,7 +175,7 @@ describe('createAgentHandlers queue policy resolution', () => {
cancel: vi.fn(),
} as unknown as LaneQueue;
const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer' as const, cap: 3 }));
const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer_backlog' as const, cap: 3, debounceMs: 25 }));
const handlers = createAgentHandlers({
sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'],
@@ -178,8 +199,64 @@ describe('createAgentHandlers queue policy resolution', () => {
channel: 'ws',
});
expect((laneQueue.enqueue as unknown as ReturnType<typeof vi.fn>).mock.calls[0][2]).toEqual({
mode: 'steer',
mode: 'steer_backlog',
cap: 3,
debounceMs: 25,
});
});
it('emits structured queue error events for lane rejections', async () => {
const sessionBridge = {
getAgent: vi.fn(() => ({
process: vi.fn(async () => 'ok'),
getUsage: vi.fn(() => ({
primary: { inputTokens: 0, outputTokens: 0, calls: 0 },
delegation: {},
total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 },
})),
getModelTier: vi.fn(() => 'default'),
setModelTier: vi.fn(),
compact: vi.fn(async () => null),
reset: vi.fn(),
})),
getSessionId: vi.fn(() => 'ws:s1'),
setBusy: vi.fn(),
setOnToolUse: vi.fn(),
isBusy: vi.fn(() => false),
};
const laneQueue = {
enqueue: vi.fn(async () => {
throw new LaneQueueRejectedError({
code: 'overflow',
laneId: 'ws:s1',
mode: 'followup',
overflow: 'drop_new',
droppedCount: 1,
message: 'Lane queue full (drop_new)',
});
}),
cancel: vi.fn(),
} as unknown as LaneQueue;
const handlers = createAgentHandlers({
sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'],
laneQueue,
});
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
await handlers['agent.send']({
id: 6,
method: 'agent.send',
params: { message: 'hello', connectionId: 'conn-1' },
}, send);
expect(sent).toHaveLength(1);
const event = sent[0] as GatewayEvent;
expect(event.event).toBe('error');
expect((event.data as { code: number }).code).toBe(3);
expect((event.data as { queue?: { code: string } }).queue?.code).toBe('overflow');
});
});
+99 -2
View File
@@ -4,6 +4,7 @@ import { makeEvent, makeError, ErrorCode } from '../protocol.js';
import type { SessionBridge } from '../session-bridge.js';
import type { LaneQueue } from '../lane-queue.js';
import type { LaneQueueConfig } from '../lane-queue.js';
import { LaneQueueRejectedError } from '../lane-queue.js';
import type { MetricsCollector } from '../metrics.js';
import type { Attachment } from '../../channels/types.js';
import type { SessionManager } from '../../session/manager.js';
@@ -56,13 +57,15 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const laneId = sessionId ?? connectionId;
const channel = sessionId?.split(':', 1)[0] ?? 'ws';
const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel });
// Enqueue the work — if the lane is idle it runs immediately,
// otherwise it waits for earlier requests on the same session to finish.
const requestId = request.id.toString();
deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' });
return deps.laneQueue.enqueue(laneId, async () => {
try {
return await deps.laneQueue.enqueue(laneId, async () => {
deps.sessionBridge.setBusy(connectionId, true);
const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string'
@@ -256,6 +259,89 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
return `Elevated mode: on until ${new Date(untilMs).toISOString()}`;
},
getQueue: () => {
const mode = resolvedPolicy?.mode ?? 'collect';
const cap = resolvedPolicy?.cap ?? 50;
const overflow = resolvedPolicy?.overflow ?? 'drop_old';
const debounceMs = resolvedPolicy?.debounceMs ?? 0;
const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true;
const source = deps.sessionManager && sessionId
? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel'
: 'default/channel';
return [
'**Queue policy**',
`mode: ${mode}`,
`cap: ${cap}`,
`overflow: ${overflow}`,
`debounce_ms: ${debounceMs}`,
`summarize_overflow: ${summarizeOverflow}`,
`source: ${source}`,
].join('\n');
},
setQueue: (input: string) => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
const [rawKey, ...rest] = input.trim().split(/\s+/);
const value = rest.join(' ').trim();
if (!rawKey || !value) {
return 'Usage: /queue <mode|cap|overflow|debounce_ms|summarize_overflow> <value>';
}
const key = rawKey.toLowerCase();
if (key === 'mode') {
if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) {
return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value);
return `Set queue.mode=${value} for this session`;
}
if (key === 'cap') {
const cap = Number.parseInt(value, 10);
if (!Number.isFinite(cap) || cap < 1 || cap > 1000) {
return 'Invalid cap. Use an integer between 1 and 1000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap));
return `Set queue.cap=${cap} for this session`;
}
if (key === 'overflow') {
if (value !== 'drop_old' && value !== 'drop_new') {
return 'Invalid overflow. Use drop_old or drop_new';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value);
return `Set queue.overflow=${value} for this session`;
}
if (key === 'debounce_ms') {
const debounceMs = Number.parseInt(value, 10);
if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) {
return 'Invalid debounce_ms. Use an integer between 0 and 60000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs));
return `Set queue.debounce_ms=${debounceMs} for this session`;
}
if (key === 'summarize_overflow') {
const normalized = value.toLowerCase();
if (normalized !== 'true' && normalized !== 'false') {
return 'Invalid summarize_overflow. Use true or false';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized);
return `Set queue.summarize_overflow=${normalized} for this session`;
}
return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow';
},
resetQueue: () => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow');
return 'Reset session queue overrides.';
},
},
});
@@ -320,7 +406,18 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
deps.sessionBridge.setOnToolUse(connectionId, undefined);
deps.metrics?.endRequest(requestId);
}
}, deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }));
}, resolvedPolicy);
} catch (err) {
if (err instanceof LaneQueueRejectedError) {
send(makeEvent(request.id, 'error', {
code: ErrorCode.AgentBusy,
message: err.message,
queue: err.details,
}));
return;
}
throw err;
}
},
'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => {
+25
View File
@@ -125,6 +125,31 @@ const PATCHABLE_KEYS: Record<string, (config: Config, value: unknown) => boolean
config.server.localhost = value;
return true;
},
'server.queue.mode': (config, value) => {
if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(String(value))) {return false;}
config.server.queue.mode = value as typeof config.server.queue.mode;
return true;
},
'server.queue.cap': (config, value) => {
if (typeof value !== 'number' || !Number.isFinite(value) || value < 1 || value > 1000) {return false;}
config.server.queue.cap = Math.floor(value);
return true;
},
'server.queue.overflow': (config, value) => {
if (value !== 'drop_old' && value !== 'drop_new') {return false;}
config.server.queue.overflow = value;
return true;
},
'server.queue.debounce_ms': (config, value) => {
if (typeof value !== 'number' || !Number.isFinite(value) || value < 0 || value > 60_000) {return false;}
config.server.queue.debounce_ms = Math.floor(value);
return true;
},
'server.queue.summarize_overflow': (config, value) => {
if (typeof value !== 'boolean') {return false;}
config.server.queue.summarize_overflow = value;
return true;
},
};
export function createConfigHandlers(deps: ConfigHandlerDeps) {
+19 -3
View File
@@ -720,7 +720,18 @@ describe('config handlers', () => {
function makeConfig() {
return {
telegram: { bot_token: 'secret-token-123', allowed_chat_ids: [12345] },
server: { tailscale: {}, localhost: true, port: 18800 },
server: {
tailscale: {},
localhost: true,
port: 18800,
queue: {
mode: 'collect' as const,
cap: 50,
overflow: 'drop_old' as const,
debounce_ms: 0,
summarize_overflow: true,
},
},
models: {
default: { provider: 'anthropic' as const, model: 'claude-3-haiku', api_key: 'sk-secret-key' },
fallback_chain: ['anthropic'],
@@ -754,18 +765,22 @@ describe('config handlers', () => {
patches: {
'hooks.confirm': ['shell.exec', 'file.write'],
'hooks.log': ['file.read'],
'server.queue.mode': 'followup',
'server.queue.debounce_ms': 100,
},
},
};
const result = await handlers['config.patch'](req) as GatewayResponse;
const r = result.result as { applied: string[]; rejected: string[]; persisted: boolean };
expect(r.applied).toEqual(['hooks.confirm', 'hooks.log']);
expect(r.applied).toEqual(['hooks.confirm', 'hooks.log', 'server.queue.mode', 'server.queue.debounce_ms']);
expect(r.rejected).toEqual([]);
expect(r.persisted).toBe(false);
// Verify the config was actually mutated
expect(config.hooks.confirm).toEqual(['shell.exec', 'file.write']);
expect(config.hooks.log).toEqual(['file.read']);
expect(config.server.queue.mode).toBe('followup');
expect(config.server.queue.debounce_ms).toBe(100);
});
it('config.patch rejects unknown keys', async () => {
@@ -798,6 +813,7 @@ describe('config handlers', () => {
params: {
patches: {
'hooks.confirm': 'not-an-array',
'server.queue.cap': 0,
},
},
};
@@ -805,7 +821,7 @@ describe('config handlers', () => {
const r = result.result as { applied: string[]; rejected: string[]; persisted: boolean };
expect(r.applied).toEqual([]);
expect(r.rejected).toEqual(['hooks.confirm']);
expect(r.rejected).toEqual(['hooks.confirm', 'server.queue.cap']);
expect(r.persisted).toBe(false);
});
+75 -6
View File
@@ -1,5 +1,5 @@
import { describe, it, expect } from 'vitest';
import { LaneQueue } from './lane-queue.js';
import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js';
describe('LaneQueue', () => {
it('executes a single item immediately', async () => {
@@ -192,8 +192,8 @@ describe('LaneQueue', () => {
expect(r2).toBe('second');
});
it('steer mode keeps only the most recent pending request', async () => {
const queue = new LaneQueue({ mode: 'steer' });
it('followup mode keeps only the most recent pending request', async () => {
const queue = new LaneQueue({ mode: 'followup' });
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
@@ -204,13 +204,32 @@ describe('LaneQueue', () => {
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
const p3 = queue.enqueue('lane-a', async () => 'latest-pending');
await expect(p2).rejects.toThrow('Superseded by newer request');
await expect(p2).rejects.toThrow('Superseded by newer follow-up request');
resolveFirst();
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('latest-pending');
});
it('steer_backlog mode replaces existing pending backlog with latest request', async () => {
const queue = new LaneQueue({ mode: 'steer_backlog' });
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
const p3 = queue.enqueue('lane-a', async () => 'new-pending');
await expect(p2).rejects.toThrow('Superseded by newer request');
resolveFirst();
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('new-pending');
});
it('drop_new overflow rejects newest request when cap is reached', async () => {
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
let resolveFirst!: () => void;
@@ -223,7 +242,7 @@ describe('LaneQueue', () => {
const p2 = queue.enqueue('lane-a', async () => 'pending-1');
const p3 = queue.enqueue('lane-a', async () => 'pending-2');
await expect(p3).rejects.toThrow('Lane queue full (drop_new)');
await expect(p3).rejects.toThrow('Lane queue full (drop_new): request rejected with 1 pending');
resolveFirst();
await expect(p1).resolves.toBe('active');
@@ -242,7 +261,7 @@ describe('LaneQueue', () => {
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
const p3 = queue.enqueue('lane-a', async () => 'new-pending');
await expect(p2).rejects.toThrow('Lane queue overflow (drop_old)');
await expect(p2).rejects.toThrow('Lane queue overflow (drop_old): oldest pending request dropped');
resolveFirst();
await expect(p1).resolves.toBe('active');
@@ -267,4 +286,54 @@ describe('LaneQueue', () => {
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('latest-pending');
});
it('applies debounce before starting queued work', async () => {
const queue = new LaneQueue({ debounceMs: 25 });
const events: string[] = [];
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
events.push('active:start');
await firstBlocks;
events.push('active:end');
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => {
events.push('next:start');
return 'next';
});
resolveFirst();
await p1;
expect(queue.isProcessing('lane-a')).toBe(true);
expect(events).toEqual(['active:start', 'active:end']);
await new Promise((r) => setTimeout(r, 40));
await expect(p2).resolves.toBe('next');
expect(events).toEqual(['active:start', 'active:end', 'next:start']);
});
it('returns structured queue rejection errors', async () => {
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => 'pending');
const p3 = queue.enqueue('lane-a', async () => 'dropped');
const err = await p3.catch((e) => e) as LaneQueueRejectedError;
expect(err).toBeInstanceOf(LaneQueueRejectedError);
expect(err.details.code).toBe('overflow');
expect(err.details.overflow).toBe('drop_new');
expect(err.details.laneId).toBe('lane-a');
resolveFirst();
await expect(p1).resolves.toBe('active');
await expect(p2).resolves.toBe('pending');
});
});
+140 -14
View File
@@ -14,20 +14,56 @@ interface QueueEntry<T = unknown> {
work: () => Promise<T>;
resolve: (value: T) => void;
reject: (reason: unknown) => void;
policy: LaneQueueConfig;
metadata?: LaneQueueEnqueueMetadata;
}
interface Lane {
active: boolean;
queue: QueueEntry[];
debounceTimer?: ReturnType<typeof setTimeout>;
}
export type LaneQueueMode = 'collect' | 'steer' | 'interrupt';
export type LaneQueueMode = 'collect' | 'followup' | 'steer' | 'steer_backlog' | 'interrupt';
export type LaneQueueOverflow = 'drop_old' | 'drop_new';
export interface LaneQueueConfig {
mode: LaneQueueMode;
cap: number;
overflow: LaneQueueOverflow;
debounceMs: number;
summarizeOverflow: boolean;
}
export interface LaneQueueEnqueueMetadata {
requestId?: string;
label?: string;
}
export interface LaneQueueEnqueueOptions {
policy?: Partial<LaneQueueConfig>;
metadata?: LaneQueueEnqueueMetadata;
}
export type LaneQueueRejectCode = 'superseded' | 'overflow' | 'cancelled';
export interface LaneQueueRejectDetails {
code: LaneQueueRejectCode;
laneId: string;
mode: LaneQueueMode;
overflow?: LaneQueueOverflow;
droppedCount?: number;
message: string;
}
export class LaneQueueRejectedError extends Error {
readonly details: LaneQueueRejectDetails;
constructor(details: LaneQueueRejectDetails) {
super(details.message);
this.name = 'LaneQueueRejectedError';
this.details = details;
}
}
export class LaneQueue {
@@ -39,6 +75,8 @@ export class LaneQueue {
mode: config?.mode ?? 'collect',
cap: Math.max(1, config?.cap ?? 50),
overflow: config?.overflow ?? 'drop_old',
debounceMs: Math.max(0, config?.debounceMs ?? 0),
summarizeOverflow: config?.summarizeOverflow ?? true,
};
}
@@ -47,8 +85,13 @@ export class LaneQueue {
* Returns a promise that resolves with the work's return value
* once it has been executed (which may be immediately if the lane is idle).
*/
async enqueue<T>(laneId: string, work: () => Promise<T>, policy?: Partial<LaneQueueConfig>): Promise<T> {
const effective = this.resolvePolicy(policy);
async enqueue<T>(
laneId: string,
work: () => Promise<T>,
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
): Promise<T> {
const options = this.normalizeEnqueueOptions(policyOrOptions);
const effective = this.resolvePolicy(options.policy);
let lane = this.lanes.get(laneId);
if (!lane) {
lane = { active: false, queue: [] };
@@ -56,7 +99,7 @@ export class LaneQueue {
}
// If nothing is running on this lane, execute immediately
if (!lane.active) {
if (!lane.active && !lane.debounceTimer) {
lane.active = true;
try {
return await work();
@@ -66,17 +109,51 @@ export class LaneQueue {
}
}
if (effective.mode === 'steer' || effective.mode === 'interrupt') {
this.rejectPending(lane, 'Superseded by newer request');
if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') {
this.rejectPending(laneId, lane, {
code: 'superseded',
laneId,
mode: effective.mode,
message: 'Superseded by newer request',
});
} else if (effective.mode === 'followup' && lane.queue.length > 0) {
this.rejectPending(laneId, lane, {
code: 'superseded',
laneId,
mode: effective.mode,
message: 'Superseded by newer follow-up request',
});
}
if (lane.queue.length >= effective.cap) {
if (effective.overflow === 'drop_new') {
return Promise.reject(new Error('Lane queue full (drop_new)'));
return Promise.reject(
new LaneQueueRejectedError({
code: 'overflow',
laneId,
mode: effective.mode,
overflow: 'drop_new',
droppedCount: 1,
message: effective.summarizeOverflow
? `Lane queue full (drop_new): request rejected with ${lane.queue.length} pending`
: 'Lane queue full (drop_new)',
}),
);
}
// drop_old
const dropped = lane.queue.shift();
dropped?.reject(new Error('Lane queue overflow (drop_old)'));
dropped?.reject(
new LaneQueueRejectedError({
code: 'overflow',
laneId,
mode: effective.mode,
overflow: 'drop_old',
droppedCount: 1,
message: effective.summarizeOverflow
? 'Lane queue overflow (drop_old): oldest pending request dropped'
: 'Lane queue overflow (drop_old)',
}),
);
}
// Otherwise, queue the work and return a deferred promise
@@ -85,13 +162,16 @@ export class LaneQueue {
work: work as () => Promise<unknown>,
resolve: resolve as (value: unknown) => void,
reject,
policy: effective,
metadata: options.metadata,
});
});
}
/** Check if a lane currently has active work executing. */
isProcessing(laneId: string): boolean {
return this.lanes.get(laneId)?.active ?? false;
const lane = this.lanes.get(laneId);
return (lane?.active ?? false) || Boolean(lane?.debounceTimer);
}
/** Get the number of pending (not yet started) items in a lane. */
@@ -117,18 +197,28 @@ export class LaneQueue {
const lane = this.lanes.get(laneId);
if (!lane) {return;}
this.rejectPending(lane, 'Lane cancelled');
if (lane.debounceTimer) {
clearTimeout(lane.debounceTimer);
lane.debounceTimer = undefined;
}
this.rejectPending(laneId, lane, {
code: 'cancelled',
laneId,
mode: this.config.mode,
message: 'Lane cancelled',
});
// Clean up empty idle lanes
if (!lane.active && lane.queue.length === 0) {
if (!lane.active && lane.queue.length === 0 && !lane.debounceTimer) {
this.lanes.delete(laneId);
}
}
private rejectPending(lane: Lane, reason: string): void {
private rejectPending(laneId: string, lane: Lane, details: LaneQueueRejectDetails): void {
const pending = lane.queue.splice(0);
for (const entry of pending) {
entry.reject(new Error(reason));
entry.reject(new LaneQueueRejectedError({ ...details, laneId, mode: entry.policy.mode }));
}
}
@@ -137,6 +227,8 @@ export class LaneQueue {
mode: policy?.mode ?? this.config.mode,
cap: Math.max(1, policy?.cap ?? this.config.cap),
overflow: policy?.overflow ?? this.config.overflow,
debounceMs: Math.max(0, policy?.debounceMs ?? this.config.debounceMs),
summarizeOverflow: policy?.summarizeOverflow ?? this.config.summarizeOverflow,
};
}
@@ -144,10 +236,28 @@ export class LaneQueue {
* Process the next queued entry for a lane (called after current work finishes).
* Runs asynchronously so the caller's finally block completes first.
*/
private processNext(laneId: string): void {
private processNext(laneId: string, skipDebounce = false): void {
const lane = this.lanes.get(laneId);
if (!lane) {return;}
if (lane.active || lane.debounceTimer) {
return;
}
const next = lane.queue[0];
if (!next) {
this.lanes.delete(laneId);
return;
}
if (!skipDebounce && next.policy.debounceMs > 0) {
lane.debounceTimer = setTimeout(() => {
lane.debounceTimer = undefined;
this.processNext(laneId, true);
}, next.policy.debounceMs);
return;
}
const entry = lane.queue.shift();
if (!entry) {
// Lane is empty — clean up
@@ -164,4 +274,20 @@ export class LaneQueue {
this.processNext(laneId);
});
}
private normalizeEnqueueOptions(
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
): LaneQueueEnqueueOptions {
if (!policyOrOptions) {
return {};
}
if ('policy' in policyOrOptions || 'metadata' in policyOrOptions) {
return policyOrOptions as LaneQueueEnqueueOptions;
}
return {
policy: policyOrOptions as Partial<LaneQueueConfig>,
};
}
}
+1
View File
@@ -85,6 +85,7 @@ export interface DoneEventData {
export interface ErrorEventData {
code: ErrorCode;
message: string;
queue?: Record<string, unknown>;
}
// ── Error codes ────────────────────────────────────────────────
+46 -6
View File
@@ -205,14 +205,54 @@ export class GatewayServer {
sessionBridge: this.sessionBridge,
laneQueue: this.laneQueue,
resolveQueuePolicy: ({ sessionId, channel }) => {
const sessionPolicy = sessionId
? this.config.queue?.overrides?.sessions?.[sessionId]
: undefined;
if (sessionPolicy) {
return sessionPolicy;
const resolved: Partial<LaneQueueConfig> = {};
const channelPolicy = this.config.queue?.overrides?.channels?.[channel];
if (channelPolicy) {
Object.assign(resolved, channelPolicy);
}
return this.config.queue?.overrides?.channels?.[channel];
const configuredSessionPolicy = sessionId
? this.config.queue?.overrides?.sessions?.[sessionId]
: undefined;
if (configuredSessionPolicy) {
Object.assign(resolved, configuredSessionPolicy);
}
if (sessionId) {
const runtimeMode = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode');
const runtimeCap = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.cap');
const runtimeOverflow = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.overflow');
const runtimeDebounce = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.debounce_ms');
const runtimeSummarize = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.summarize_overflow');
if (runtimeMode && ['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(runtimeMode)) {
resolved.mode = runtimeMode as LaneQueueConfig['mode'];
}
if (runtimeCap) {
const cap = Number.parseInt(runtimeCap, 10);
if (Number.isFinite(cap) && cap >= 1 && cap <= 1000) {
resolved.cap = cap;
}
}
if (runtimeOverflow && (runtimeOverflow === 'drop_old' || runtimeOverflow === 'drop_new')) {
resolved.overflow = runtimeOverflow;
}
if (runtimeDebounce) {
const debounceMs = Number.parseInt(runtimeDebounce, 10);
if (Number.isFinite(debounceMs) && debounceMs >= 0 && debounceMs <= 60_000) {
resolved.debounceMs = debounceMs;
}
}
if (runtimeSummarize === 'true' || runtimeSummarize === 'false') {
resolved.summarizeOverflow = runtimeSummarize === 'true';
}
}
return resolved;
},
metrics: this.metrics,
sessionManager: this.config.sessionManager,