diff --git a/README.md b/README.md index ce316bf..2cf658a 100644 --- a/README.md +++ b/README.md @@ -1287,6 +1287,7 @@ Notes: - `followup` keeps at most one pending item while a request is active; newer followups replace older pending items. - `steer` and `steer_backlog` replace pending backlog with the newest request while one is active. - `interrupt` uses steer-backlog queueing behavior and now also requests active-run cancellation when a newer request arrives. +- When interrupt preemption occurs, gateway emits a transient content notice to the requester and writes a `queue.preempt` audit event. - Active cancellation remains best-effort and stops at agent safe points; use `agent.cancel` for explicit user-triggered cancellation control. - `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic. - `summarize_overflow` enables richer overflow error messages and payload metadata. diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index ce6d0f7..34a6c36 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -69,7 +69,7 @@ sequenceDiagram G-->>C: result.cancelled=true/false ``` -`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points. +`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points. When this preemption happens, the requester receives a transient `content` notice and the audit log records `queue.preempt`. ### Base URL diff --git a/docs/plans/2026-02-15-openclaw-gap-roadmap.md b/docs/plans/2026-02-15-openclaw-gap-roadmap.md index 8ad2733..33c10fd 100644 --- a/docs/plans/2026-02-15-openclaw-gap-roadmap.md +++ b/docs/plans/2026-02-15-openclaw-gap-roadmap.md @@ -332,8 +332,9 @@ These are substantial UX/ecosystem projects or highly platform-specific; defer u ## Suggested Next Execution Order -1) Queue/run-control polish (interrupt preemption telemetry + UX) -2) Daily memory continuity tuning (if continuity quality is still lacking) -3) Auth-profile expansion beyond API-key pools (if needed) +1) Daily memory continuity tuning (if continuity quality is still lacking) +2) Auth-profile expansion beyond API-key pools (if needed) +3) Additional run-control UX refinements only if interrupt behavior is still insufficient in production Note: API-key pool auth profile cooldown/backoff (`auth_profile_cooldown_ms`) shipped on 2026-02-19. +Note: Queue interrupt preemption telemetry/notice (`queue.preempt` + requester content hint) shipped on 2026-02-19. diff --git a/docs/plans/state.json b/docs/plans/state.json index b055e19..db40a5f 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -5793,6 +5793,23 @@ "docs/plans/state.json" ], "test_status": "pnpm test:run src/models/rotating.test.ts src/daemon/clientFactory.test.ts src/config/schema.test.ts + pnpm typecheck passing" + }, + "queue-interrupt-preemption-telemetry": { + "status": "completed", + "date": "2026-02-19", + "updated": "2026-02-19", + "summary": "Hardened interrupt queue mode visibility by emitting `queue.preempt` audit events when a newer request preempts an active run, and by sending a requester-facing content notice indicating that the previous in-flight run was cancelled before processing the latest message.", + "files_modified": [ + "src/audit/types.ts", + "src/audit/logger.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "README.md", + "docs/api/PROTOCOL.md", + "docs/plans/2026-02-15-openclaw-gap-roadmap.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/gateway/handlers/agent.test.ts src/models/rotating.test.ts src/daemon/clientFactory.test.ts src/config/schema.test.ts + pnpm typecheck passing" } }, "overall_progress": { diff --git a/src/audit/logger.ts b/src/audit/logger.ts index f69b1b7..8ecee59 100644 --- a/src/audit/logger.ts +++ b/src/audit/logger.ts @@ -19,6 +19,7 @@ import type { SessionCheckpointEvent, SessionAutoCompactEvent, UserActionEvent, + QueuePreemptEvent, BackendRouteEvent, BackendFallbackEvent, CronTriggerEvent, @@ -194,6 +195,11 @@ export class AuditLogger { this.write({ level: 'info', event_type: 'user.action', event: event as unknown as Record }); } + queuePreempt(event: QueuePreemptEvent): void { + if (!this.shouldLog('sessions', 'info')) {return;} + this.write({ level: 'info', event_type: 'queue.preempt', event: event as unknown as Record }); + } + backendRoute(event: BackendRouteEvent): void { if (!this.shouldLog('sessions', 'info')) {return;} this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record }); diff --git a/src/audit/types.ts b/src/audit/types.ts index b29f69d..21d2e19 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -11,6 +11,7 @@ export type AuditEventType = | 'skills.installer.execution_blocked' | 'skills.installer.command_result' | 'skills.registry_install' // Session lifecycle | 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action' + | 'queue.preempt' | 'backend.route' | 'backend.fallback' // Automation - Cron | 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove' @@ -210,6 +211,16 @@ export interface UserActionEvent { command?: string; } +export interface QueuePreemptEvent { + session_id: string; + channel: string; + sender: string; + lane_id: string; + request_id: string; + mode: 'interrupt'; + cancelled_active_run: boolean; +} + export interface BackendRouteEvent { session_id: string; channel: string; diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 7d5d01c..da97351 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -49,6 +49,7 @@ describe('createAgentHandlers command fast-path', () => { registerBuiltinCommands(commandRegistry); const mockAuditLogger = { userAction: vi.fn(), + queuePreempt: vi.fn(), }; const handlers = createAgentHandlers({ @@ -364,6 +365,16 @@ describe('createAgentHandlers command fast-path', () => { }); describe('createAgentHandlers queue policy resolution', () => { + const mockAuditLogger = { + userAction: vi.fn(), + queuePreempt: vi.fn(), + }; + + beforeEach(() => { + vi.clearAllMocks(); + initAuditLogger(mockAuditLogger as any); + }); + it('passes resolved per-request queue policy into lane enqueue', async () => { const mockAgent = { process: vi.fn(async () => 'ok'), @@ -554,6 +565,15 @@ describe('createAgentHandlers queue policy resolution', () => { expect(sessionBridge.cancelSession).toHaveBeenCalledWith('ws:s1'); expect(sessionBridge.cancel).not.toHaveBeenCalled(); - expect((sent[0] as GatewayEvent).event).toBe('done'); + expect(mockAuditLogger.queuePreempt).toHaveBeenCalledWith(expect.objectContaining({ + session_id: 'ws:s1', + lane_id: 'ws:s1', + request_id: '7', + mode: 'interrupt', + cancelled_active_run: true, + })); + expect((sent[0] as GatewayEvent).event).toBe('content'); + expect(((sent[0] as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode'); + expect((sent[1] as GatewayEvent).event).toBe('done'); }); }); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 5227311..f4936b5 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -90,26 +90,41 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function' ? laneQueueWithProcessing.isProcessing(laneId) : false; + const requestId = request.id.toString(); + let interruptedPreviousRun = false; // Interrupt mode should preempt active work when a newer request arrives. // LaneQueue itself only rejects queued entries, so we also request agent cancellation. if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) { - if (sessionId) { - deps.sessionBridge.cancelSession(sessionId); - } else { - deps.sessionBridge.cancel(connectionId); - } + const cancelled = sessionId + ? deps.sessionBridge.cancelSession(sessionId) + : deps.sessionBridge.cancel(connectionId); + interruptedPreviousRun = cancelled; + auditLogger?.queuePreempt?.({ + session_id: sessionId ?? `ws:${connectionId}`, + channel: 'ws', + sender: connectionId, + lane_id: laneId, + request_id: requestId, + mode: 'interrupt', + cancelled_active_run: cancelled, + }); } // Enqueue the work — if the lane is idle it runs immediately, // otherwise it waits for earlier requests on the same session to finish. - const requestId = request.id.toString(); deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' }); try { return await deps.laneQueue.enqueue(laneId, async () => { deps.sessionBridge.setBusy(connectionId, true); + if (interruptedPreviousRun) { + await send(makeEvent(request.id, 'content', { + text: 'Interrupt mode: cancelled the previous in-flight run and processing your latest message.', + })); + } + const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string' ? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}` : (safeParams.message ?? '');