diff --git a/README.md b/README.md index e23c54a..0d4bbba 100644 --- a/README.md +++ b/README.md @@ -1201,8 +1201,8 @@ Notes: - `collect` keeps all queued requests (subject to `cap`). - `followup` keeps at most one pending item while a request is active; newer followups replace older pending items. - `steer` and `steer_backlog` replace pending backlog with the newest request while one is active. -- `interrupt` uses steer-backlog queueing behavior; active work still requires `agent.cancel` for best-effort cancellation. -- `interrupt` currently does not force-stop already running work; use `agent.cancel` for active cancellation. +- `interrupt` uses steer-backlog queueing behavior and now also requests active-run cancellation when a newer request arrives. +- Active cancellation remains best-effort and stops at agent safe points; use `agent.cancel` for explicit user-triggered cancellation control. - `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic. - `summarize_overflow` enables richer overflow error messages and payload metadata. - On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request. diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index 3be99bd..2f3fd0a 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -34,7 +34,7 @@ The gateway serialises agent work **per session**, not per WebSocket connection: - Requests that target the same `sessionId` run one-at-a-time (FIFO) in a per-session lane. - Requests for different sessions can run in parallel. -- Lane policy is configurable (`collect`, `followup`, `steer_backlog`, `interrupt`) with per-channel and per-session overrides. +- Lane policy is configurable (`collect`, `followup`, `steer`, `steer_backlog`, `interrupt`) with per-channel and per-session overrides. - Session-local overrides can be managed at runtime via `agent.send` commands: `/queue`, `/queue set ...`, `/queue reset`. This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`. @@ -69,6 +69,8 @@ sequenceDiagram G-->>C: result.cancelled=true/false ``` +`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points. + ### Base URL - WebSocket: `ws://localhost:18800` (or `wss://` if using TLS) diff --git a/docs/plans/state.json b/docs/plans/state.json index 745cc52..728a032 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -5163,10 +5163,26 @@ "docs/plans/state.json" ], "test_status": "Docs-only change (no code paths affected)" + }, + "queue-interrupt-active-cancel": { + "status": "completed", + "date": "2026-02-18", + "updated": "2026-02-18", + "summary": "Implemented Tier A queue-interrupt semantics hardening in gateway routing: interrupt mode now requests cancellation of active session work when a newer request arrives, while preserving best-effort safe-point cancellation. Added session-level cancellation API, handler integration, regression tests, and docs updates.", + "files_modified": [ + "src/gateway/session-bridge.ts", + "src/gateway/session-bridge.test.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "README.md", + "docs/api/PROTOCOL.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/gateway/session-bridge.test.ts src/gateway/handlers/agent.test.ts + pnpm typecheck passing" } }, "overall_progress": { - "total_test_count": 1900, + "total_test_count": 1903, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -5186,7 +5202,7 @@ "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback", "remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening", - "next_up": "Monitor production feedback for bidirectional session-transfer command behavior across Telegram/TUI and prioritize the next post-parity reliability/capability slice" + "next_up": "Implement Tier A2 from the OpenClaw roadmap: daily memory-log cadence + proactive extraction beyond compaction-only paths" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 4b21b63..4add825 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -367,4 +367,64 @@ describe('createAgentHandlers queue policy resolution', () => { expect((event.data as { code: number }).code).toBe(3); expect((event.data as { queue?: { code: string } }).queue?.code).toBe('overflow'); }); + + it('requests active-session cancellation when interrupt mode receives a new message', async () => { + const mockAgent = { + process: vi.fn(async () => 'ok'), + consumeContextAlert: vi.fn(() => undefined), + getContextBudget: vi.fn(() => ({ + estimatedTokens: 0, + contextWindow: 128000, + remainingTokens: 128000, + usagePct: 0, + thresholdPct: 80, + thresholdTokens: 102400, + shouldCompact: false, + })), + getUsage: vi.fn(() => ({ + primary: { inputTokens: 0, outputTokens: 0, calls: 0 }, + delegation: {}, + total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 }, + })), + getModelTier: vi.fn(() => 'default'), + setModelTier: vi.fn(), + compact: vi.fn(async () => null), + reset: vi.fn(), + }; + + const sessionBridge = { + getAgent: vi.fn(() => mockAgent), + getSessionId: vi.fn(() => 'ws:s1'), + setBusy: vi.fn(), + setOnToolUse: vi.fn(), + isBusy: vi.fn(() => false), + cancelSession: vi.fn(() => true), + cancel: vi.fn(() => true), + }; + + const laneQueue = { + enqueue: vi.fn(async (_laneId: string, work: () => Promise) => work()), + cancel: vi.fn(), + isProcessing: vi.fn(() => true), + } as unknown as LaneQueue; + + const handlers = createAgentHandlers({ + sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], + laneQueue, + resolveQueuePolicy: vi.fn(() => ({ mode: 'interrupt' as const })), + }); + + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + + await handlers['agent.send']({ + id: 7, + method: 'agent.send', + params: { message: 'newest', connectionId: 'conn-1' }, + }, send); + + expect(sessionBridge.cancelSession).toHaveBeenCalledWith('ws:s1'); + expect(sessionBridge.cancel).not.toHaveBeenCalled(); + expect((sent[0] as GatewayEvent).event).toBe('done'); + }); }); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 1de113f..a273fa3 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -58,6 +58,20 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const laneId = sessionId ?? connectionId; const channel = sessionId?.split(':', 1)[0] ?? 'ws'; const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }); + const laneQueueWithProcessing = deps.laneQueue as LaneQueue & { isProcessing?: (lane: string) => boolean }; + const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function' + ? laneQueueWithProcessing.isProcessing(laneId) + : false; + + // Interrupt mode should preempt active work when a newer request arrives. + // LaneQueue itself only rejects queued entries, so we also request agent cancellation. + if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) { + if (sessionId) { + deps.sessionBridge.cancelSession(sessionId); + } else { + deps.sessionBridge.cancel(connectionId); + } + } // Enqueue the work — if the lane is idle it runs immediately, // otherwise it waits for earlier requests on the same session to finish. diff --git a/src/gateway/session-bridge.test.ts b/src/gateway/session-bridge.test.ts index 7acc4f1..3da5209 100644 --- a/src/gateway/session-bridge.test.ts +++ b/src/gateway/session-bridge.test.ts @@ -185,6 +185,30 @@ describe('SessionBridge', () => { expect(cancelSpy).toHaveBeenCalledTimes(1); }); + it('cancelSession requests cancellation when session has a busy connection', () => { + const bridge = createBridge(); + bridge.connect('conn-1'); + bridge.connect('conn-2'); + bridge.switchSession('conn-2', 'ws:conn-1'); + + const agent = bridge.getAgent('conn-1'); + if (!agent) { + throw new Error('Expected agent for conn-1'); + } + const cancelSpy = vi.spyOn(agent, 'cancel'); + + bridge.setBusy('conn-2', true); + expect(bridge.cancelSession('ws:conn-1')).toBe(true); + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it('cancelSession returns false when no active operation exists', () => { + const bridge = createBridge(); + bridge.connect('conn-1'); + + expect(bridge.cancelSession('ws:conn-1')).toBe(false); + }); + it('switchSession changes session for a connection', () => { const bridge = createBridge(); bridge.connect('conn-1'); diff --git a/src/gateway/session-bridge.ts b/src/gateway/session-bridge.ts index 1e73acb..d042a31 100644 --- a/src/gateway/session-bridge.ts +++ b/src/gateway/session-bridge.ts @@ -132,6 +132,30 @@ export class SessionBridge { return true; } + /** + * Request cancellation for the active operation in a session. + * Returns true if at least one connection in the session is currently busy. + */ + cancelSession(sessionId: string): boolean { + const clients = Array.from(this.clients.values()).filter((client) => client.sessionId === sessionId); + if (clients.length === 0) { + return false; + } + + const hasBusyClient = clients.some((client) => client.busy); + if (!hasBusyClient) { + return false; + } + + const agent = this.agents.get(sessionId); + if (!agent) { + return false; + } + + agent.cancel(); + return true; + } + /** Set onToolUse callback for a connection's agent. */ setOnToolUse(connectionId: string, callback: ((event: ToolUseEvent) => void) | undefined): void { const client = this.clients.get(connectionId);