Phase 1 run-control semantics and run_state events

This commit is contained in:
William Valentin
2026-02-25 10:22:44 -08:00
parent ae21681958
commit e4ee6acce8
13 changed files with 485 additions and 120 deletions
+31 -1
View File
@@ -69,7 +69,7 @@ sequenceDiagram
end end
G->>A: process(message) in that session G->>A: process(message) in that session
A-->>G: streaming events (content/tool_start/tool_end/context_warning) A-->>G: streaming events (content/tool_start/tool_end/context_warning/run_state)
G-->>C: events + final done G-->>C: events + final done
C->>G: agent.cancel {connectionId} C->>G: agent.cancel {connectionId}
@@ -1028,6 +1028,18 @@ Send a message to the agent and stream response.
} }
``` ```
`run_state` event:
```json
{
"id": 7,
"event": "run_state",
"data": {
"state": "complete",
"timestamp": 1730140800000
}
}
```
`done` event: `done` event:
```json ```json
{ {
@@ -1550,6 +1562,21 @@ Proactive context pressure signal emitted by `agent.send` before `done`.
} }
``` ```
#### `run_state`
Run lifecycle transition emitted during `agent.send` processing.
```json
{
"id": 1,
"event": "run_state",
"data": {
"state": "start",
"timestamp": 1730140800000
}
}
```
#### `done` #### `done`
Agent processing complete (final response). Agent processing complete (final response).
@@ -1715,6 +1742,9 @@ class FlynnClient {
case 'context_warning': case 'context_warning':
console.warn('Context warning:', data.level, data.message); console.warn('Context warning:', data.level, data.message);
break; break;
case 'run_state':
console.log('Run state:', data.state, data.timestamp);
break;
case 'done': case 'done':
console.log('Done:', data.content); console.log('Done:', data.content);
break; break;
+4
View File
@@ -139,6 +139,10 @@ Outbound Reply
-> ChannelAdapter.send() (text + optional attachments) -> ChannelAdapter.send() (text + optional attachments)
``` ```
Gateway streaming UX signals:
- WebSocket `agent.send` emits `run_state` lifecycle events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) for UI/state rendering.
Key files: Key files:
- Routing + per-session agent creation: `src/daemon/routing.ts` - Routing + per-session agent creation: `src/daemon/routing.ts`
@@ -91,6 +91,7 @@ Within a lane:
- Only one request executes at a time. - Only one request executes at a time.
- Later requests queue (FIFO) and start after the active request finishes. - Later requests queue (FIFO) and start after the active request finishes.
- `interrupt` mode enforces "latest wins": any queued backlog is superseded and the active run is asked to cancel so the newest request becomes the next (or immediate) execution.
Across lanes: Across lanes:
@@ -112,6 +113,7 @@ Important:
- Cancellation is best-effort for the currently running work: it stops at the next safe point in the agent loop. - Cancellation is best-effort for the currently running work: it stops at the next safe point in the agent loop.
- Queued work is deterministically rejected. - Queued work is deterministically rejected.
- Gateway streams `run_state` events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) so clients can render lifecycle state without parsing assistant text.
Key files: Key files:
+73 -50
View File
@@ -3635,7 +3635,7 @@
"multi_agent_routing": { "multi_agent_routing": {
"priority": "P2", "priority": "P2",
"status": "completed", "status": "completed",
"description": "Config-driven agent routing: AgentConfigRegistry for named agent configs (system_prompt, model_tier, tool_profile, sandbox), AgentRouter with senderchanneldefault resolution (glob patterns), per-agent tool registry cloning with sandboxed overrides, daemon wiring", "description": "Config-driven agent routing: AgentConfigRegistry for named agent configs (system_prompt, model_tier, tool_profile, sandbox), AgentRouter with sender\u2192channel\u2192default resolution (glob patterns), per-agent tool registry cloning with sandboxed overrides, daemon wiring",
"files_created": [ "files_created": [
"src/agents/registry.ts", "src/agents/registry.ts",
"src/agents/registry.test.ts", "src/agents/registry.test.ts",
@@ -3789,7 +3789,7 @@
"auto_login": { "auto_login": {
"priority": "P5", "priority": "P5",
"status": "completed", "status": "completed",
"description": "Lazy token resolution with onLoginRequired callback triggers OAuth device flow automatically on first API call when no token is available", "description": "Lazy token resolution with onLoginRequired callback \u2014 triggers OAuth device flow automatically on first API call when no token is available",
"files_modified": [ "files_modified": [
"src/models/github.ts", "src/models/github.ts",
"src/daemon/index.ts", "src/daemon/index.ts",
@@ -3937,7 +3937,7 @@
"p8-agent-tools": { "p8-agent-tools": {
"status": "completed", "status": "completed",
"date": "2026-02-07", "date": "2026-02-07",
"summary": "8 new agent-callable tools exposing existing internal APIs, plus gap analysis audit update (25% 65% match rate)", "summary": "8 new agent-callable tools exposing existing internal APIs, plus gap analysis audit update (25% \u2192 65% match rate)",
"phases": { "phases": {
"sessions_tools": { "sessions_tools": {
"priority": "P8", "priority": "P8",
@@ -3994,7 +3994,7 @@
"file_patch_tool": { "file_patch_tool": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "file.patch tool for multi-hunk structured patches apply multiple line-based edits (replacements, insertions, deletions) across one or more files in a single tool call. Hunks applied bottom-up to preserve line numbers.", "description": "file.patch tool for multi-hunk structured patches \u2014 apply multiple line-based edits (replacements, insertions, deletions) across one or more files in a single tool call. Hunks applied bottom-up to preserve line numbers.",
"files_created": [ "files_created": [
"src/tools/builtin/file-patch.ts", "src/tools/builtin/file-patch.ts",
"src/tools/builtin/file-patch.test.ts" "src/tools/builtin/file-patch.test.ts"
@@ -4008,7 +4008,7 @@
"gmail_pubsub_watcher": { "gmail_pubsub_watcher": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "Gmail Pub/Sub watcher ChannelAdapter monitors Gmail via Google Cloud Pub/Sub push notifications with polling fallback. OAuth2 auth, configurable watch labels, template rendering with email metadata placeholders. Wired into daemon lifecycle and gateway (POST /gmail/push endpoint).", "description": "Gmail Pub/Sub watcher ChannelAdapter \u2014 monitors Gmail via Google Cloud Pub/Sub push notifications with polling fallback. OAuth2 auth, configurable watch labels, template rendering with email metadata placeholders. Wired into daemon lifecycle and gateway (POST /gmail/push endpoint).",
"files_created": [ "files_created": [
"src/automation/gmail.ts", "src/automation/gmail.ts",
"src/automation/gmail.test.ts" "src/automation/gmail.test.ts"
@@ -4133,7 +4133,7 @@
"phases": { "phases": {
"ollama_tool_calling": { "ollama_tool_calling": {
"status": "completed", "status": "completed",
"description": "Pass tools to Ollama API in correct format, parse tool_calls from responses with generated IDs, set stopReason to 'tool_use'. Handle thinking field from reasoning models (deepseek-r1, glm-4.7-flash) use as content fallback and expose via thinkingContent. Streaming support for both tool calls and thinking.", "description": "Pass tools to Ollama API in correct format, parse tool_calls from responses with generated IDs, set stopReason to 'tool_use'. Handle thinking field from reasoning models (deepseek-r1, glm-4.7-flash) \u2014 use as content fallback and expose via thinkingContent. Streaming support for both tool calls and thinking.",
"files_modified": [ "files_modified": [
"src/models/local/ollama.ts", "src/models/local/ollama.ts",
"src/models/local/ollama.test.ts" "src/models/local/ollama.test.ts"
@@ -4159,7 +4159,7 @@
"lane_queue": { "lane_queue": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "Per-session FIFO queue in gateway serializes concurrent requests instead of rejecting. LaneQueue class with enqueue/cancel/queueLength methods.", "description": "Per-session FIFO queue in gateway \u2014 serializes concurrent requests instead of rejecting. LaneQueue class with enqueue/cancel/queueLength methods.",
"files_created": [ "files_created": [
"src/gateway/lane-queue.ts", "src/gateway/lane-queue.ts",
"src/gateway/lane-queue.test.ts" "src/gateway/lane-queue.test.ts"
@@ -4174,7 +4174,7 @@
"credential_redaction": { "credential_redaction": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "Expanded redactConfig() from 2 secret locations to 18+ secret fields telegram, discord, slack tokens; server.token; all model tier api_key/auth_token; web_search, audio, memory embedding api_keys; webhook secrets; gmail credentials; MCP server env vars.", "description": "Expanded redactConfig() from 2 secret locations to 18+ secret fields \u2014 telegram, discord, slack tokens; server.token; all model tier api_key/auth_token; web_search, audio, memory embedding api_keys; webhook secrets; gmail credentials; MCP server env vars.",
"files_modified": [ "files_modified": [
"src/gateway/handlers/config.ts", "src/gateway/handlers/config.ts",
"src/gateway/handlers/handlers.test.ts" "src/gateway/handlers/handlers.test.ts"
@@ -4199,7 +4199,7 @@
"xai_grok_provider": { "xai_grok_provider": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "xAI as OpenAI-compatible model provider reuses OpenAIClient with baseURL https://api.x.ai/v1, XAI_API_KEY env var fallback, pricing for grok-3/grok-3-mini/grok-2/grok-2-mini/grok-3-fast.", "description": "xAI as OpenAI-compatible model provider \u2014 reuses OpenAIClient with baseURL https://api.x.ai/v1, XAI_API_KEY env var fallback, pricing for grok-3/grok-3-mini/grok-2/grok-2-mini/grok-3-fast.",
"files_modified": [ "files_modified": [
"src/config/schema.ts", "src/config/schema.ts",
"src/daemon/index.ts", "src/daemon/index.ts",
@@ -4209,7 +4209,7 @@
"voyage_ai_embeddings": { "voyage_ai_embeddings": {
"priority": "Tier3", "priority": "Tier3",
"status": "completed", "status": "completed",
"description": "Voyage AI embedding provider for memory/vector search OpenAI SDK with baseURL https://api.voyageai.com/v1, defaults to 1024 dimensions, VOYAGE_API_KEY env var.", "description": "Voyage AI embedding provider for memory/vector search \u2014 OpenAI SDK with baseURL https://api.voyageai.com/v1, defaults to 1024 dimensions, VOYAGE_API_KEY env var.",
"files_modified": [ "files_modified": [
"src/config/schema.ts", "src/config/schema.ts",
"src/memory/embeddings.ts", "src/memory/embeddings.ts",
@@ -4227,7 +4227,7 @@
"gateway_lock": { "gateway_lock": {
"priority": "Tier4", "priority": "Tier4",
"status": "completed", "status": "completed",
"description": "Single-client gateway mode if lock=true and a client is connected, reject new connections with code 4003. UI detects locked state.", "description": "Single-client gateway mode \u2014 if lock=true and a client is connected, reject new connections with code 4003. UI detects locked state.",
"files_modified": [ "files_modified": [
"src/config/schema.ts", "src/config/schema.ts",
"src/gateway/server.ts", "src/gateway/server.ts",
@@ -6656,52 +6656,75 @@
"docs/plans/state.json" "docs/plans/state.json"
], ],
"test_status": "runtime validation: docker compose recreate + curl 127.0.0.1:18801 + POST /v1/audio/transcriptions returned HTTP response" "test_status": "runtime validation: docker compose recreate + curl 127.0.0.1:18801 + POST /v1/audio/transcriptions returned HTTP response"
},
"deeper-surfaces-phase1-run-control": {
"status": "completed",
"date": "2026-02-25",
"updated": "2026-02-25",
"summary": "Implemented Phase 1 run-control semantics and gateway UX signals: hardened interrupt queue behavior with latest-wins semantics, added run_state lifecycle events to the gateway protocol/UI, aligned channel-path interrupt cancellation with active-run tracking, and added focused tests.",
"files_modified": [
"src/gateway/lane-queue.ts",
"src/gateway/lane-queue.test.ts",
"src/gateway/handlers/agent.ts",
"src/gateway/handlers/agent.test.ts",
"src/gateway/protocol.ts",
"src/gateway/ui/pages/chat.js",
"src/gateway/ui/pages/chat.test.ts",
"src/daemon/routing.ts",
"src/daemon/routing.test.ts",
"docs/api/PROTOCOL.md",
"docs/architecture/AGENT_DIAGRAM.md",
"docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md",
"docs/plans/state.json"
],
"test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/gateway/ui/pages/chat.test.ts src/daemon/routing.test.ts passing"
} }
}, },
"overall_progress": { "overall_progress": {
"total_test_count": 2009, "total_test_count": 2013,
"all_tests_passing": true, "all_tests_passing": true,
"p0_completion": "3/3 (100%)", "p0_completion": "3/3 (100%)",
"p1_completion": "4/4 (100%)", "p1_completion": "4/4 (100%)",
"p2_completion": "7/7 (100%)", "p2_completion": "7/7 (100%)",
"p3_completion": "completed (group chat, gateway auth, Gemini, OpenRouter, Bedrock, browser control)", "p3_completion": "completed (group chat, gateway auth, Gemini, OpenRouter, Bedrock, browser control)",
"p4_completion": "1/1 (100%) multimodal media pipeline", "p4_completion": "1/1 (100%) \u2014 multimodal media pipeline",
"p5_completion": "1/1 (100%) GitHub Copilot provider with auto-login", "p5_completion": "1/1 (100%) \u2014 GitHub Copilot provider with auto-login",
"p6_completion": "4/4 (100%) enhanced media pipeline (image.analyze, outbound attachments, gateway attachments, audio transcription)", "p6_completion": "4/4 (100%) \u2014 enhanced media pipeline (image.analyze, outbound attachments, gateway attachments, audio transcription)",
"p7_completion": "6/6 (100%) web UI dashboard SPA (dashboard, chat, sessions, settings)", "p7_completion": "6/6 (100%) \u2014 web UI dashboard SPA (dashboard, chat, sessions, settings)",
"p8_completion": "8/8 (100%) agent tools (sessions.list/history/create/delete, agents.list, message.send, cron.list/trigger) + gap analysis audit", "p8_completion": "8/8 (100%) \u2014 agent tools (sessions.list/history/create/delete, agents.list, message.send, cron.list/trigger) + gap analysis audit",
"tier1_completion": "5/5 (100%) !!think prefix, /verbose command, typing indicators (Discord/WhatsApp), session pruning (TTL), tool groups", "tier1_completion": "5/5 (100%) \u2014 !!think prefix, /verbose command, typing indicators (Discord/WhatsApp), session pruning (TTL), tool groups",
"tier2_completion": "4/4 (100%) inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier2_completion": "4/4 (100%) \u2014 inbound webhooks, vector memory search, Dockerfile, heartbeat monitor",
"tier3_completion": "5/5 (100%) lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier3_completion": "5/5 (100%) \u2014 lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings",
"tier4_completion": "4/4 (100%) gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", "tier4_completion": "4/4 (100%) \u2014 gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes",
"feature_gap_scorecard": "128/128 match (100%), 0 partial (0%), 0 missing (0%)", "feature_gap_scorecard": "128/128 match (100%), 0 partial (0%), 0 missing (0%)",
"operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete milestone done", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete \u2014 milestone done",
"dashboard_observability": "completed service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling", "dashboard_observability": "completed \u2014 service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling",
"gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram",
"gmail_filter_creation": "completed gmail.filter.create tool added with criteria/action validation; gmail-auth requests explicit gmail.settings.basic + gmail.readonly scopes for filter creation and inbox reads", "gmail_filter_creation": "completed \u2014 gmail.filter.create tool added with criteria/action validation; gmail-auth requests explicit gmail.settings.basic + gmail.readonly scopes for filter creation and inbox reads",
"toolloop_action_intent_recovery": "completed when a model claims it will execute a tool but emits no tool call, NativeAgent now issues one internal nudge and continues the same turn to execute tools or produce a concrete blocker", "toolloop_action_intent_recovery": "completed \u2014 when a model claims it will execute a tool but emits no tool call, NativeAgent now issues one internal nudge and continues the same turn to execute tools or produce a concrete blocker",
"toolloop_execution_claim_recovery": "completed when a model claims a known tool already succeeded/failed without emitting a tool call, NativeAgent now nudges once and retries the same turn before returning text", "toolloop_execution_claim_recovery": "completed \u2014 when a model claims a known tool already succeeded/failed without emitting a tool call, NativeAgent now nudges once and retries the same turn before returning text",
"daily_briefing_google_scope_remediation": "completed calendar.* and tasks.* now append explicit re-auth guidance (`flynn gcal-auth` / `flynn gtasks-auth`) for insufficient-scope errors, and operator runbook includes remediation steps", "daily_briefing_google_scope_remediation": "completed \u2014 calendar.* and tasks.* now append explicit re-auth guidance (`flynn gcal-auth` / `flynn gtasks-auth`) for insufficient-scope errors, and operator runbook includes remediation steps",
"council_tool_timeout_override": "completed ToolExecutor supports per-tool timeout overrides and council.run now uses a 180s timeout to avoid false 30s council timeouts in the tool loop", "council_tool_timeout_override": "completed \u2014 ToolExecutor supports per-tool timeout overrides and council.run now uses a 180s timeout to avoid false 30s council timeouts in the tool loop",
"minimal_tui_multiline_paste_mode": "completed minimal TUI now supports `/paste`/`/multiline` multiline compose mode ending with single '.' line, preventing newline truncation for pasted prompts", "minimal_tui_multiline_paste_mode": "completed \u2014 minimal TUI now supports `/paste`/`/multiline` multiline compose mode ending with single '.' line, preventing newline truncation for pasted prompts",
"config_profile_consolidation": "completed config/paas.yaml is now generated from canonical config/default.yaml + config/profiles/paas.overlay.yaml with CI-checkable drift detection", "config_profile_consolidation": "completed \u2014 config/paas.yaml is now generated from canonical config/default.yaml + config/profiles/paas.overlay.yaml with CI-checkable drift detection",
"google_auth_hardening": "completed shared Google OAuth runtime helper + auth store (auth.json), legacy token-file migration, refresh persistence, service-wide doctor checks, and unified `flynn google-auth` command", "google_auth_hardening": "completed \u2014 shared Google OAuth runtime helper + auth store (auth.json), legacy token-file migration, refresh persistence, service-wide doctor checks, and unified `flynn google-auth` command",
"model_router_correctness": "completed fallback paths now avoid duplicate clients, apply retry policy consistently, and reject unsupported OpenAI OAuth tool requests early", "model_router_correctness": "completed \u2014 fallback paths now avoid duplicate clients, apply retry policy consistently, and reject unsupported OpenAI OAuth tool requests early",
"native_audio_support": "completed smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback, plus 2026-02-23 arg hydration hardening, tool.args_rewritten audit metric, transient fetch retry/timeout hardening, localhost->127.0.0.1 fallback for transcription endpoint connectivity, and whisper docker-compose entrypoint arg fix for port 18801", "native_audio_support": "completed \u2014 smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback, plus 2026-02-23 arg hydration hardening, tool.args_rewritten audit metric, transient fetch retry/timeout hardening, localhost->127.0.0.1 fallback for transcription endpoint connectivity, and whisper docker-compose entrypoint arg fix for port 18801",
"remaining_phases_completion": "Phase 1: 3/3 (100%) context levels, command registry, memory structure. Phase 2: 3/3 (100%) component registry, confidence routing, history index. Phase 3: 2/2 (100%) adaptive memory/compaction, truthfulness/autonomy hardening", "remaining_phases_completion": "Phase 1: 3/3 (100%) \u2014 context levels, command registry, memory structure. Phase 2: 3/3 (100%) \u2014 component registry, confidence routing, history index. Phase 3: 2/2 (100%) \u2014 adaptive memory/compaction, truthfulness/autonomy hardening",
"deeper_surfaces_behavior_stack_plan": "completed documented a decision-complete balanced-hybrid roadmap for OpenClaw-like end-user surface depth plus integrated behavior semantics with phased scope, acceptance gates, and rollout constraints", "deeper_surfaces_behavior_stack_plan": "completed \u2014 documented a decision-complete balanced-hybrid roadmap for OpenClaw-like end-user surface depth plus integrated behavior semantics with phased scope, acceptance gates, and rollout constraints",
"deeper_surfaces_phase0_ticket_pack": "completed produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)", "deeper_surfaces_phase0_ticket_pack": "completed \u2014 produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)",
"deeper_surfaces_phase0_ticket_01": "completed audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage", "deeper_surfaces_phase0_ticket_01": "completed \u2014 audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage",
"deeper_surfaces_phase0_ticket_02": "completed gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests", "deeper_surfaces_phase0_ticket_02": "completed \u2014 gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests",
"deeper_surfaces_phase0_ticket_03": "completed gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters", "deeper_surfaces_phase0_ticket_03": "completed \u2014 gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters",
"deeper_surfaces_phase0_ticket_04": "completed added phase-0 baseline summary tooling for run outcomes, cancel latency, and reaction decisions with markdown/json CLI output", "deeper_surfaces_phase0_ticket_04": "completed \u2014 added phase-0 baseline summary tooling for run outcomes, cancel latency, and reaction decisions with markdown/json CLI output",
"deeper_surfaces_phase0_ticket_05": "completed documented phase-0 telemetry fields/workflow, refreshed architecture/protocol docs, and generated baseline artifacts from a probe log with representative channel + gateway events", "deeper_surfaces_phase0_ticket_05": "completed \u2014 documented phase-0 telemetry fields/workflow, refreshed architecture/protocol docs, and generated baseline artifacts from a probe log with representative channel + gateway events",
"next_up": "Replace probe baseline artifacts with live audit samples once gateway/channel sessions emit real run/reaction events", "next_up": "Replace probe baseline artifacts with live audit samples once gateway/channel sessions emit real run/reaction events",
"pi_embedded_canary_spike": "completed added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", "pi_embedded_canary_spike": "completed \u2014 added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default",
"pi_embedded_evaluation_phase": "completed final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", "pi_embedded_evaluation_phase": "completed \u2014 final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)",
"pi_embedded_manual_mode": "completed added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", "pi_embedded_manual_mode": "completed \u2014 added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing",
"openclaw_gateway_first_tui_runtime_unification": "completed shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach", "openclaw_gateway_first_tui_runtime_unification": "completed \u2014 shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach",
"gateway_startup_eaddrinuse_hardening": "completed gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry" "gateway_startup_eaddrinuse_hardening": "completed \u2014 gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry",
"deeper_surfaces_phase1_run_control": "completed \u2014 interrupt queue mode now enforces latest-wins semantics with channel-path preemption, and gateway emits run_state lifecycle events rendered in the web UI"
}, },
"soul_md_and_cron_create": { "soul_md_and_cron_create": {
"date": "2026-02-11", "date": "2026-02-11",
@@ -6715,7 +6738,7 @@
}, },
"local-model-message-normalization": { "local-model-message-normalization": {
"date": "2026-02-11", "date": "2026-02-11",
"summary": "Ollama & llama.cpp tool calling message normalization normalizeMessagesForOllama() converts tool_use/tool_result content blocks to Ollama's native role:tool format, normalizeMessagesForLlamaCpp() converts to OpenAI-style tool_calls arrays with hybrid fallback for GGUF templates that drop role:tool messages.", "summary": "Ollama & llama.cpp tool calling message normalization \u2014 normalizeMessagesForOllama() converts tool_use/tool_result content blocks to Ollama's native role:tool format, normalizeMessagesForLlamaCpp() converts to OpenAI-style tool_calls arrays with hybrid fallback for GGUF templates that drop role:tool messages.",
"files_modified": [ "files_modified": [
"src/models/local/ollama.ts", "src/models/local/ollama.ts",
"src/models/local/ollama.test.ts", "src/models/local/ollama.test.ts",
@@ -6734,7 +6757,7 @@
"native-audio-support": { "native-audio-support": {
"status": "completed", "status": "completed",
"date": "2026-02-11", "date": "2026-02-11",
"summary": "Native audio input support voice messages passed directly to audio-capable models (Gemini, OpenAI, GitHub) instead of always transcribing via Whisper. Smart routing decides per-model whether to pass raw audio or transcribe first.", "summary": "Native audio input support \u2014 voice messages passed directly to audio-capable models (Gemini, OpenAI, GitHub) instead of always transcribing via Whisper. Smart routing decides per-model whether to pass raw audio or transcribe first.",
"phases": { "phases": {
"audio_transcribe_tool": { "audio_transcribe_tool": {
"status": "completed", "status": "completed",
@@ -6771,7 +6794,7 @@
}, },
"tests_and_token_estimation": { "tests_and_token_estimation": {
"status": "completed", "status": "completed",
"description": "Audio tests for media helpers, audio token estimation (base64→bytes→durationtokens at 32 tokens/sec), supports_audio config override wiring", "description": "Audio tests for media helpers, audio token estimation (base64\u2192bytes\u2192duration\u2192tokens at 32 tokens/sec), supports_audio config override wiring",
"files_modified": [ "files_modified": [
"src/models/media.test.ts", "src/models/media.test.ts",
"src/context/tokens.ts", "src/context/tokens.ts",
@@ -6783,7 +6806,7 @@
}, },
"stopreason-normalization": { "stopreason-normalization": {
"date": "2026-02-11", "date": "2026-02-11",
"summary": "Normalize OpenAI/GitHub finish_reason to Flynn stopReason conventions. OpenAI 'stop' 'end_turn', 'length' 'max_tokens', 'tool_calls' with tools 'tool_use', 'tool_calls' without tools 'end_turn'. Fixes premature agent loop exit when falling back to GitHub Copilot (Anthropic API quota exhausted). Agent loop now accepts both 'tool_use' and 'tool_calls' as belt-and-suspenders.", "summary": "Normalize OpenAI/GitHub finish_reason to Flynn stopReason conventions. OpenAI 'stop' \u2192 'end_turn', 'length' \u2192 'max_tokens', 'tool_calls' with tools \u2192 'tool_use', 'tool_calls' without tools \u2192 'end_turn'. Fixes premature agent loop exit when falling back to GitHub Copilot (Anthropic API quota exhausted). Agent loop now accepts both 'tool_use' and 'tool_calls' as belt-and-suspenders.",
"files_modified": [ "files_modified": [
"src/models/openai.ts", "src/models/openai.ts",
"src/models/openai.test.ts", "src/models/openai.test.ts",
+84
View File
@@ -437,6 +437,90 @@ describe('daemon command fast-path integration', () => {
}); });
}); });
it('preempts active runs when queue mode is interrupt', async () => {
const cancelSpy = vi.spyOn(AgentOrchestrator.prototype, 'cancel');
vi.spyOn(AgentOrchestrator.prototype, 'isCancellable').mockReturnValue(true);
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
let resolveFirst: ((value: string) => void) | undefined;
let markStarted: (() => void) | undefined;
const started = new Promise<void>((resolve) => { markStarted = resolve; });
processSpy
.mockImplementationOnce(() => {
markStarted?.();
return new Promise<string>((resolve) => { resolveFirst = resolve; });
})
.mockResolvedValueOnce('second');
const session = {
id: 'telegram:user-interrupt',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn((key: string) => (key === 'queue.mode' ? 'interrupt' : undefined)),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const router = createMessageRouter({
sessionManager: {
getSession: vi.fn(() => session),
} as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: {
clone() { return this; },
register: vi.fn(),
} as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
max_delegation_depth: 3,
max_iterations: 10,
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
server: { queue: { mode: 'collect' } },
} as unknown as MessageRouterDeps['config'],
});
const reply = vi.fn(async (_message: OutboundMessage) => {});
const firstRun = router.handler({
id: 'm-interrupt-1',
channel: 'telegram',
senderId: 'user-interrupt',
text: 'first',
timestamp: Date.now(),
} as MessageRouterInput, reply);
await started;
await router.handler({
id: 'm-interrupt-2',
channel: 'telegram',
senderId: 'user-interrupt',
text: 'second',
timestamp: Date.now(),
} as MessageRouterInput, reply);
expect(cancelSpy).toHaveBeenCalled();
resolveFirst?.('first');
await firstRun;
});
it('emits run.state start and complete for non-command channel messages', async () => { it('emits run.state start and complete for non-command channel messages', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok');
const mockAuditLogger = { const mockAuditLogger = {
+72 -40
View File
@@ -443,6 +443,56 @@ export function createMessageRouter(deps: {
}); });
} }
function requestActiveRunCancellation(input: {
sessionId: string;
channel: string;
senderId: string;
requestId: string;
}): { cancelled: boolean; latencyMs: number } {
const cancelStartedAt = Date.now();
const run = activeRuns.get(input.sessionId);
if (!run || !run.isCancellable()) {
const latencyMs = Date.now() - cancelStartedAt;
deps.metrics?.recordCancelLatency(latencyMs);
auditLogger?.runCancel?.({
session_id: input.sessionId,
channel: input.channel,
sender: input.senderId,
source: 'channel',
requested: true,
acknowledged: false,
request_id: input.requestId,
latency_ms: latencyMs,
});
return { cancelled: false, latencyMs };
}
run.cancel();
const cancelLatencyMs = Date.now() - cancelStartedAt;
deps.metrics?.recordCancelLatency(cancelLatencyMs);
auditLogger?.runCancel?.({
session_id: input.sessionId,
channel: input.channel,
sender: input.senderId,
source: 'channel',
requested: true,
acknowledged: true,
request_id: input.requestId,
latency_ms: cancelLatencyMs,
});
auditLogger?.runState?.({
session_id: input.sessionId,
channel: input.channel,
sender: input.senderId,
source: 'channel',
state: 'cancel_requested',
request_id: input.requestId,
duration_ms: cancelLatencyMs,
});
deps.metrics?.recordRunState('cancel_requested');
return { cancelled: true, latencyMs: cancelLatencyMs };
}
function executeBackendCommand(inputRaw: string, activeTier: string): string { function executeBackendCommand(inputRaw: string, activeTier: string): string {
return executeRuntimeBackendModeCommand(inputRaw, { return executeRuntimeBackendModeCommand(inputRaw, {
getActiveTier: () => activeTier, getActiveTier: () => activeTier,
@@ -770,6 +820,21 @@ export function createMessageRouter(deps: {
} }
} }
const session = deps.sessionManager.getSession(msg.channel, msg.senderId);
const queueMode = session.getConfig('queue.mode') ?? deps.config.server?.queue?.mode ?? 'collect';
const rawCommand = msg.metadata?.isCommand
? msg.metadata.command
: incomingText.trim().startsWith('/') ? incomingText.trim().slice(1).split(/\s+/, 1)[0] : undefined;
const isCancelCommand = rawCommand === 'stop' || rawCommand === 'cancel';
if (queueMode === 'interrupt' && !isCancelCommand) {
requestActiveRunCancellation({
sessionId: sessionIdForRun,
channel: msg.channel,
senderId: msg.senderId,
requestId: msg.id,
});
}
const automationReactions = deps.config.automation?.reactions ?? []; const automationReactions = deps.config.automation?.reactions ?? [];
if (!msg.metadata?.isCommand) { if (!msg.metadata?.isCommand) {
if (automationReactions.length === 0) { if (automationReactions.length === 0) {
@@ -896,7 +961,6 @@ export function createMessageRouter(deps: {
}); });
if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) { if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) {
const session = deps.sessionManager.getSession(msg.channel, msg.senderId);
const commandResult = await deps.commandRegistry.execute(commandInput, { const commandResult = await deps.commandRegistry.execute(commandInput, {
channel: msg.channel, channel: msg.channel,
senderId: msg.senderId, senderId: msg.senderId,
@@ -1066,46 +1130,15 @@ export function createMessageRouter(deps: {
return ''; return '';
}, },
cancelRun: () => { cancelRun: () => {
const cancelStartedAt = Date.now(); const result = requestActiveRunCancellation({
const run = activeRuns.get(session.id); sessionId: session.id,
if (!run || !run.isCancellable()) {
deps.metrics?.recordCancelLatency(Date.now() - cancelStartedAt);
auditLogger?.runCancel?.({
session_id: session.id,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
requested: true,
acknowledged: false,
request_id: msg.id,
latency_ms: Date.now() - cancelStartedAt,
});
return 'No active operation to cancel.';
}
run.cancel();
const cancelLatencyMs = Date.now() - cancelStartedAt;
deps.metrics?.recordCancelLatency(cancelLatencyMs);
auditLogger?.runCancel?.({
session_id: session.id,
channel: msg.channel, channel: msg.channel,
sender: msg.senderId, senderId: msg.senderId,
source: 'channel', requestId: msg.id,
requested: true,
acknowledged: true,
request_id: msg.id,
latency_ms: cancelLatencyMs,
}); });
auditLogger?.runState?.({ return result.cancelled
session_id: session.id, ? 'Cancellation requested. The active operation will stop at the next safe point.'
channel: msg.channel, : 'No active operation to cancel.';
sender: msg.senderId,
source: 'channel',
state: 'cancel_requested',
request_id: msg.id,
duration_ms: cancelLatencyMs,
});
deps.metrics?.recordRunState('cancel_requested');
return 'Cancellation requested. The active operation will stop at the next safe point.';
}, },
delegateAgent: async (agentName: string, task: string) => { delegateAgent: async (agentName: string, task: string) => {
@@ -1515,7 +1548,6 @@ export function createMessageRouter(deps: {
// Determine if the active model supports native audio input // Determine if the active model supports native audio input
let effectiveTier: string = deps.config.agents.primary_tier ?? 'default'; let effectiveTier: string = deps.config.agents.primary_tier ?? 'default';
const session = deps.sessionManager.getSession(msg.channel, msg.senderId);
const sessionTierOverride = session.getConfig('modelTier'); const sessionTierOverride = session.getConfig('modelTier');
const tierFromUseCaseMetadata = tierFromUseCase(deps.config, msg.metadata?.modelFor); const tierFromUseCaseMetadata = tierFromUseCase(deps.config, msg.metadata?.modelFor);
+13 -10
View File
@@ -331,8 +331,9 @@ describe('createAgentHandlers command fast-path', () => {
await handlers['agent.send'](req, send); await handlers['agent.send'](req, send);
expect(mockAgent.process).toHaveBeenCalledWith('/not-a-real-command', undefined); expect(mockAgent.process).toHaveBeenCalledWith('/not-a-real-command', undefined);
expect((sent[0] as GatewayEvent).event).toBe('done'); const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done') as GatewayEvent | undefined;
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response'); expect(doneEvent).toBeTruthy();
expect(((doneEvent as GatewayEvent).data as { content: string }).content).toBe('agent response');
}); });
it('handles /approvals command via fast-path when hook engine is available', async () => { it('handles /approvals command via fast-path when hook engine is available', async () => {
@@ -421,7 +422,7 @@ describe('createAgentHandlers command fast-path', () => {
state: 'cancelled', state: 'cancelled',
}), }),
); );
expect((sent[0] as GatewayEvent).event).toBe('done'); expect(sent.some((msg) => (msg as GatewayEvent).event === 'done')).toBe(true);
}); });
it('emits run.cancel telemetry for agent.cancel requests', async () => { it('emits run.cancel telemetry for agent.cancel requests', async () => {
@@ -429,7 +430,7 @@ describe('createAgentHandlers command fast-path', () => {
id: 16, id: 16,
method: 'agent.cancel', method: 'agent.cancel',
params: { connectionId: 'conn-1' }, params: { connectionId: 'conn-1' },
}); }, vi.fn());
expect(sessionBridge.cancel).toHaveBeenCalledWith('conn-1'); expect(sessionBridge.cancel).toHaveBeenCalledWith('conn-1');
expect(mockAuditLogger.runCancel).toHaveBeenCalledWith( expect(mockAuditLogger.runCancel).toHaveBeenCalledWith(
@@ -492,9 +493,10 @@ describe('createAgentHandlers command fast-path', () => {
params: { message: 'hello', connectionId: 'conn-1' }, params: { message: 'hello', connectionId: 'conn-1' },
}, send); }, send);
expect(sent).toHaveLength(2); const events = sent.map((msg) => (msg as GatewayEvent).event);
expect((sent[0] as GatewayEvent).event).toBe('context_warning'); expect(events).toContain('context_warning');
expect((sent[1] as GatewayEvent).event).toBe('done'); expect(events).toContain('done');
expect(events.indexOf('context_warning')).toBeLessThan(events.indexOf('done'));
}); });
}); });
@@ -706,8 +708,9 @@ describe('createAgentHandlers queue policy resolution', () => {
mode: 'interrupt', mode: 'interrupt',
cancelled_active_run: true, cancelled_active_run: true,
})); }));
expect((sent[0] as GatewayEvent).event).toBe('content'); const contentEvent = sent.find((msg) => (msg as GatewayEvent).event === 'content') as GatewayEvent | undefined;
expect(((sent[0] as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode'); expect(contentEvent).toBeTruthy();
expect((sent[1] as GatewayEvent).event).toBe('done'); expect(((contentEvent as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode');
expect(sent.some((msg) => (msg as GatewayEvent).event === 'done')).toBe(true);
}); });
}); });
+46 -8
View File
@@ -86,6 +86,8 @@ function listEnabledExternalBackends(config?: Config): string[] {
} }
export function createAgentHandlers(deps: AgentHandlerDeps) { export function createAgentHandlers(deps: AgentHandlerDeps) {
const activeRequestIds = new Map<string, number>();
return { return {
'agent.send': async (request: GatewayRequest, send: SendFn): Promise<OutboundMessage | void> => { 'agent.send': async (request: GatewayRequest, send: SendFn): Promise<OutboundMessage | void> => {
const params = request.params as { message?: string; connectionId?: string; attachments?: GatewayAttachment[]; metadata?: { isCommand?: boolean; command?: string; commandArgs?: string } } | undefined; const params = request.params as { message?: string; connectionId?: string; attachments?: GatewayAttachment[]; metadata?: { isCommand?: boolean; command?: string; commandArgs?: string } } | undefined;
@@ -116,10 +118,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
const laneId = sessionId ?? connectionId; const laneId = sessionId ?? connectionId;
const channel = sessionId?.split(':', 1)[0] ?? 'ws'; const channel = sessionId?.split(':', 1)[0] ?? 'ws';
const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }); const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel });
const laneQueueWithProcessing = deps.laneQueue as LaneQueue & { isProcessing?: (lane: string) => boolean }; const laneQueueWithProcessing = deps.laneQueue as LaneQueue & {
const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function' isProcessing?: (lane: string) => boolean;
? laneQueueWithProcessing.isProcessing(laneId) isActive?: (lane: string) => boolean;
: false; };
const laneIsActive = typeof laneQueueWithProcessing.isActive === 'function'
? laneQueueWithProcessing.isActive(laneId)
: (laneQueueWithProcessing.isProcessing?.(laneId) ?? false);
const requestId = request.id.toString(); const requestId = request.id.toString();
const sessionIdForAudit = sessionId ?? `ws:${connectionId}`; const sessionIdForAudit = sessionId ?? `ws:${connectionId}`;
const runStartedAt = Date.now(); const runStartedAt = Date.now();
@@ -127,7 +132,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
// Interrupt mode should preempt active work when a newer request arrives. // Interrupt mode should preempt active work when a newer request arrives.
// LaneQueue itself only rejects queued entries, so we also request agent cancellation. // LaneQueue itself only rejects queued entries, so we also request agent cancellation.
if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) { if (resolvedPolicy?.mode === 'interrupt' && laneIsActive) {
const cancelStartedAt = Date.now(); const cancelStartedAt = Date.now();
const cancelled = sessionId const cancelled = sessionId
? deps.sessionBridge.cancelSession(sessionId) ? deps.sessionBridge.cancelSession(sessionId)
@@ -165,6 +170,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
duration_ms: cancelLatencyMs, duration_ms: cancelLatencyMs,
}); });
deps.metrics?.recordRunState('cancel_requested'); deps.metrics?.recordRunState('cancel_requested');
const activeRequestId = activeRequestIds.get(connectionId);
if (activeRequestId && activeRequestId !== request.id) {
send(makeEvent(activeRequestId, 'run_state', {
state: 'cancel_requested',
timestamp: Date.now(),
}));
}
} }
} }
@@ -202,6 +214,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
const isCommand = Boolean(commandInput && deps.commandRegistry?.isCommand(commandInput)); const isCommand = Boolean(commandInput && deps.commandRegistry?.isCommand(commandInput));
if (!isCommand) { if (!isCommand) {
activeRequestIds.set(connectionId, request.id);
auditLogger?.runState?.({ auditLogger?.runState?.({
session_id: sessionIdForAudit, session_id: sessionIdForAudit,
channel: 'ws', channel: 'ws',
@@ -211,9 +224,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
request_id: requestId, request_id: requestId,
}); });
deps.metrics?.recordRunState('start'); deps.metrics?.recordRunState('start');
send(makeEvent(request.id, 'run_state', {
state: 'start',
timestamp: Date.now(),
}));
} }
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { if (isCommand) {
const sessionId = deps.sessionBridge.getSessionId(connectionId); const sessionId = deps.sessionBridge.getSessionId(connectionId);
const commandResult = await deps.commandRegistry.execute(commandInput, { const commandResult = await deps.commandRegistry.execute(commandInput, {
channel: 'ws', channel: 'ws',
@@ -646,6 +663,10 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
if (commandResult.handled) { if (commandResult.handled) {
send(makeEvent(request.id, 'done', { content: commandResult.text })); send(makeEvent(request.id, 'done', { content: commandResult.text }));
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
activeRequestIds.delete(connectionId);
deps.metrics?.endRequest(requestId);
return; return;
} }
} }
@@ -699,10 +720,14 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
context: { sessionId: laneId, level: contextAlert.level }, context: { sessionId: laneId, level: contextAlert.level },
}); });
} }
send(makeEvent(request.id, 'done', { content: response }));
const finalState = response.trim().toLowerCase() === 'operation cancelled by user.' const finalState = response.trim().toLowerCase() === 'operation cancelled by user.'
? 'cancelled' ? 'cancelled'
: 'complete'; : 'complete';
send(makeEvent(request.id, 'run_state', {
state: finalState,
timestamp: Date.now(),
}));
send(makeEvent(request.id, 'done', { content: response }));
auditLogger?.runState?.({ auditLogger?.runState?.({
session_id: sessionIdForAudit, session_id: sessionIdForAudit,
channel: 'ws', channel: 'ws',
@@ -723,6 +748,11 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
message, message,
context: { sessionId: laneId }, context: { sessionId: laneId },
}); });
send(makeEvent(request.id, 'run_state', {
state: 'error',
timestamp: Date.now(),
message,
}));
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
auditLogger?.runState?.({ auditLogger?.runState?.({
session_id: sessionIdForAudit, session_id: sessionIdForAudit,
@@ -738,6 +768,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
} finally { } finally {
deps.sessionBridge.setBusy(connectionId, false); deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined); deps.sessionBridge.setOnToolUse(connectionId, undefined);
activeRequestIds.delete(connectionId);
deps.metrics?.endRequest(requestId); deps.metrics?.endRequest(requestId);
} }
}, resolvedPolicy); }, resolvedPolicy);
@@ -754,7 +785,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
} }
}, },
'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => { 'agent.cancel': async (request: GatewayRequest, send: SendFn): Promise<OutboundMessage> => {
const params = request.params as { connectionId?: string } | undefined; const params = request.params as { connectionId?: string } | undefined;
const connectionId = params?.connectionId as string; const connectionId = params?.connectionId as string;
@@ -795,6 +826,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
}); });
deps.metrics?.recordRunState('cancel_requested'); deps.metrics?.recordRunState('cancel_requested');
} }
const activeRequestId = activeRequestIds.get(connectionId);
if (cancelled && activeRequestId) {
send(makeEvent(activeRequestId, 'run_state', {
state: 'cancel_requested',
timestamp: Date.now(),
}));
}
return { return {
id: request.id, id: request.id,
result: { result: {
+58 -1
View File
@@ -1,4 +1,4 @@
import { describe, it, expect } from 'vitest'; import { describe, it, expect, vi } from 'vitest';
import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js'; import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js';
describe('LaneQueue', () => { describe('LaneQueue', () => {
@@ -230,6 +230,25 @@ describe('LaneQueue', () => {
await expect(p3).resolves.toBe('new-pending'); await expect(p3).resolves.toBe('new-pending');
}); });
it('interrupt mode keeps only the most recent pending request', async () => {
const queue = new LaneQueue({ mode: 'interrupt' });
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
const p3 = queue.enqueue('lane-a', async () => 'latest-pending');
await expect(p2).rejects.toThrow('Superseded by newer request');
resolveFirst();
await expect(p1).resolves.toBe('active');
await expect(p3).resolves.toBe('latest-pending');
});
it('drop_new overflow rejects newest request when cap is reached', async () => { it('drop_new overflow rejects newest request when cap is reached', async () => {
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' }); const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
let resolveFirst!: () => void; let resolveFirst!: () => void;
@@ -249,6 +268,44 @@ describe('LaneQueue', () => {
await expect(p2).resolves.toBe('pending-1'); await expect(p2).resolves.toBe('pending-1');
}); });
it('interrupt mode clears debounce backlog to run latest request immediately', async () => {
vi.useFakeTimers();
try {
const queue = new LaneQueue({ mode: 'interrupt', debounceMs: 50 });
const events: string[] = [];
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
events.push('first:start');
await firstBlocks;
events.push('first:end');
return 'first';
});
const p2 = queue.enqueue('lane-a', async () => {
events.push('second:start');
return 'second';
});
resolveFirst();
await p1;
await Promise.resolve();
const p3 = queue.enqueue('lane-a', async () => {
events.push('third:start');
return 'third';
});
await Promise.resolve();
expect(events).toContain('third:start');
await expect(p2).rejects.toThrow('Superseded by newer request');
await expect(p3).resolves.toBe('third');
} finally {
vi.useRealTimers();
}
});
it('drop_old overflow evicts oldest pending request when cap is reached', async () => { it('drop_old overflow evicts oldest pending request when cap is reached', async () => {
const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' }); const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' });
let resolveFirst!: () => void; let resolveFirst!: () => void;
+21 -10
View File
@@ -98,15 +98,9 @@ export class LaneQueue {
this.lanes.set(laneId, lane); this.lanes.set(laneId, lane);
} }
// If nothing is running on this lane, execute immediately if (effective.mode === 'interrupt' && lane.debounceTimer) {
if (!lane.active && !lane.debounceTimer) { clearTimeout(lane.debounceTimer);
lane.active = true; lane.debounceTimer = undefined;
try {
return await work();
} finally {
lane.active = false;
this.processNext(laneId);
}
} }
if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') { if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') {
@@ -125,6 +119,17 @@ export class LaneQueue {
}); });
} }
// If nothing is running on this lane, execute immediately
if (!lane.active && !lane.debounceTimer && lane.queue.length === 0) {
lane.active = true;
try {
return await work();
} finally {
lane.active = false;
this.processNext(laneId);
}
}
if (lane.queue.length >= effective.cap) { if (lane.queue.length >= effective.cap) {
if (effective.overflow === 'drop_new') { if (effective.overflow === 'drop_new') {
return Promise.reject( return Promise.reject(
@@ -168,12 +173,18 @@ export class LaneQueue {
}); });
} }
/** Check if a lane currently has active work executing. */ /** Check if a lane is active or waiting to start due to debounce. */
isProcessing(laneId: string): boolean { isProcessing(laneId: string): boolean {
const lane = this.lanes.get(laneId); const lane = this.lanes.get(laneId);
return (lane?.active ?? false) || Boolean(lane?.debounceTimer); return (lane?.active ?? false) || Boolean(lane?.debounceTimer);
} }
/** Check if a lane currently has active work executing (not counting debounce delay). */
isActive(laneId: string): boolean {
const lane = this.lanes.get(laneId);
return lane?.active ?? false;
}
/** Get the number of pending (not yet started) items in a lane. */ /** Get the number of pending (not yet started) items in a lane. */
queueLength(laneId: string): number { queueLength(laneId: string): number {
return this.lanes.get(laneId)?.queue.length ?? 0; return this.lanes.get(laneId)?.queue.length ?? 0;
+9
View File
@@ -95,6 +95,7 @@ export type EventType =
| 'tool_end' | 'tool_end'
| 'context_warning' | 'context_warning'
| 'attachment' | 'attachment'
| 'run_state'
| 'done' | 'done'
| 'error'; | 'error';
@@ -142,6 +143,14 @@ export interface AttachmentEventData {
filename?: string; filename?: string;
} }
export type RunState = 'start' | 'cancel_requested' | 'cancelled' | 'complete' | 'error';
export interface RunStateEventData {
state: RunState;
timestamp: number;
message?: string;
}
export interface DoneEventData { export interface DoneEventData {
content: string; content: string;
} }
+21
View File
@@ -715,6 +715,10 @@ async function sendMessage(client, overrideText) {
scrollToBottom(); scrollToBottom();
// Create placeholder for assistant response // Create placeholder for assistant response
const statusLine = document.createElement('div');
statusLine.className = 'px-1 text-[11px] leading-none text-zinc-500 select-none hidden';
statusLine.textContent = 'Run status: queued';
_elements.messages.appendChild(statusLine);
const placeholder = document.createElement('div'); const placeholder = document.createElement('div');
placeholder.className = 'rounded-lg px-3.5 py-2.5 text-sm leading-relaxed break-words whitespace-pre-wrap bg-zinc-900 border border-zinc-800 text-zinc-50 streaming-cursor'; placeholder.className = 'rounded-lg px-3.5 py-2.5 text-sm leading-relaxed break-words whitespace-pre-wrap bg-zinc-900 border border-zinc-800 text-zinc-50 streaming-cursor';
placeholder.innerHTML = '<span class="text-zinc-500">Thinking...</span>'; placeholder.innerHTML = '<span class="text-zinc-500">Thinking...</span>';
@@ -759,6 +763,22 @@ async function sendMessage(client, overrideText) {
scrollToBottom(); scrollToBottom();
}); });
stream.on('run_state', (data) => {
if (!data || !data.state) {
return;
}
const labels = {
start: 'Run status: working',
cancel_requested: 'Run status: cancellation requested',
cancelled: 'Run status: cancelled',
complete: 'Run status: complete',
error: `Run status: error${data.message ? ` (${data.message})` : ''}`,
};
statusLine.textContent = labels[data.state] || `Run status: ${data.state}`;
statusLine.classList.remove('hidden');
scrollToBottom();
});
const done = await stream.result; const done = await stream.result;
const content = done?.content ?? done?.text ?? '(no response)'; const content = done?.content ?? done?.text ?? '(no response)';
const assistantMessage = createMessageEl('assistant', content, Date.now()); const assistantMessage = createMessageEl('assistant', content, Date.now());
@@ -771,6 +791,7 @@ async function sendMessage(client, overrideText) {
_cancelling = false; _cancelling = false;
updateSendButton(); updateSendButton();
clearPendingAttachments(); clearPendingAttachments();
statusLine.remove();
scrollToBottom(); scrollToBottom();
} }
} }
+51
View File
@@ -185,4 +185,55 @@ describe('ChatPage wiring', () => {
expect(calls.some((entry) => entry.method === 'agent.cancel')).toBe(true); expect(calls.some((entry) => entry.method === 'agent.cancel')).toBe(true);
}); });
it('renders run_state updates during streaming', async () => {
let resolveResult: ((value: { content: string }) => void) | undefined;
const handlers = new Map<string, Array<(data: any) => void>>();
const stream = {
on(event: string, cb: (data: any) => void) {
if (!handlers.has(event)) {
handlers.set(event, []);
}
handlers.get(event)?.push(cb);
},
emit(event: string, data: any) {
for (const cb of handlers.get(event) ?? []) {
cb(data);
}
},
result: new Promise<{ content: string }>((resolve) => {
resolveResult = resolve;
}),
};
const client = {
async call(method: string) {
if (method === 'sessions.list') {
return { sessions: [] };
}
return null;
},
stream() {
return stream;
},
};
await ChatPage.render(root, client);
const input = root.querySelector('#chat-input');
input.value = 'hello';
root.querySelector('#chat-send').dispatchEvent(new windowObj.Event('click', { bubbles: true }));
await Promise.resolve();
stream.emit('run_state', { state: 'start' });
await Promise.resolve();
const statusLine = Array.from(root.querySelectorAll('div'))
.find((el: any) => String(el.textContent ?? '').startsWith('Run status:'));
expect(statusLine).toBeTruthy();
expect(statusLine.classList.contains('hidden')).toBe(false);
resolveResult?.({ content: 'ok' });
await Promise.resolve();
});
}); });