feat: preempt active runs in interrupt queue mode
This commit is contained in:
@@ -1201,8 +1201,8 @@ Notes:
|
|||||||
- `collect` keeps all queued requests (subject to `cap`).
|
- `collect` keeps all queued requests (subject to `cap`).
|
||||||
- `followup` keeps at most one pending item while a request is active; newer followups replace older pending items.
|
- `followup` keeps at most one pending item while a request is active; newer followups replace older pending items.
|
||||||
- `steer` and `steer_backlog` replace pending backlog with the newest request while one is active.
|
- `steer` and `steer_backlog` replace pending backlog with the newest request while one is active.
|
||||||
- `interrupt` uses steer-backlog queueing behavior; active work still requires `agent.cancel` for best-effort cancellation.
|
- `interrupt` uses steer-backlog queueing behavior and now also requests active-run cancellation when a newer request arrives.
|
||||||
- `interrupt` currently does not force-stop already running work; use `agent.cancel` for active cancellation.
|
- Active cancellation remains best-effort and stops at agent safe points; use `agent.cancel` for explicit user-triggered cancellation control.
|
||||||
- `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic.
|
- `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic.
|
||||||
- `summarize_overflow` enables richer overflow error messages and payload metadata.
|
- `summarize_overflow` enables richer overflow error messages and payload metadata.
|
||||||
- On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request.
|
- On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request.
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ The gateway serialises agent work **per session**, not per WebSocket connection:
|
|||||||
|
|
||||||
- Requests that target the same `sessionId` run one-at-a-time (FIFO) in a per-session lane.
|
- Requests that target the same `sessionId` run one-at-a-time (FIFO) in a per-session lane.
|
||||||
- Requests for different sessions can run in parallel.
|
- Requests for different sessions can run in parallel.
|
||||||
- Lane policy is configurable (`collect`, `followup`, `steer_backlog`, `interrupt`) with per-channel and per-session overrides.
|
- Lane policy is configurable (`collect`, `followup`, `steer`, `steer_backlog`, `interrupt`) with per-channel and per-session overrides.
|
||||||
- Session-local overrides can be managed at runtime via `agent.send` commands: `/queue`, `/queue set ...`, `/queue reset`.
|
- Session-local overrides can be managed at runtime via `agent.send` commands: `/queue`, `/queue set ...`, `/queue reset`.
|
||||||
|
|
||||||
This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`.
|
This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`.
|
||||||
@@ -69,6 +69,8 @@ sequenceDiagram
|
|||||||
G-->>C: result.cancelled=true/false
|
G-->>C: result.cancelled=true/false
|
||||||
```
|
```
|
||||||
|
|
||||||
|
`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points.
|
||||||
|
|
||||||
### Base URL
|
### Base URL
|
||||||
|
|
||||||
- WebSocket: `ws://localhost:18800` (or `wss://` if using TLS)
|
- WebSocket: `ws://localhost:18800` (or `wss://` if using TLS)
|
||||||
|
|||||||
+18
-2
@@ -5163,10 +5163,26 @@
|
|||||||
"docs/plans/state.json"
|
"docs/plans/state.json"
|
||||||
],
|
],
|
||||||
"test_status": "Docs-only change (no code paths affected)"
|
"test_status": "Docs-only change (no code paths affected)"
|
||||||
|
},
|
||||||
|
"queue-interrupt-active-cancel": {
|
||||||
|
"status": "completed",
|
||||||
|
"date": "2026-02-18",
|
||||||
|
"updated": "2026-02-18",
|
||||||
|
"summary": "Implemented Tier A queue-interrupt semantics hardening in gateway routing: interrupt mode now requests cancellation of active session work when a newer request arrives, while preserving best-effort safe-point cancellation. Added session-level cancellation API, handler integration, regression tests, and docs updates.",
|
||||||
|
"files_modified": [
|
||||||
|
"src/gateway/session-bridge.ts",
|
||||||
|
"src/gateway/session-bridge.test.ts",
|
||||||
|
"src/gateway/handlers/agent.ts",
|
||||||
|
"src/gateway/handlers/agent.test.ts",
|
||||||
|
"README.md",
|
||||||
|
"docs/api/PROTOCOL.md",
|
||||||
|
"docs/plans/state.json"
|
||||||
|
],
|
||||||
|
"test_status": "pnpm test:run src/gateway/session-bridge.test.ts src/gateway/handlers/agent.test.ts + pnpm typecheck passing"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"overall_progress": {
|
"overall_progress": {
|
||||||
"total_test_count": 1900,
|
"total_test_count": 1903,
|
||||||
"all_tests_passing": true,
|
"all_tests_passing": true,
|
||||||
"p0_completion": "3/3 (100%)",
|
"p0_completion": "3/3 (100%)",
|
||||||
"p1_completion": "4/4 (100%)",
|
"p1_completion": "4/4 (100%)",
|
||||||
@@ -5186,7 +5202,7 @@
|
|||||||
"gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram",
|
"gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram",
|
||||||
"native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback",
|
"native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback",
|
||||||
"remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening",
|
"remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening",
|
||||||
"next_up": "Monitor production feedback for bidirectional session-transfer command behavior across Telegram/TUI and prioritize the next post-parity reliability/capability slice"
|
"next_up": "Implement Tier A2 from the OpenClaw roadmap: daily memory-log cadence + proactive extraction beyond compaction-only paths"
|
||||||
},
|
},
|
||||||
"soul_md_and_cron_create": {
|
"soul_md_and_cron_create": {
|
||||||
"date": "2026-02-11",
|
"date": "2026-02-11",
|
||||||
|
|||||||
@@ -367,4 +367,64 @@ describe('createAgentHandlers queue policy resolution', () => {
|
|||||||
expect((event.data as { code: number }).code).toBe(3);
|
expect((event.data as { code: number }).code).toBe(3);
|
||||||
expect((event.data as { queue?: { code: string } }).queue?.code).toBe('overflow');
|
expect((event.data as { queue?: { code: string } }).queue?.code).toBe('overflow');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('requests active-session cancellation when interrupt mode receives a new message', async () => {
|
||||||
|
const mockAgent = {
|
||||||
|
process: vi.fn(async () => 'ok'),
|
||||||
|
consumeContextAlert: vi.fn(() => undefined),
|
||||||
|
getContextBudget: vi.fn(() => ({
|
||||||
|
estimatedTokens: 0,
|
||||||
|
contextWindow: 128000,
|
||||||
|
remainingTokens: 128000,
|
||||||
|
usagePct: 0,
|
||||||
|
thresholdPct: 80,
|
||||||
|
thresholdTokens: 102400,
|
||||||
|
shouldCompact: false,
|
||||||
|
})),
|
||||||
|
getUsage: vi.fn(() => ({
|
||||||
|
primary: { inputTokens: 0, outputTokens: 0, calls: 0 },
|
||||||
|
delegation: {},
|
||||||
|
total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 },
|
||||||
|
})),
|
||||||
|
getModelTier: vi.fn(() => 'default'),
|
||||||
|
setModelTier: vi.fn(),
|
||||||
|
compact: vi.fn(async () => null),
|
||||||
|
reset: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const sessionBridge = {
|
||||||
|
getAgent: vi.fn(() => mockAgent),
|
||||||
|
getSessionId: vi.fn(() => 'ws:s1'),
|
||||||
|
setBusy: vi.fn(),
|
||||||
|
setOnToolUse: vi.fn(),
|
||||||
|
isBusy: vi.fn(() => false),
|
||||||
|
cancelSession: vi.fn(() => true),
|
||||||
|
cancel: vi.fn(() => true),
|
||||||
|
};
|
||||||
|
|
||||||
|
const laneQueue = {
|
||||||
|
enqueue: vi.fn(async (_laneId: string, work: () => Promise<unknown>) => work()),
|
||||||
|
cancel: vi.fn(),
|
||||||
|
isProcessing: vi.fn(() => true),
|
||||||
|
} as unknown as LaneQueue;
|
||||||
|
|
||||||
|
const handlers = createAgentHandlers({
|
||||||
|
sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'],
|
||||||
|
laneQueue,
|
||||||
|
resolveQueuePolicy: vi.fn(() => ({ mode: 'interrupt' as const })),
|
||||||
|
});
|
||||||
|
|
||||||
|
const sent: OutboundMessage[] = [];
|
||||||
|
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
|
||||||
|
|
||||||
|
await handlers['agent.send']({
|
||||||
|
id: 7,
|
||||||
|
method: 'agent.send',
|
||||||
|
params: { message: 'newest', connectionId: 'conn-1' },
|
||||||
|
}, send);
|
||||||
|
|
||||||
|
expect(sessionBridge.cancelSession).toHaveBeenCalledWith('ws:s1');
|
||||||
|
expect(sessionBridge.cancel).not.toHaveBeenCalled();
|
||||||
|
expect((sent[0] as GatewayEvent).event).toBe('done');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -58,6 +58,20 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
const laneId = sessionId ?? connectionId;
|
const laneId = sessionId ?? connectionId;
|
||||||
const channel = sessionId?.split(':', 1)[0] ?? 'ws';
|
const channel = sessionId?.split(':', 1)[0] ?? 'ws';
|
||||||
const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel });
|
const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel });
|
||||||
|
const laneQueueWithProcessing = deps.laneQueue as LaneQueue & { isProcessing?: (lane: string) => boolean };
|
||||||
|
const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function'
|
||||||
|
? laneQueueWithProcessing.isProcessing(laneId)
|
||||||
|
: false;
|
||||||
|
|
||||||
|
// Interrupt mode should preempt active work when a newer request arrives.
|
||||||
|
// LaneQueue itself only rejects queued entries, so we also request agent cancellation.
|
||||||
|
if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) {
|
||||||
|
if (sessionId) {
|
||||||
|
deps.sessionBridge.cancelSession(sessionId);
|
||||||
|
} else {
|
||||||
|
deps.sessionBridge.cancel(connectionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Enqueue the work — if the lane is idle it runs immediately,
|
// Enqueue the work — if the lane is idle it runs immediately,
|
||||||
// otherwise it waits for earlier requests on the same session to finish.
|
// otherwise it waits for earlier requests on the same session to finish.
|
||||||
|
|||||||
@@ -185,6 +185,30 @@ describe('SessionBridge', () => {
|
|||||||
expect(cancelSpy).toHaveBeenCalledTimes(1);
|
expect(cancelSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('cancelSession requests cancellation when session has a busy connection', () => {
|
||||||
|
const bridge = createBridge();
|
||||||
|
bridge.connect('conn-1');
|
||||||
|
bridge.connect('conn-2');
|
||||||
|
bridge.switchSession('conn-2', 'ws:conn-1');
|
||||||
|
|
||||||
|
const agent = bridge.getAgent('conn-1');
|
||||||
|
if (!agent) {
|
||||||
|
throw new Error('Expected agent for conn-1');
|
||||||
|
}
|
||||||
|
const cancelSpy = vi.spyOn(agent, 'cancel');
|
||||||
|
|
||||||
|
bridge.setBusy('conn-2', true);
|
||||||
|
expect(bridge.cancelSession('ws:conn-1')).toBe(true);
|
||||||
|
expect(cancelSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('cancelSession returns false when no active operation exists', () => {
|
||||||
|
const bridge = createBridge();
|
||||||
|
bridge.connect('conn-1');
|
||||||
|
|
||||||
|
expect(bridge.cancelSession('ws:conn-1')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
it('switchSession changes session for a connection', () => {
|
it('switchSession changes session for a connection', () => {
|
||||||
const bridge = createBridge();
|
const bridge = createBridge();
|
||||||
bridge.connect('conn-1');
|
bridge.connect('conn-1');
|
||||||
|
|||||||
@@ -132,6 +132,30 @@ export class SessionBridge {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request cancellation for the active operation in a session.
|
||||||
|
* Returns true if at least one connection in the session is currently busy.
|
||||||
|
*/
|
||||||
|
cancelSession(sessionId: string): boolean {
|
||||||
|
const clients = Array.from(this.clients.values()).filter((client) => client.sessionId === sessionId);
|
||||||
|
if (clients.length === 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const hasBusyClient = clients.some((client) => client.busy);
|
||||||
|
if (!hasBusyClient) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const agent = this.agents.get(sessionId);
|
||||||
|
if (!agent) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
agent.cancel();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/** Set onToolUse callback for a connection's agent. */
|
/** Set onToolUse callback for a connection's agent. */
|
||||||
setOnToolUse(connectionId: string, callback: ((event: ToolUseEvent) => void) | undefined): void {
|
setOnToolUse(connectionId: string, callback: ((event: ToolUseEvent) => void) | undefined): void {
|
||||||
const client = this.clients.get(connectionId);
|
const client = this.clients.get(connectionId);
|
||||||
|
|||||||
Reference in New Issue
Block a user