From 46099664f0f00e995880abf947612394f8fcc293 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Fri, 13 Feb 2026 08:51:14 -0800 Subject: [PATCH] feat(gateway): wire safe-point runtime cancellation for agent.cancel --- src/backends/native/agent.test.ts | 37 +++++++++++ src/backends/native/agent.ts | 96 +++++++++++++++++++++------ src/backends/native/orchestrator.ts | 10 +++ src/gateway/handlers/agent.ts | 21 ++++-- src/gateway/handlers/handlers.test.ts | 15 ++++- src/gateway/session-bridge.test.ts | 18 +++++ src/gateway/session-bridge.ts | 11 +++ 7 files changed, 182 insertions(+), 26 deletions(-) diff --git a/src/backends/native/agent.test.ts b/src/backends/native/agent.test.ts index b8d77ef..c8dd997 100644 --- a/src/backends/native/agent.test.ts +++ b/src/backends/native/agent.test.ts @@ -73,6 +73,43 @@ describe('NativeAgent', () => { expect(mockSession.addMessage).toHaveBeenNthCalledWith(1, { role: 'user', content: 'Hi' }); expect(mockSession.addMessage).toHaveBeenNthCalledWith(2, { role: 'assistant', content: 'Hello!' }); }); + + it('supports cancellation during single-turn model wait', async () => { + let release!: () => void; + const blocked = new Promise((resolve) => { + release = resolve; + }); + + const mockClient: ModelClient = { + chat: vi.fn(async () => { + await blocked; + return { + content: 'Late response', + stopReason: 'end_turn', + usage: { inputTokens: 10, outputTokens: 5 }, + } satisfies ChatResponse; + }), + }; + + const agent = new NativeAgent({ + modelClient: mockClient, + systemPrompt: 'You are helpful.', + }); + + const pending = agent.process('Please wait'); + await new Promise((resolve) => queueMicrotask(resolve)); + expect(agent.isCancellable()).toBe(true); + + agent.cancel(); + release(); + + const response = await pending; + expect(response).toBe('Operation cancelled by user.'); + expect(agent.isCancellable()).toBe(false); + + const history = agent.getHistory(); + expect(history[history.length - 1]).toEqual({ role: 'assistant', content: 'Operation cancelled by user.' }); + }); }); // Simple test tool diff --git a/src/backends/native/agent.ts b/src/backends/native/agent.ts index 26c334d..ff021bc 100644 --- a/src/backends/native/agent.ts +++ b/src/backends/native/agent.ts @@ -53,6 +53,8 @@ export class NativeAgent { private _attachmentCollector?: OutboundAttachmentCollector; private _thinking: boolean = false; private _lastToolFingerprint?: string; + private _cancelRequested = false; + private _runInProgress = false; constructor(config: NativeAgentConfig) { this.modelClient = config.modelClient; @@ -71,31 +73,48 @@ export class NativeAgent { } async process(userMessage: string, attachments?: Attachment[]): Promise { + this._cancelRequested = false; + this._runInProgress = true; + // Detect and strip !!think prefix for per-message thinking mode - if (userMessage.startsWith('!!think ') || userMessage === '!!think') { - this._thinking = true; - userMessage = userMessage.replace(/^!!think\s*/, '').trim() || 'Think about this.'; - } else { - this._thinking = false; + try { + if (userMessage.startsWith('!!think ') || userMessage === '!!think') { + this._thinking = true; + userMessage = userMessage.replace(/^!!think\s*/, '').trim() || 'Think about this.'; + } else { + this._thinking = false; + } + + const userMsg = buildUserMessage(userMessage, attachments); + + if (this.session) { + this.session.addMessage(userMsg); + } else { + this.inMemoryHistory.push(userMsg); + } + + // If no tools configured, use the simple single-turn path + if (!this.toolRegistry || !this.toolExecutor) { + return await this.singleTurn(); + } + + return await this.toolLoop(); + } catch (error) { + if (this.isAbortError(error)) { + const cancelledMsg = 'Operation cancelled by user.'; + this.addToHistory({ role: 'assistant', content: cancelledMsg }); + return cancelledMsg; + } + throw error; + } finally { + this._runInProgress = false; + this._cancelRequested = false; } - - const userMsg = buildUserMessage(userMessage, attachments); - - if (this.session) { - this.session.addMessage(userMsg); - } else { - this.inMemoryHistory.push(userMsg); - } - - // If no tools configured, use the simple single-turn path - if (!this.toolRegistry || !this.toolExecutor) { - return this.singleTurn(); - } - - return this.toolLoop(); } private async singleTurn(): Promise { + this.throwIfCancelled(); + const request: ChatRequest = { messages: this.history, system: this.systemPrompt, @@ -103,6 +122,7 @@ export class NativeAgent { }; const response = await this.chatWithRouter(request); + this.throwIfCancelled(); this._totalUsage.inputTokens += response.usage.inputTokens; this._totalUsage.outputTokens += response.usage.outputTokens; @@ -159,6 +179,8 @@ export class NativeAgent { for (let iteration = 0; iteration < this.maxIterations; iteration++) { try { + this.throwIfCancelled(); + // Build request — cast loopMessages to Message[] because the underlying // model client will pass them through to the API which accepts structured content. const request = { @@ -169,6 +191,7 @@ export class NativeAgent { }; const response = await this.chatWithRouter(request); + this.throwIfCancelled(); this._totalUsage.inputTokens += response.usage.inputTokens; this._totalUsage.outputTokens += response.usage.outputTokens; @@ -234,6 +257,8 @@ export class NativeAgent { const toolResultBlocks: unknown[] = []; lastToolResults = []; for (const tc of toolCalls) { + this.throwIfCancelled(); + const internalName = this.toolRegistry!.getByApiName(tc.name)?.name ?? tc.name; this.onToolUse?.({ type: 'start', tool: internalName, args: tc.args }); @@ -280,6 +305,13 @@ export class NativeAgent { return breakMsg; } } catch (error) { + if (this.isAbortError(error)) { + const cancelledMsg = 'Operation cancelled by user.'; + const assistantMsg: Message = { role: 'assistant', content: cancelledMsg }; + this.addToHistory(assistantMsg); + return cancelledMsg; + } + const errorMsg = `Error in tool loop (iteration ${iteration + 1}): ${error instanceof Error ? error.message : String(error)}`; const assistantMsg: Message = { role: 'assistant', content: errorMsg }; this.addToHistory(assistantMsg); @@ -363,4 +395,28 @@ export class NativeAgent { getAttachmentCollector(): OutboundAttachmentCollector | undefined { return this._attachmentCollector; } + + cancel(): void { + if (this._runInProgress) { + this._cancelRequested = true; + } + } + + isCancellable(): boolean { + return this._runInProgress; + } + + private throwIfCancelled(): void { + if (!this._cancelRequested) { + return; + } + + const error = new Error('Operation cancelled by user.'); + error.name = 'AbortError'; + throw error; + } + + private isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === 'AbortError'; + } } diff --git a/src/backends/native/orchestrator.ts b/src/backends/native/orchestrator.ts index dd3e5e1..98bf396 100644 --- a/src/backends/native/orchestrator.ts +++ b/src/backends/native/orchestrator.ts @@ -304,6 +304,16 @@ export class AgentOrchestrator { this._agent.setOnToolUse(callback); } + /** Request cancellation for the current primary-agent operation. */ + cancel(): void { + this._agent.cancel(); + } + + /** Whether the primary agent currently has an in-flight operation. */ + isCancellable(): boolean { + return this._agent.isCancellable(); + } + // ── Usage & config accessors ────────────────────────────────────── /** diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index ed7ebe3..d8e4316 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -182,8 +182,6 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { }, 'agent.cancel': async (request: GatewayRequest): Promise => { - // Cancel is a placeholder — proper cancellation requires abort controller support in NativeAgent. - // For now, just report whether the agent was busy. const params = request.params as { connectionId?: string } | undefined; const connectionId = params?.connectionId as string; @@ -191,9 +189,22 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required'); } - const wasBusy = deps.sessionBridge.isBusy(connectionId); - // TODO: Wire AbortController into NativeAgent for actual cancellation - return { id: request.id, result: { cancelled: wasBusy } }; + const sessionId = deps.sessionBridge.getSessionId(connectionId); + const laneId = sessionId ?? connectionId; + + // Clear any queued (not-yet-started) work first. + deps.laneQueue.cancel(laneId); + + const cancelled = deps.sessionBridge.cancel(connectionId); + return { + id: request.id, + result: { + cancelled, + message: cancelled + ? 'Cancellation requested. The active operation will stop at the next safe point.' + : 'No active operation to cancel.', + }, + }; }, }; } diff --git a/src/gateway/handlers/handlers.test.ts b/src/gateway/handlers/handlers.test.ts index 800bf7d..2c661bf 100644 --- a/src/gateway/handlers/handlers.test.ts +++ b/src/gateway/handlers/handlers.test.ts @@ -251,6 +251,7 @@ describe('agent handlers', () => { getAgent: vi.fn(() => mockAgent), getSessionId: vi.fn(() => 'ws:conn-1'), isBusy: vi.fn(() => false), + cancel: vi.fn(() => false), setBusy: vi.fn(), setOnToolUse: vi.fn(), }; @@ -265,6 +266,7 @@ describe('agent handlers', () => { beforeEach(() => { vi.clearAllMocks(); mockBridge.isBusy.mockReturnValue(false); + mockBridge.cancel.mockReturnValue(false); mockBridge.getAgent.mockReturnValue(mockAgent); mockAgent.process.mockResolvedValue('response text'); }); @@ -399,11 +401,22 @@ describe('agent handlers', () => { }); it('agent.cancel returns cancelled state', async () => { - mockBridge.isBusy.mockReturnValue(true); + mockBridge.cancel.mockReturnValue(true); const req: GatewayRequest = { id: 7, method: 'agent.cancel', params: { connectionId: 'conn-1' } }; const result = await handlers['agent.cancel'](req) as GatewayResponse; expect((result.result as any).cancelled).toBe(true); + expect((result.result as any).message).toContain('Cancellation requested'); + expect(mockBridge.cancel).toHaveBeenCalledWith('conn-1'); + }); + + it('agent.cancel returns not-cancelled when no active operation exists', async () => { + mockBridge.cancel.mockReturnValue(false); + const req: GatewayRequest = { id: 8, method: 'agent.cancel', params: { connectionId: 'conn-1' } }; + const result = await handlers['agent.cancel'](req) as GatewayResponse; + + expect((result.result as any).cancelled).toBe(false); + expect((result.result as any).message).toContain('No active operation'); }); }); diff --git a/src/gateway/session-bridge.test.ts b/src/gateway/session-bridge.test.ts index b89e2cb..1f342c2 100644 --- a/src/gateway/session-bridge.test.ts +++ b/src/gateway/session-bridge.test.ts @@ -113,6 +113,24 @@ describe('SessionBridge', () => { expect(bridge.isBusy('conn-1')).toBe(false); }); + it('cancel returns false when no active operation exists', () => { + const bridge = createBridge(); + bridge.connect('conn-1'); + + expect(bridge.cancel('conn-1')).toBe(false); + }); + + it('cancel requests cancellation when connection is busy', () => { + const bridge = createBridge(); + bridge.connect('conn-1'); + const agent = bridge.getAgent('conn-1'); + const cancelSpy = vi.spyOn(agent!, 'cancel'); + + bridge.setBusy('conn-1', true); + expect(bridge.cancel('conn-1')).toBe(true); + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + it('switchSession changes session for a connection', () => { const bridge = createBridge(); bridge.connect('conn-1'); diff --git a/src/gateway/session-bridge.ts b/src/gateway/session-bridge.ts index ed07907..cfd8555 100644 --- a/src/gateway/session-bridge.ts +++ b/src/gateway/session-bridge.ts @@ -97,6 +97,17 @@ export class SessionBridge { if (client) {client.busy = busy;} } + /** Request cancellation for the current operation on a connection's agent. */ + cancel(connectionId: string): boolean { + const client = this.clients.get(connectionId); + if (!client || !client.busy) { + return false; + } + + client.agent.cancel(); + return true; + } + /** Set onToolUse callback for a connection's agent. */ setOnToolUse(connectionId: string, callback: ((event: ToolUseEvent) => void) | undefined): void { const client = this.clients.get(connectionId);