feat(gateway): support per-channel and per-session queue policy overrides

This commit is contained in:
William Valentin
2026-02-16 11:51:26 -08:00
parent f7284a4ef1
commit fbd24d4379
11 changed files with 181 additions and 9 deletions
+59
View File
@@ -124,3 +124,62 @@ describe('createAgentHandlers command fast-path', () => {
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response');
});
});
describe('createAgentHandlers queue policy resolution', () => {
it('passes resolved per-request queue policy into lane enqueue', async () => {
const mockAgent = {
process: vi.fn(async () => 'ok'),
getUsage: vi.fn(() => ({
primary: { inputTokens: 0, outputTokens: 0, calls: 0 },
delegation: {},
total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 },
})),
getModelTier: vi.fn(() => 'default'),
setModelTier: vi.fn(),
compact: vi.fn(async () => null),
reset: vi.fn(),
setOnToolUse: vi.fn(),
};
const sessionBridge = {
getAgent: vi.fn(() => mockAgent),
getSessionId: vi.fn(() => 'ws:s1'),
setBusy: vi.fn(),
setOnToolUse: vi.fn(),
isBusy: vi.fn(() => false),
};
const laneQueue = {
enqueue: vi.fn(async (_laneId: string, work: () => Promise<unknown>) => work()),
cancel: vi.fn(),
} as unknown as LaneQueue;
const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer' as const, cap: 3 }));
const handlers = createAgentHandlers({
sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'],
laneQueue,
resolveQueuePolicy,
});
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
await handlers['agent.send']({
id: 1,
method: 'agent.send',
params: { message: 'hello', connectionId: 'conn-1' },
}, send);
expect(resolveQueuePolicy).toHaveBeenCalledWith({
laneId: 'ws:s1',
sessionId: 'ws:s1',
connectionId: 'conn-1',
channel: 'ws',
});
expect((laneQueue.enqueue as unknown as ReturnType<typeof vi.fn>).mock.calls[0][2]).toEqual({
mode: 'steer',
cap: 3,
});
});
});
+9 -1
View File
@@ -3,6 +3,7 @@ import type { SendFn } from '../router.js';
import { makeEvent, makeError, ErrorCode } from '../protocol.js';
import type { SessionBridge } from '../session-bridge.js';
import type { LaneQueue } from '../lane-queue.js';
import type { LaneQueueConfig } from '../lane-queue.js';
import type { MetricsCollector } from '../metrics.js';
import type { Attachment } from '../../channels/types.js';
import type { SessionManager } from '../../session/manager.js';
@@ -14,6 +15,12 @@ import { randomUUID } from 'crypto';
export interface AgentHandlerDeps {
sessionBridge: SessionBridge;
laneQueue: LaneQueue;
resolveQueuePolicy?: (ctx: {
laneId: string;
sessionId?: string;
connectionId: string;
channel: string;
}) => Partial<LaneQueueConfig> | undefined;
metrics?: MetricsCollector;
sessionManager?: SessionManager;
commandRegistry?: CommandRegistry;
@@ -48,6 +55,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
// Falls back to connectionId if session lookup fails (shouldn't happen).
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const laneId = sessionId ?? connectionId;
const channel = sessionId?.split(':', 1)[0] ?? 'ws';
// Enqueue the work — if the lane is idle it runs immediately,
// otherwise it waits for earlier requests on the same session to finish.
@@ -312,7 +320,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
deps.sessionBridge.setOnToolUse(connectionId, undefined);
deps.metrics?.endRequest(requestId);
}
});
}, deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }));
},
'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => {
+19
View File
@@ -248,4 +248,23 @@ describe('LaneQueue', () => {
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('new-pending');
});
it('supports per-enqueue policy overrides', async () => {
const queue = new LaneQueue({ mode: 'collect', cap: 10, overflow: 'drop_old' });
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => 'old-pending', { mode: 'steer' });
const p3 = queue.enqueue('lane-a', async () => 'latest-pending', { mode: 'steer' });
await expect(p2).rejects.toThrow('Superseded by newer request');
resolveFirst();
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('latest-pending');
});
});
+13 -4
View File
@@ -47,7 +47,8 @@ export class LaneQueue {
* Returns a promise that resolves with the work's return value
* once it has been executed (which may be immediately if the lane is idle).
*/
async enqueue<T>(laneId: string, work: () => Promise<T>): Promise<T> {
async enqueue<T>(laneId: string, work: () => Promise<T>, policy?: Partial<LaneQueueConfig>): Promise<T> {
const effective = this.resolvePolicy(policy);
let lane = this.lanes.get(laneId);
if (!lane) {
lane = { active: false, queue: [] };
@@ -65,12 +66,12 @@ export class LaneQueue {
}
}
if (this.config.mode === 'steer' || this.config.mode === 'interrupt') {
if (effective.mode === 'steer' || effective.mode === 'interrupt') {
this.rejectPending(lane, 'Superseded by newer request');
}
if (lane.queue.length >= this.config.cap) {
if (this.config.overflow === 'drop_new') {
if (lane.queue.length >= effective.cap) {
if (effective.overflow === 'drop_new') {
return Promise.reject(new Error('Lane queue full (drop_new)'));
}
// drop_old
@@ -131,6 +132,14 @@ export class LaneQueue {
}
}
private resolvePolicy(policy?: Partial<LaneQueueConfig>): LaneQueueConfig {
return {
mode: policy?.mode ?? this.config.mode,
cap: Math.max(1, policy?.cap ?? this.config.cap),
overflow: policy?.overflow ?? this.config.overflow,
};
}
/**
* Process the next queued entry for a lane (called after current work finishes).
* Runs asynchronously so the caller's finally block completes first.
+16 -1
View File
@@ -85,7 +85,12 @@ export interface GatewayServerConfig {
maxViolations?: number;
violationWindowMs?: number;
};
queue?: Partial<LaneQueueConfig>;
queue?: Partial<LaneQueueConfig> & {
overrides?: {
channels?: Record<string, Partial<LaneQueueConfig>>;
sessions?: Record<string, Partial<LaneQueueConfig>>;
};
};
/** Optional pairing manager for DM pairing code management via gateway. */
pairingManager?: PairingManager;
memoryStore?: MemoryStore;
@@ -199,6 +204,16 @@ export class GatewayServer {
const agentHandlers = createAgentHandlers({
sessionBridge: this.sessionBridge,
laneQueue: this.laneQueue,
resolveQueuePolicy: ({ sessionId, channel }) => {
const sessionPolicy = sessionId
? this.config.queue?.overrides?.sessions?.[sessionId]
: undefined;
if (sessionPolicy) {
return sessionPolicy;
}
return this.config.queue?.overrides?.channels?.[channel];
},
metrics: this.metrics,
sessionManager: this.config.sessionManager,
commandRegistry: this.config.commandRegistry,