feat(gateway): add interrupt preemption telemetry and requester notice
This commit is contained in:
@@ -1287,6 +1287,7 @@ Notes:
|
|||||||
- `followup` keeps at most one pending item while a request is active; newer followups replace older pending items.
|
- `followup` keeps at most one pending item while a request is active; newer followups replace older pending items.
|
||||||
- `steer` and `steer_backlog` replace pending backlog with the newest request while one is active.
|
- `steer` and `steer_backlog` replace pending backlog with the newest request while one is active.
|
||||||
- `interrupt` uses steer-backlog queueing behavior and now also requests active-run cancellation when a newer request arrives.
|
- `interrupt` uses steer-backlog queueing behavior and now also requests active-run cancellation when a newer request arrives.
|
||||||
|
- When interrupt preemption occurs, gateway emits a transient content notice to the requester and writes a `queue.preempt` audit event.
|
||||||
- Active cancellation remains best-effort and stops at agent safe points; use `agent.cancel` for explicit user-triggered cancellation control.
|
- Active cancellation remains best-effort and stops at agent safe points; use `agent.cancel` for explicit user-triggered cancellation control.
|
||||||
- `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic.
|
- `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic.
|
||||||
- `summarize_overflow` enables richer overflow error messages and payload metadata.
|
- `summarize_overflow` enables richer overflow error messages and payload metadata.
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ sequenceDiagram
|
|||||||
G-->>C: result.cancelled=true/false
|
G-->>C: result.cancelled=true/false
|
||||||
```
|
```
|
||||||
|
|
||||||
`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points.
|
`interrupt` queue mode also requests active-run cancellation when a newer request is enqueued for the same session lane. Cancellation still completes at agent/tool-loop safe points. When this preemption happens, the requester receives a transient `content` notice and the audit log records `queue.preempt`.
|
||||||
|
|
||||||
### Base URL
|
### Base URL
|
||||||
|
|
||||||
|
|||||||
@@ -332,8 +332,9 @@ These are substantial UX/ecosystem projects or highly platform-specific; defer u
|
|||||||
|
|
||||||
## Suggested Next Execution Order
|
## Suggested Next Execution Order
|
||||||
|
|
||||||
1) Queue/run-control polish (interrupt preemption telemetry + UX)
|
1) Daily memory continuity tuning (if continuity quality is still lacking)
|
||||||
2) Daily memory continuity tuning (if continuity quality is still lacking)
|
2) Auth-profile expansion beyond API-key pools (if needed)
|
||||||
3) Auth-profile expansion beyond API-key pools (if needed)
|
3) Additional run-control UX refinements only if interrupt behavior is still insufficient in production
|
||||||
|
|
||||||
Note: API-key pool auth profile cooldown/backoff (`auth_profile_cooldown_ms`) shipped on 2026-02-19.
|
Note: API-key pool auth profile cooldown/backoff (`auth_profile_cooldown_ms`) shipped on 2026-02-19.
|
||||||
|
Note: Queue interrupt preemption telemetry/notice (`queue.preempt` + requester content hint) shipped on 2026-02-19.
|
||||||
|
|||||||
@@ -5793,6 +5793,23 @@
|
|||||||
"docs/plans/state.json"
|
"docs/plans/state.json"
|
||||||
],
|
],
|
||||||
"test_status": "pnpm test:run src/models/rotating.test.ts src/daemon/clientFactory.test.ts src/config/schema.test.ts + pnpm typecheck passing"
|
"test_status": "pnpm test:run src/models/rotating.test.ts src/daemon/clientFactory.test.ts src/config/schema.test.ts + pnpm typecheck passing"
|
||||||
|
},
|
||||||
|
"queue-interrupt-preemption-telemetry": {
|
||||||
|
"status": "completed",
|
||||||
|
"date": "2026-02-19",
|
||||||
|
"updated": "2026-02-19",
|
||||||
|
"summary": "Hardened interrupt queue mode visibility by emitting `queue.preempt` audit events when a newer request preempts an active run, and by sending a requester-facing content notice indicating that the previous in-flight run was cancelled before processing the latest message.",
|
||||||
|
"files_modified": [
|
||||||
|
"src/audit/types.ts",
|
||||||
|
"src/audit/logger.ts",
|
||||||
|
"src/gateway/handlers/agent.ts",
|
||||||
|
"src/gateway/handlers/agent.test.ts",
|
||||||
|
"README.md",
|
||||||
|
"docs/api/PROTOCOL.md",
|
||||||
|
"docs/plans/2026-02-15-openclaw-gap-roadmap.md",
|
||||||
|
"docs/plans/state.json"
|
||||||
|
],
|
||||||
|
"test_status": "pnpm test:run src/gateway/handlers/agent.test.ts src/models/rotating.test.ts src/daemon/clientFactory.test.ts src/config/schema.test.ts + pnpm typecheck passing"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"overall_progress": {
|
"overall_progress": {
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import type {
|
|||||||
SessionCheckpointEvent,
|
SessionCheckpointEvent,
|
||||||
SessionAutoCompactEvent,
|
SessionAutoCompactEvent,
|
||||||
UserActionEvent,
|
UserActionEvent,
|
||||||
|
QueuePreemptEvent,
|
||||||
BackendRouteEvent,
|
BackendRouteEvent,
|
||||||
BackendFallbackEvent,
|
BackendFallbackEvent,
|
||||||
CronTriggerEvent,
|
CronTriggerEvent,
|
||||||
@@ -194,6 +195,11 @@ export class AuditLogger {
|
|||||||
this.write({ level: 'info', event_type: 'user.action', event: event as unknown as Record<string, unknown> });
|
this.write({ level: 'info', event_type: 'user.action', event: event as unknown as Record<string, unknown> });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queuePreempt(event: QueuePreemptEvent): void {
|
||||||
|
if (!this.shouldLog('sessions', 'info')) {return;}
|
||||||
|
this.write({ level: 'info', event_type: 'queue.preempt', event: event as unknown as Record<string, unknown> });
|
||||||
|
}
|
||||||
|
|
||||||
backendRoute(event: BackendRouteEvent): void {
|
backendRoute(event: BackendRouteEvent): void {
|
||||||
if (!this.shouldLog('sessions', 'info')) {return;}
|
if (!this.shouldLog('sessions', 'info')) {return;}
|
||||||
this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> });
|
this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> });
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ export type AuditEventType =
|
|||||||
| 'skills.installer.execution_blocked' | 'skills.installer.command_result' | 'skills.registry_install'
|
| 'skills.installer.execution_blocked' | 'skills.installer.command_result' | 'skills.registry_install'
|
||||||
// Session lifecycle
|
// Session lifecycle
|
||||||
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action'
|
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action'
|
||||||
|
| 'queue.preempt'
|
||||||
| 'backend.route' | 'backend.fallback'
|
| 'backend.route' | 'backend.fallback'
|
||||||
// Automation - Cron
|
// Automation - Cron
|
||||||
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
|
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
|
||||||
@@ -210,6 +211,16 @@ export interface UserActionEvent {
|
|||||||
command?: string;
|
command?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface QueuePreemptEvent {
|
||||||
|
session_id: string;
|
||||||
|
channel: string;
|
||||||
|
sender: string;
|
||||||
|
lane_id: string;
|
||||||
|
request_id: string;
|
||||||
|
mode: 'interrupt';
|
||||||
|
cancelled_active_run: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
export interface BackendRouteEvent {
|
export interface BackendRouteEvent {
|
||||||
session_id: string;
|
session_id: string;
|
||||||
channel: string;
|
channel: string;
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ describe('createAgentHandlers command fast-path', () => {
|
|||||||
registerBuiltinCommands(commandRegistry);
|
registerBuiltinCommands(commandRegistry);
|
||||||
const mockAuditLogger = {
|
const mockAuditLogger = {
|
||||||
userAction: vi.fn(),
|
userAction: vi.fn(),
|
||||||
|
queuePreempt: vi.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
const handlers = createAgentHandlers({
|
const handlers = createAgentHandlers({
|
||||||
@@ -364,6 +365,16 @@ describe('createAgentHandlers command fast-path', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe('createAgentHandlers queue policy resolution', () => {
|
describe('createAgentHandlers queue policy resolution', () => {
|
||||||
|
const mockAuditLogger = {
|
||||||
|
userAction: vi.fn(),
|
||||||
|
queuePreempt: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
initAuditLogger(mockAuditLogger as any);
|
||||||
|
});
|
||||||
|
|
||||||
it('passes resolved per-request queue policy into lane enqueue', async () => {
|
it('passes resolved per-request queue policy into lane enqueue', async () => {
|
||||||
const mockAgent = {
|
const mockAgent = {
|
||||||
process: vi.fn(async () => 'ok'),
|
process: vi.fn(async () => 'ok'),
|
||||||
@@ -554,6 +565,15 @@ describe('createAgentHandlers queue policy resolution', () => {
|
|||||||
|
|
||||||
expect(sessionBridge.cancelSession).toHaveBeenCalledWith('ws:s1');
|
expect(sessionBridge.cancelSession).toHaveBeenCalledWith('ws:s1');
|
||||||
expect(sessionBridge.cancel).not.toHaveBeenCalled();
|
expect(sessionBridge.cancel).not.toHaveBeenCalled();
|
||||||
expect((sent[0] as GatewayEvent).event).toBe('done');
|
expect(mockAuditLogger.queuePreempt).toHaveBeenCalledWith(expect.objectContaining({
|
||||||
|
session_id: 'ws:s1',
|
||||||
|
lane_id: 'ws:s1',
|
||||||
|
request_id: '7',
|
||||||
|
mode: 'interrupt',
|
||||||
|
cancelled_active_run: true,
|
||||||
|
}));
|
||||||
|
expect((sent[0] as GatewayEvent).event).toBe('content');
|
||||||
|
expect(((sent[0] as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode');
|
||||||
|
expect((sent[1] as GatewayEvent).event).toBe('done');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -90,26 +90,41 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function'
|
const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function'
|
||||||
? laneQueueWithProcessing.isProcessing(laneId)
|
? laneQueueWithProcessing.isProcessing(laneId)
|
||||||
: false;
|
: false;
|
||||||
|
const requestId = request.id.toString();
|
||||||
|
let interruptedPreviousRun = false;
|
||||||
|
|
||||||
// Interrupt mode should preempt active work when a newer request arrives.
|
// Interrupt mode should preempt active work when a newer request arrives.
|
||||||
// LaneQueue itself only rejects queued entries, so we also request agent cancellation.
|
// LaneQueue itself only rejects queued entries, so we also request agent cancellation.
|
||||||
if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) {
|
if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) {
|
||||||
if (sessionId) {
|
const cancelled = sessionId
|
||||||
deps.sessionBridge.cancelSession(sessionId);
|
? deps.sessionBridge.cancelSession(sessionId)
|
||||||
} else {
|
: deps.sessionBridge.cancel(connectionId);
|
||||||
deps.sessionBridge.cancel(connectionId);
|
interruptedPreviousRun = cancelled;
|
||||||
}
|
auditLogger?.queuePreempt?.({
|
||||||
|
session_id: sessionId ?? `ws:${connectionId}`,
|
||||||
|
channel: 'ws',
|
||||||
|
sender: connectionId,
|
||||||
|
lane_id: laneId,
|
||||||
|
request_id: requestId,
|
||||||
|
mode: 'interrupt',
|
||||||
|
cancelled_active_run: cancelled,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue the work — if the lane is idle it runs immediately,
|
// Enqueue the work — if the lane is idle it runs immediately,
|
||||||
// otherwise it waits for earlier requests on the same session to finish.
|
// otherwise it waits for earlier requests on the same session to finish.
|
||||||
const requestId = request.id.toString();
|
|
||||||
deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' });
|
deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await deps.laneQueue.enqueue(laneId, async () => {
|
return await deps.laneQueue.enqueue(laneId, async () => {
|
||||||
deps.sessionBridge.setBusy(connectionId, true);
|
deps.sessionBridge.setBusy(connectionId, true);
|
||||||
|
|
||||||
|
if (interruptedPreviousRun) {
|
||||||
|
await send(makeEvent(request.id, 'content', {
|
||||||
|
text: 'Interrupt mode: cancelled the previous in-flight run and processing your latest message.',
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string'
|
const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string'
|
||||||
? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}`
|
? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}`
|
||||||
: (safeParams.message ?? '');
|
: (safeParams.message ?? '');
|
||||||
|
|||||||
Reference in New Issue
Block a user