From e4ee6acce8a8fc87a37c54e269f0c4a67335f9d6 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Wed, 25 Feb 2026 10:22:44 -0800 Subject: [PATCH] Phase 1 run-control semantics and run_state events --- docs/api/PROTOCOL.md | 32 ++++- docs/architecture/AGENT_DIAGRAM.md | 4 + .../GATEWAY_SESSIONS_AND_QUEUE.md | 2 + docs/plans/state.json | 123 +++++++++++------- src/daemon/routing.test.ts | 84 ++++++++++++ src/daemon/routing.ts | 112 ++++++++++------ src/gateway/handlers/agent.test.ts | 23 ++-- src/gateway/handlers/agent.ts | 54 ++++++-- src/gateway/lane-queue.test.ts | 59 ++++++++- src/gateway/lane-queue.ts | 31 +++-- src/gateway/protocol.ts | 9 ++ src/gateway/ui/pages/chat.js | 21 +++ src/gateway/ui/pages/chat.test.ts | 51 ++++++++ 13 files changed, 485 insertions(+), 120 deletions(-) diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index 1f11c6c..8a30732 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -69,7 +69,7 @@ sequenceDiagram end G->>A: process(message) in that session - A-->>G: streaming events (content/tool_start/tool_end/context_warning) + A-->>G: streaming events (content/tool_start/tool_end/context_warning/run_state) G-->>C: events + final done C->>G: agent.cancel {connectionId} @@ -1028,6 +1028,18 @@ Send a message to the agent and stream response. } ``` +`run_state` event: +```json +{ + "id": 7, + "event": "run_state", + "data": { + "state": "complete", + "timestamp": 1730140800000 + } +} +``` + `done` event: ```json { @@ -1550,6 +1562,21 @@ Proactive context pressure signal emitted by `agent.send` before `done`. } ``` +#### `run_state` + +Run lifecycle transition emitted during `agent.send` processing. + +```json +{ + "id": 1, + "event": "run_state", + "data": { + "state": "start", + "timestamp": 1730140800000 + } +} +``` + #### `done` Agent processing complete (final response). @@ -1715,6 +1742,9 @@ class FlynnClient { case 'context_warning': console.warn('Context warning:', data.level, data.message); break; + case 'run_state': + console.log('Run state:', data.state, data.timestamp); + break; case 'done': console.log('Done:', data.content); break; diff --git a/docs/architecture/AGENT_DIAGRAM.md b/docs/architecture/AGENT_DIAGRAM.md index 6a11534..4e5c0b5 100644 --- a/docs/architecture/AGENT_DIAGRAM.md +++ b/docs/architecture/AGENT_DIAGRAM.md @@ -139,6 +139,10 @@ Outbound Reply -> ChannelAdapter.send() (text + optional attachments) ``` +Gateway streaming UX signals: + +- WebSocket `agent.send` emits `run_state` lifecycle events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) for UI/state rendering. + Key files: - Routing + per-session agent creation: `src/daemon/routing.ts` diff --git a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md index 5dc2228..392f97f 100644 --- a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md +++ b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md @@ -91,6 +91,7 @@ Within a lane: - Only one request executes at a time. - Later requests queue (FIFO) and start after the active request finishes. +- `interrupt` mode enforces "latest wins": any queued backlog is superseded and the active run is asked to cancel so the newest request becomes the next (or immediate) execution. Across lanes: @@ -112,6 +113,7 @@ Important: - Cancellation is best-effort for the currently running work: it stops at the next safe point in the agent loop. - Queued work is deterministically rejected. +- Gateway streams `run_state` events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) so clients can render lifecycle state without parsing assistant text. Key files: diff --git a/docs/plans/state.json b/docs/plans/state.json index e197a73..f70f6e5 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -3635,7 +3635,7 @@ "multi_agent_routing": { "priority": "P2", "status": "completed", - "description": "Config-driven agent routing: AgentConfigRegistry for named agent configs (system_prompt, model_tier, tool_profile, sandbox), AgentRouter with sender→channel→default resolution (glob patterns), per-agent tool registry cloning with sandboxed overrides, daemon wiring", + "description": "Config-driven agent routing: AgentConfigRegistry for named agent configs (system_prompt, model_tier, tool_profile, sandbox), AgentRouter with sender\u2192channel\u2192default resolution (glob patterns), per-agent tool registry cloning with sandboxed overrides, daemon wiring", "files_created": [ "src/agents/registry.ts", "src/agents/registry.test.ts", @@ -3789,7 +3789,7 @@ "auto_login": { "priority": "P5", "status": "completed", - "description": "Lazy token resolution with onLoginRequired callback — triggers OAuth device flow automatically on first API call when no token is available", + "description": "Lazy token resolution with onLoginRequired callback \u2014 triggers OAuth device flow automatically on first API call when no token is available", "files_modified": [ "src/models/github.ts", "src/daemon/index.ts", @@ -3937,7 +3937,7 @@ "p8-agent-tools": { "status": "completed", "date": "2026-02-07", - "summary": "8 new agent-callable tools exposing existing internal APIs, plus gap analysis audit update (25% → 65% match rate)", + "summary": "8 new agent-callable tools exposing existing internal APIs, plus gap analysis audit update (25% \u2192 65% match rate)", "phases": { "sessions_tools": { "priority": "P8", @@ -3994,7 +3994,7 @@ "file_patch_tool": { "priority": "Tier3", "status": "completed", - "description": "file.patch tool for multi-hunk structured patches — apply multiple line-based edits (replacements, insertions, deletions) across one or more files in a single tool call. Hunks applied bottom-up to preserve line numbers.", + "description": "file.patch tool for multi-hunk structured patches \u2014 apply multiple line-based edits (replacements, insertions, deletions) across one or more files in a single tool call. Hunks applied bottom-up to preserve line numbers.", "files_created": [ "src/tools/builtin/file-patch.ts", "src/tools/builtin/file-patch.test.ts" @@ -4008,7 +4008,7 @@ "gmail_pubsub_watcher": { "priority": "Tier3", "status": "completed", - "description": "Gmail Pub/Sub watcher ChannelAdapter — monitors Gmail via Google Cloud Pub/Sub push notifications with polling fallback. OAuth2 auth, configurable watch labels, template rendering with email metadata placeholders. Wired into daemon lifecycle and gateway (POST /gmail/push endpoint).", + "description": "Gmail Pub/Sub watcher ChannelAdapter \u2014 monitors Gmail via Google Cloud Pub/Sub push notifications with polling fallback. OAuth2 auth, configurable watch labels, template rendering with email metadata placeholders. Wired into daemon lifecycle and gateway (POST /gmail/push endpoint).", "files_created": [ "src/automation/gmail.ts", "src/automation/gmail.test.ts" @@ -4133,7 +4133,7 @@ "phases": { "ollama_tool_calling": { "status": "completed", - "description": "Pass tools to Ollama API in correct format, parse tool_calls from responses with generated IDs, set stopReason to 'tool_use'. Handle thinking field from reasoning models (deepseek-r1, glm-4.7-flash) — use as content fallback and expose via thinkingContent. Streaming support for both tool calls and thinking.", + "description": "Pass tools to Ollama API in correct format, parse tool_calls from responses with generated IDs, set stopReason to 'tool_use'. Handle thinking field from reasoning models (deepseek-r1, glm-4.7-flash) \u2014 use as content fallback and expose via thinkingContent. Streaming support for both tool calls and thinking.", "files_modified": [ "src/models/local/ollama.ts", "src/models/local/ollama.test.ts" @@ -4159,7 +4159,7 @@ "lane_queue": { "priority": "Tier3", "status": "completed", - "description": "Per-session FIFO queue in gateway — serializes concurrent requests instead of rejecting. LaneQueue class with enqueue/cancel/queueLength methods.", + "description": "Per-session FIFO queue in gateway \u2014 serializes concurrent requests instead of rejecting. LaneQueue class with enqueue/cancel/queueLength methods.", "files_created": [ "src/gateway/lane-queue.ts", "src/gateway/lane-queue.test.ts" @@ -4174,7 +4174,7 @@ "credential_redaction": { "priority": "Tier3", "status": "completed", - "description": "Expanded redactConfig() from 2 secret locations to 18+ secret fields — telegram, discord, slack tokens; server.token; all model tier api_key/auth_token; web_search, audio, memory embedding api_keys; webhook secrets; gmail credentials; MCP server env vars.", + "description": "Expanded redactConfig() from 2 secret locations to 18+ secret fields \u2014 telegram, discord, slack tokens; server.token; all model tier api_key/auth_token; web_search, audio, memory embedding api_keys; webhook secrets; gmail credentials; MCP server env vars.", "files_modified": [ "src/gateway/handlers/config.ts", "src/gateway/handlers/handlers.test.ts" @@ -4199,7 +4199,7 @@ "xai_grok_provider": { "priority": "Tier3", "status": "completed", - "description": "xAI as OpenAI-compatible model provider — reuses OpenAIClient with baseURL https://api.x.ai/v1, XAI_API_KEY env var fallback, pricing for grok-3/grok-3-mini/grok-2/grok-2-mini/grok-3-fast.", + "description": "xAI as OpenAI-compatible model provider \u2014 reuses OpenAIClient with baseURL https://api.x.ai/v1, XAI_API_KEY env var fallback, pricing for grok-3/grok-3-mini/grok-2/grok-2-mini/grok-3-fast.", "files_modified": [ "src/config/schema.ts", "src/daemon/index.ts", @@ -4209,7 +4209,7 @@ "voyage_ai_embeddings": { "priority": "Tier3", "status": "completed", - "description": "Voyage AI embedding provider for memory/vector search — OpenAI SDK with baseURL https://api.voyageai.com/v1, defaults to 1024 dimensions, VOYAGE_API_KEY env var.", + "description": "Voyage AI embedding provider for memory/vector search \u2014 OpenAI SDK with baseURL https://api.voyageai.com/v1, defaults to 1024 dimensions, VOYAGE_API_KEY env var.", "files_modified": [ "src/config/schema.ts", "src/memory/embeddings.ts", @@ -4227,7 +4227,7 @@ "gateway_lock": { "priority": "Tier4", "status": "completed", - "description": "Single-client gateway mode — if lock=true and a client is connected, reject new connections with code 4003. UI detects locked state.", + "description": "Single-client gateway mode \u2014 if lock=true and a client is connected, reject new connections with code 4003. UI detects locked state.", "files_modified": [ "src/config/schema.ts", "src/gateway/server.ts", @@ -6656,52 +6656,75 @@ "docs/plans/state.json" ], "test_status": "runtime validation: docker compose recreate + curl 127.0.0.1:18801 + POST /v1/audio/transcriptions returned HTTP response" + }, + "deeper-surfaces-phase1-run-control": { + "status": "completed", + "date": "2026-02-25", + "updated": "2026-02-25", + "summary": "Implemented Phase 1 run-control semantics and gateway UX signals: hardened interrupt queue behavior with latest-wins semantics, added run_state lifecycle events to the gateway protocol/UI, aligned channel-path interrupt cancellation with active-run tracking, and added focused tests.", + "files_modified": [ + "src/gateway/lane-queue.ts", + "src/gateway/lane-queue.test.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "src/gateway/protocol.ts", + "src/gateway/ui/pages/chat.js", + "src/gateway/ui/pages/chat.test.ts", + "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "docs/api/PROTOCOL.md", + "docs/architecture/AGENT_DIAGRAM.md", + "docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/gateway/ui/pages/chat.test.ts src/daemon/routing.test.ts passing" } }, "overall_progress": { - "total_test_count": 2009, + "total_test_count": 2013, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", "p2_completion": "7/7 (100%)", "p3_completion": "completed (group chat, gateway auth, Gemini, OpenRouter, Bedrock, browser control)", - "p4_completion": "1/1 (100%) — multimodal media pipeline", - "p5_completion": "1/1 (100%) — GitHub Copilot provider with auto-login", - "p6_completion": "4/4 (100%) — enhanced media pipeline (image.analyze, outbound attachments, gateway attachments, audio transcription)", - "p7_completion": "6/6 (100%) — web UI dashboard SPA (dashboard, chat, sessions, settings)", - "p8_completion": "8/8 (100%) — agent tools (sessions.list/history/create/delete, agents.list, message.send, cron.list/trigger) + gap analysis audit", - "tier1_completion": "5/5 (100%) — !!think prefix, /verbose command, typing indicators (Discord/WhatsApp), session pruning (TTL), tool groups", - "tier2_completion": "4/4 (100%) — inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", - "tier3_completion": "5/5 (100%) — lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", - "tier4_completion": "4/4 (100%) — gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", + "p4_completion": "1/1 (100%) \u2014 multimodal media pipeline", + "p5_completion": "1/1 (100%) \u2014 GitHub Copilot provider with auto-login", + "p6_completion": "4/4 (100%) \u2014 enhanced media pipeline (image.analyze, outbound attachments, gateway attachments, audio transcription)", + "p7_completion": "6/6 (100%) \u2014 web UI dashboard SPA (dashboard, chat, sessions, settings)", + "p8_completion": "8/8 (100%) \u2014 agent tools (sessions.list/history/create/delete, agents.list, message.send, cron.list/trigger) + gap analysis audit", + "tier1_completion": "5/5 (100%) \u2014 !!think prefix, /verbose command, typing indicators (Discord/WhatsApp), session pruning (TTL), tool groups", + "tier2_completion": "4/4 (100%) \u2014 inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", + "tier3_completion": "5/5 (100%) \u2014 lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", + "tier4_completion": "4/4 (100%) \u2014 gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", "feature_gap_scorecard": "128/128 match (100%), 0 partial (0%), 0 missing (0%)", - "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete — milestone done", - "dashboard_observability": "completed — service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling", + "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete \u2014 milestone done", + "dashboard_observability": "completed \u2014 service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", - "gmail_filter_creation": "completed — gmail.filter.create tool added with criteria/action validation; gmail-auth requests explicit gmail.settings.basic + gmail.readonly scopes for filter creation and inbox reads", - "toolloop_action_intent_recovery": "completed — when a model claims it will execute a tool but emits no tool call, NativeAgent now issues one internal nudge and continues the same turn to execute tools or produce a concrete blocker", - "toolloop_execution_claim_recovery": "completed — when a model claims a known tool already succeeded/failed without emitting a tool call, NativeAgent now nudges once and retries the same turn before returning text", - "daily_briefing_google_scope_remediation": "completed — calendar.* and tasks.* now append explicit re-auth guidance (`flynn gcal-auth` / `flynn gtasks-auth`) for insufficient-scope errors, and operator runbook includes remediation steps", - "council_tool_timeout_override": "completed — ToolExecutor supports per-tool timeout overrides and council.run now uses a 180s timeout to avoid false 30s council timeouts in the tool loop", - "minimal_tui_multiline_paste_mode": "completed — minimal TUI now supports `/paste`/`/multiline` multiline compose mode ending with single '.' line, preventing newline truncation for pasted prompts", - "config_profile_consolidation": "completed — config/paas.yaml is now generated from canonical config/default.yaml + config/profiles/paas.overlay.yaml with CI-checkable drift detection", - "google_auth_hardening": "completed — shared Google OAuth runtime helper + auth store (auth.json), legacy token-file migration, refresh persistence, service-wide doctor checks, and unified `flynn google-auth` command", - "model_router_correctness": "completed — fallback paths now avoid duplicate clients, apply retry policy consistently, and reject unsupported OpenAI OAuth tool requests early", - "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback, plus 2026-02-23 arg hydration hardening, tool.args_rewritten audit metric, transient fetch retry/timeout hardening, localhost->127.0.0.1 fallback for transcription endpoint connectivity, and whisper docker-compose entrypoint arg fix for port 18801", - "remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening", - "deeper_surfaces_behavior_stack_plan": "completed — documented a decision-complete balanced-hybrid roadmap for OpenClaw-like end-user surface depth plus integrated behavior semantics with phased scope, acceptance gates, and rollout constraints", - "deeper_surfaces_phase0_ticket_pack": "completed — produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)", - "deeper_surfaces_phase0_ticket_01": "completed — audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage", - "deeper_surfaces_phase0_ticket_02": "completed — gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests", - "deeper_surfaces_phase0_ticket_03": "completed — gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters", - "deeper_surfaces_phase0_ticket_04": "completed — added phase-0 baseline summary tooling for run outcomes, cancel latency, and reaction decisions with markdown/json CLI output", - "deeper_surfaces_phase0_ticket_05": "completed — documented phase-0 telemetry fields/workflow, refreshed architecture/protocol docs, and generated baseline artifacts from a probe log with representative channel + gateway events", + "gmail_filter_creation": "completed \u2014 gmail.filter.create tool added with criteria/action validation; gmail-auth requests explicit gmail.settings.basic + gmail.readonly scopes for filter creation and inbox reads", + "toolloop_action_intent_recovery": "completed \u2014 when a model claims it will execute a tool but emits no tool call, NativeAgent now issues one internal nudge and continues the same turn to execute tools or produce a concrete blocker", + "toolloop_execution_claim_recovery": "completed \u2014 when a model claims a known tool already succeeded/failed without emitting a tool call, NativeAgent now nudges once and retries the same turn before returning text", + "daily_briefing_google_scope_remediation": "completed \u2014 calendar.* and tasks.* now append explicit re-auth guidance (`flynn gcal-auth` / `flynn gtasks-auth`) for insufficient-scope errors, and operator runbook includes remediation steps", + "council_tool_timeout_override": "completed \u2014 ToolExecutor supports per-tool timeout overrides and council.run now uses a 180s timeout to avoid false 30s council timeouts in the tool loop", + "minimal_tui_multiline_paste_mode": "completed \u2014 minimal TUI now supports `/paste`/`/multiline` multiline compose mode ending with single '.' line, preventing newline truncation for pasted prompts", + "config_profile_consolidation": "completed \u2014 config/paas.yaml is now generated from canonical config/default.yaml + config/profiles/paas.overlay.yaml with CI-checkable drift detection", + "google_auth_hardening": "completed \u2014 shared Google OAuth runtime helper + auth store (auth.json), legacy token-file migration, refresh persistence, service-wide doctor checks, and unified `flynn google-auth` command", + "model_router_correctness": "completed \u2014 fallback paths now avoid duplicate clients, apply retry policy consistently, and reject unsupported OpenAI OAuth tool requests early", + "native_audio_support": "completed \u2014 smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback, plus 2026-02-23 arg hydration hardening, tool.args_rewritten audit metric, transient fetch retry/timeout hardening, localhost->127.0.0.1 fallback for transcription endpoint connectivity, and whisper docker-compose entrypoint arg fix for port 18801", + "remaining_phases_completion": "Phase 1: 3/3 (100%) \u2014 context levels, command registry, memory structure. Phase 2: 3/3 (100%) \u2014 component registry, confidence routing, history index. Phase 3: 2/2 (100%) \u2014 adaptive memory/compaction, truthfulness/autonomy hardening", + "deeper_surfaces_behavior_stack_plan": "completed \u2014 documented a decision-complete balanced-hybrid roadmap for OpenClaw-like end-user surface depth plus integrated behavior semantics with phased scope, acceptance gates, and rollout constraints", + "deeper_surfaces_phase0_ticket_pack": "completed \u2014 produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)", + "deeper_surfaces_phase0_ticket_01": "completed \u2014 audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage", + "deeper_surfaces_phase0_ticket_02": "completed \u2014 gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests", + "deeper_surfaces_phase0_ticket_03": "completed \u2014 gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters", + "deeper_surfaces_phase0_ticket_04": "completed \u2014 added phase-0 baseline summary tooling for run outcomes, cancel latency, and reaction decisions with markdown/json CLI output", + "deeper_surfaces_phase0_ticket_05": "completed \u2014 documented phase-0 telemetry fields/workflow, refreshed architecture/protocol docs, and generated baseline artifacts from a probe log with representative channel + gateway events", "next_up": "Replace probe baseline artifacts with live audit samples once gateway/channel sessions emit real run/reaction events", - "pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", - "pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", - "pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", - "openclaw_gateway_first_tui_runtime_unification": "completed — shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach", - "gateway_startup_eaddrinuse_hardening": "completed — gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry" + "pi_embedded_canary_spike": "completed \u2014 added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", + "pi_embedded_evaluation_phase": "completed \u2014 final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", + "pi_embedded_manual_mode": "completed \u2014 added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", + "openclaw_gateway_first_tui_runtime_unification": "completed \u2014 shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach", + "gateway_startup_eaddrinuse_hardening": "completed \u2014 gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry", + "deeper_surfaces_phase1_run_control": "completed \u2014 interrupt queue mode now enforces latest-wins semantics with channel-path preemption, and gateway emits run_state lifecycle events rendered in the web UI" }, "soul_md_and_cron_create": { "date": "2026-02-11", @@ -6715,7 +6738,7 @@ }, "local-model-message-normalization": { "date": "2026-02-11", - "summary": "Ollama & llama.cpp tool calling message normalization — normalizeMessagesForOllama() converts tool_use/tool_result content blocks to Ollama's native role:tool format, normalizeMessagesForLlamaCpp() converts to OpenAI-style tool_calls arrays with hybrid fallback for GGUF templates that drop role:tool messages.", + "summary": "Ollama & llama.cpp tool calling message normalization \u2014 normalizeMessagesForOllama() converts tool_use/tool_result content blocks to Ollama's native role:tool format, normalizeMessagesForLlamaCpp() converts to OpenAI-style tool_calls arrays with hybrid fallback for GGUF templates that drop role:tool messages.", "files_modified": [ "src/models/local/ollama.ts", "src/models/local/ollama.test.ts", @@ -6734,7 +6757,7 @@ "native-audio-support": { "status": "completed", "date": "2026-02-11", - "summary": "Native audio input support — voice messages passed directly to audio-capable models (Gemini, OpenAI, GitHub) instead of always transcribing via Whisper. Smart routing decides per-model whether to pass raw audio or transcribe first.", + "summary": "Native audio input support \u2014 voice messages passed directly to audio-capable models (Gemini, OpenAI, GitHub) instead of always transcribing via Whisper. Smart routing decides per-model whether to pass raw audio or transcribe first.", "phases": { "audio_transcribe_tool": { "status": "completed", @@ -6771,7 +6794,7 @@ }, "tests_and_token_estimation": { "status": "completed", - "description": "Audio tests for media helpers, audio token estimation (base64→bytes→duration→tokens at 32 tokens/sec), supports_audio config override wiring", + "description": "Audio tests for media helpers, audio token estimation (base64\u2192bytes\u2192duration\u2192tokens at 32 tokens/sec), supports_audio config override wiring", "files_modified": [ "src/models/media.test.ts", "src/context/tokens.ts", @@ -6783,7 +6806,7 @@ }, "stopreason-normalization": { "date": "2026-02-11", - "summary": "Normalize OpenAI/GitHub finish_reason to Flynn stopReason conventions. OpenAI 'stop' → 'end_turn', 'length' → 'max_tokens', 'tool_calls' with tools → 'tool_use', 'tool_calls' without tools → 'end_turn'. Fixes premature agent loop exit when falling back to GitHub Copilot (Anthropic API quota exhausted). Agent loop now accepts both 'tool_use' and 'tool_calls' as belt-and-suspenders.", + "summary": "Normalize OpenAI/GitHub finish_reason to Flynn stopReason conventions. OpenAI 'stop' \u2192 'end_turn', 'length' \u2192 'max_tokens', 'tool_calls' with tools \u2192 'tool_use', 'tool_calls' without tools \u2192 'end_turn'. Fixes premature agent loop exit when falling back to GitHub Copilot (Anthropic API quota exhausted). Agent loop now accepts both 'tool_use' and 'tool_calls' as belt-and-suspenders.", "files_modified": [ "src/models/openai.ts", "src/models/openai.test.ts", diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 1bb1384..7bc9769 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -437,6 +437,90 @@ describe('daemon command fast-path integration', () => { }); }); + it('preempts active runs when queue mode is interrupt', async () => { + const cancelSpy = vi.spyOn(AgentOrchestrator.prototype, 'cancel'); + vi.spyOn(AgentOrchestrator.prototype, 'isCancellable').mockReturnValue(true); + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); + let resolveFirst: ((value: string) => void) | undefined; + let markStarted: (() => void) | undefined; + const started = new Promise((resolve) => { markStarted = resolve; }); + processSpy + .mockImplementationOnce(() => { + markStarted?.(); + return new Promise((resolve) => { resolveFirst = resolve; }); + }) + .mockResolvedValueOnce('second'); + + const session = { + id: 'telegram:user-interrupt', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn((key: string) => (key === 'queue.mode' ? 'interrupt' : undefined)), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { + getSession: vi.fn(() => session), + } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + server: { queue: { mode: 'collect' } }, + } as unknown as MessageRouterDeps['config'], + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + const firstRun = router.handler({ + id: 'm-interrupt-1', + channel: 'telegram', + senderId: 'user-interrupt', + text: 'first', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + await started; + + await router.handler({ + id: 'm-interrupt-2', + channel: 'telegram', + senderId: 'user-interrupt', + text: 'second', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + expect(cancelSpy).toHaveBeenCalled(); + + resolveFirst?.('first'); + await firstRun; + }); + it('emits run.state start and complete for non-command channel messages', async () => { const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); const mockAuditLogger = { diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 9284b94..63b1167 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -443,6 +443,56 @@ export function createMessageRouter(deps: { }); } + function requestActiveRunCancellation(input: { + sessionId: string; + channel: string; + senderId: string; + requestId: string; + }): { cancelled: boolean; latencyMs: number } { + const cancelStartedAt = Date.now(); + const run = activeRuns.get(input.sessionId); + if (!run || !run.isCancellable()) { + const latencyMs = Date.now() - cancelStartedAt; + deps.metrics?.recordCancelLatency(latencyMs); + auditLogger?.runCancel?.({ + session_id: input.sessionId, + channel: input.channel, + sender: input.senderId, + source: 'channel', + requested: true, + acknowledged: false, + request_id: input.requestId, + latency_ms: latencyMs, + }); + return { cancelled: false, latencyMs }; + } + + run.cancel(); + const cancelLatencyMs = Date.now() - cancelStartedAt; + deps.metrics?.recordCancelLatency(cancelLatencyMs); + auditLogger?.runCancel?.({ + session_id: input.sessionId, + channel: input.channel, + sender: input.senderId, + source: 'channel', + requested: true, + acknowledged: true, + request_id: input.requestId, + latency_ms: cancelLatencyMs, + }); + auditLogger?.runState?.({ + session_id: input.sessionId, + channel: input.channel, + sender: input.senderId, + source: 'channel', + state: 'cancel_requested', + request_id: input.requestId, + duration_ms: cancelLatencyMs, + }); + deps.metrics?.recordRunState('cancel_requested'); + return { cancelled: true, latencyMs: cancelLatencyMs }; + } + function executeBackendCommand(inputRaw: string, activeTier: string): string { return executeRuntimeBackendModeCommand(inputRaw, { getActiveTier: () => activeTier, @@ -770,6 +820,21 @@ export function createMessageRouter(deps: { } } + const session = deps.sessionManager.getSession(msg.channel, msg.senderId); + const queueMode = session.getConfig('queue.mode') ?? deps.config.server?.queue?.mode ?? 'collect'; + const rawCommand = msg.metadata?.isCommand + ? msg.metadata.command + : incomingText.trim().startsWith('/') ? incomingText.trim().slice(1).split(/\s+/, 1)[0] : undefined; + const isCancelCommand = rawCommand === 'stop' || rawCommand === 'cancel'; + if (queueMode === 'interrupt' && !isCancelCommand) { + requestActiveRunCancellation({ + sessionId: sessionIdForRun, + channel: msg.channel, + senderId: msg.senderId, + requestId: msg.id, + }); + } + const automationReactions = deps.config.automation?.reactions ?? []; if (!msg.metadata?.isCommand) { if (automationReactions.length === 0) { @@ -896,7 +961,6 @@ export function createMessageRouter(deps: { }); if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) { - const session = deps.sessionManager.getSession(msg.channel, msg.senderId); const commandResult = await deps.commandRegistry.execute(commandInput, { channel: msg.channel, senderId: msg.senderId, @@ -1066,46 +1130,15 @@ export function createMessageRouter(deps: { return ''; }, cancelRun: () => { - const cancelStartedAt = Date.now(); - const run = activeRuns.get(session.id); - if (!run || !run.isCancellable()) { - deps.metrics?.recordCancelLatency(Date.now() - cancelStartedAt); - auditLogger?.runCancel?.({ - session_id: session.id, - channel: msg.channel, - sender: msg.senderId, - source: 'channel', - requested: true, - acknowledged: false, - request_id: msg.id, - latency_ms: Date.now() - cancelStartedAt, - }); - return 'No active operation to cancel.'; - } - run.cancel(); - const cancelLatencyMs = Date.now() - cancelStartedAt; - deps.metrics?.recordCancelLatency(cancelLatencyMs); - auditLogger?.runCancel?.({ - session_id: session.id, + const result = requestActiveRunCancellation({ + sessionId: session.id, channel: msg.channel, - sender: msg.senderId, - source: 'channel', - requested: true, - acknowledged: true, - request_id: msg.id, - latency_ms: cancelLatencyMs, + senderId: msg.senderId, + requestId: msg.id, }); - auditLogger?.runState?.({ - session_id: session.id, - channel: msg.channel, - sender: msg.senderId, - source: 'channel', - state: 'cancel_requested', - request_id: msg.id, - duration_ms: cancelLatencyMs, - }); - deps.metrics?.recordRunState('cancel_requested'); - return 'Cancellation requested. The active operation will stop at the next safe point.'; + return result.cancelled + ? 'Cancellation requested. The active operation will stop at the next safe point.' + : 'No active operation to cancel.'; }, delegateAgent: async (agentName: string, task: string) => { @@ -1515,7 +1548,6 @@ export function createMessageRouter(deps: { // Determine if the active model supports native audio input let effectiveTier: string = deps.config.agents.primary_tier ?? 'default'; - const session = deps.sessionManager.getSession(msg.channel, msg.senderId); const sessionTierOverride = session.getConfig('modelTier'); const tierFromUseCaseMetadata = tierFromUseCase(deps.config, msg.metadata?.modelFor); diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 56b8578..6378896 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -331,8 +331,9 @@ describe('createAgentHandlers command fast-path', () => { await handlers['agent.send'](req, send); expect(mockAgent.process).toHaveBeenCalledWith('/not-a-real-command', undefined); - expect((sent[0] as GatewayEvent).event).toBe('done'); - expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response'); + const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done') as GatewayEvent | undefined; + expect(doneEvent).toBeTruthy(); + expect(((doneEvent as GatewayEvent).data as { content: string }).content).toBe('agent response'); }); it('handles /approvals command via fast-path when hook engine is available', async () => { @@ -421,7 +422,7 @@ describe('createAgentHandlers command fast-path', () => { state: 'cancelled', }), ); - expect((sent[0] as GatewayEvent).event).toBe('done'); + expect(sent.some((msg) => (msg as GatewayEvent).event === 'done')).toBe(true); }); it('emits run.cancel telemetry for agent.cancel requests', async () => { @@ -429,7 +430,7 @@ describe('createAgentHandlers command fast-path', () => { id: 16, method: 'agent.cancel', params: { connectionId: 'conn-1' }, - }); + }, vi.fn()); expect(sessionBridge.cancel).toHaveBeenCalledWith('conn-1'); expect(mockAuditLogger.runCancel).toHaveBeenCalledWith( @@ -492,9 +493,10 @@ describe('createAgentHandlers command fast-path', () => { params: { message: 'hello', connectionId: 'conn-1' }, }, send); - expect(sent).toHaveLength(2); - expect((sent[0] as GatewayEvent).event).toBe('context_warning'); - expect((sent[1] as GatewayEvent).event).toBe('done'); + const events = sent.map((msg) => (msg as GatewayEvent).event); + expect(events).toContain('context_warning'); + expect(events).toContain('done'); + expect(events.indexOf('context_warning')).toBeLessThan(events.indexOf('done')); }); }); @@ -706,8 +708,9 @@ describe('createAgentHandlers queue policy resolution', () => { mode: 'interrupt', cancelled_active_run: true, })); - expect((sent[0] as GatewayEvent).event).toBe('content'); - expect(((sent[0] as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode'); - expect((sent[1] as GatewayEvent).event).toBe('done'); + const contentEvent = sent.find((msg) => (msg as GatewayEvent).event === 'content') as GatewayEvent | undefined; + expect(contentEvent).toBeTruthy(); + expect(((contentEvent as GatewayEvent).data as { text: string }).text).toContain('Interrupt mode'); + expect(sent.some((msg) => (msg as GatewayEvent).event === 'done')).toBe(true); }); }); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index a3f244a..7ab39dc 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -86,6 +86,8 @@ function listEnabledExternalBackends(config?: Config): string[] { } export function createAgentHandlers(deps: AgentHandlerDeps) { + const activeRequestIds = new Map(); + return { 'agent.send': async (request: GatewayRequest, send: SendFn): Promise => { const params = request.params as { message?: string; connectionId?: string; attachments?: GatewayAttachment[]; metadata?: { isCommand?: boolean; command?: string; commandArgs?: string } } | undefined; @@ -116,10 +118,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const laneId = sessionId ?? connectionId; const channel = sessionId?.split(':', 1)[0] ?? 'ws'; const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }); - const laneQueueWithProcessing = deps.laneQueue as LaneQueue & { isProcessing?: (lane: string) => boolean }; - const laneIsProcessing = typeof laneQueueWithProcessing.isProcessing === 'function' - ? laneQueueWithProcessing.isProcessing(laneId) - : false; + const laneQueueWithProcessing = deps.laneQueue as LaneQueue & { + isProcessing?: (lane: string) => boolean; + isActive?: (lane: string) => boolean; + }; + const laneIsActive = typeof laneQueueWithProcessing.isActive === 'function' + ? laneQueueWithProcessing.isActive(laneId) + : (laneQueueWithProcessing.isProcessing?.(laneId) ?? false); const requestId = request.id.toString(); const sessionIdForAudit = sessionId ?? `ws:${connectionId}`; const runStartedAt = Date.now(); @@ -127,7 +132,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { // Interrupt mode should preempt active work when a newer request arrives. // LaneQueue itself only rejects queued entries, so we also request agent cancellation. - if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) { + if (resolvedPolicy?.mode === 'interrupt' && laneIsActive) { const cancelStartedAt = Date.now(); const cancelled = sessionId ? deps.sessionBridge.cancelSession(sessionId) @@ -165,6 +170,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { duration_ms: cancelLatencyMs, }); deps.metrics?.recordRunState('cancel_requested'); + const activeRequestId = activeRequestIds.get(connectionId); + if (activeRequestId && activeRequestId !== request.id) { + send(makeEvent(activeRequestId, 'run_state', { + state: 'cancel_requested', + timestamp: Date.now(), + })); + } } } @@ -202,6 +214,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const isCommand = Boolean(commandInput && deps.commandRegistry?.isCommand(commandInput)); if (!isCommand) { + activeRequestIds.set(connectionId, request.id); auditLogger?.runState?.({ session_id: sessionIdForAudit, channel: 'ws', @@ -211,9 +224,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { request_id: requestId, }); deps.metrics?.recordRunState('start'); + send(makeEvent(request.id, 'run_state', { + state: 'start', + timestamp: Date.now(), + })); } - if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { + if (isCommand) { const sessionId = deps.sessionBridge.getSessionId(connectionId); const commandResult = await deps.commandRegistry.execute(commandInput, { channel: 'ws', @@ -646,6 +663,10 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { if (commandResult.handled) { send(makeEvent(request.id, 'done', { content: commandResult.text })); + deps.sessionBridge.setBusy(connectionId, false); + deps.sessionBridge.setOnToolUse(connectionId, undefined); + activeRequestIds.delete(connectionId); + deps.metrics?.endRequest(requestId); return; } } @@ -699,10 +720,14 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { context: { sessionId: laneId, level: contextAlert.level }, }); } - send(makeEvent(request.id, 'done', { content: response })); const finalState = response.trim().toLowerCase() === 'operation cancelled by user.' ? 'cancelled' : 'complete'; + send(makeEvent(request.id, 'run_state', { + state: finalState, + timestamp: Date.now(), + })); + send(makeEvent(request.id, 'done', { content: response })); auditLogger?.runState?.({ session_id: sessionIdForAudit, channel: 'ws', @@ -723,6 +748,11 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { message, context: { sessionId: laneId }, }); + send(makeEvent(request.id, 'run_state', { + state: 'error', + timestamp: Date.now(), + message, + })); send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); auditLogger?.runState?.({ session_id: sessionIdForAudit, @@ -738,6 +768,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { } finally { deps.sessionBridge.setBusy(connectionId, false); deps.sessionBridge.setOnToolUse(connectionId, undefined); + activeRequestIds.delete(connectionId); deps.metrics?.endRequest(requestId); } }, resolvedPolicy); @@ -754,7 +785,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { } }, - 'agent.cancel': async (request: GatewayRequest): Promise => { + 'agent.cancel': async (request: GatewayRequest, send: SendFn): Promise => { const params = request.params as { connectionId?: string } | undefined; const connectionId = params?.connectionId as string; @@ -795,6 +826,13 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { }); deps.metrics?.recordRunState('cancel_requested'); } + const activeRequestId = activeRequestIds.get(connectionId); + if (cancelled && activeRequestId) { + send(makeEvent(activeRequestId, 'run_state', { + state: 'cancel_requested', + timestamp: Date.now(), + })); + } return { id: request.id, result: { diff --git a/src/gateway/lane-queue.test.ts b/src/gateway/lane-queue.test.ts index e31c638..3443894 100644 --- a/src/gateway/lane-queue.test.ts +++ b/src/gateway/lane-queue.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi } from 'vitest'; import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js'; describe('LaneQueue', () => { @@ -230,6 +230,25 @@ describe('LaneQueue', () => { await expect(p3).resolves.toBe('new-pending'); }); + it('interrupt mode keeps only the most recent pending request', async () => { + const queue = new LaneQueue({ mode: 'interrupt' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'old-pending'); + const p3 = queue.enqueue('lane-a', async () => 'latest-pending'); + + await expect(p2).rejects.toThrow('Superseded by newer request'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p3).resolves.toBe('latest-pending'); + }); + it('drop_new overflow rejects newest request when cap is reached', async () => { const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' }); let resolveFirst!: () => void; @@ -249,6 +268,44 @@ describe('LaneQueue', () => { await expect(p2).resolves.toBe('pending-1'); }); + it('interrupt mode clears debounce backlog to run latest request immediately', async () => { + vi.useFakeTimers(); + try { + const queue = new LaneQueue({ mode: 'interrupt', debounceMs: 50 }); + const events: string[] = []; + + let resolveFirst!: () => void; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + events.push('first:start'); + await firstBlocks; + events.push('first:end'); + return 'first'; + }); + const p2 = queue.enqueue('lane-a', async () => { + events.push('second:start'); + return 'second'; + }); + + resolveFirst(); + await p1; + await Promise.resolve(); + + const p3 = queue.enqueue('lane-a', async () => { + events.push('third:start'); + return 'third'; + }); + + await Promise.resolve(); + expect(events).toContain('third:start'); + await expect(p2).rejects.toThrow('Superseded by newer request'); + await expect(p3).resolves.toBe('third'); + } finally { + vi.useRealTimers(); + } + }); + it('drop_old overflow evicts oldest pending request when cap is reached', async () => { const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' }); let resolveFirst!: () => void; diff --git a/src/gateway/lane-queue.ts b/src/gateway/lane-queue.ts index 885e139..24467ed 100644 --- a/src/gateway/lane-queue.ts +++ b/src/gateway/lane-queue.ts @@ -98,15 +98,9 @@ export class LaneQueue { this.lanes.set(laneId, lane); } - // If nothing is running on this lane, execute immediately - if (!lane.active && !lane.debounceTimer) { - lane.active = true; - try { - return await work(); - } finally { - lane.active = false; - this.processNext(laneId); - } + if (effective.mode === 'interrupt' && lane.debounceTimer) { + clearTimeout(lane.debounceTimer); + lane.debounceTimer = undefined; } if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') { @@ -125,6 +119,17 @@ export class LaneQueue { }); } + // If nothing is running on this lane, execute immediately + if (!lane.active && !lane.debounceTimer && lane.queue.length === 0) { + lane.active = true; + try { + return await work(); + } finally { + lane.active = false; + this.processNext(laneId); + } + } + if (lane.queue.length >= effective.cap) { if (effective.overflow === 'drop_new') { return Promise.reject( @@ -168,12 +173,18 @@ export class LaneQueue { }); } - /** Check if a lane currently has active work executing. */ + /** Check if a lane is active or waiting to start due to debounce. */ isProcessing(laneId: string): boolean { const lane = this.lanes.get(laneId); return (lane?.active ?? false) || Boolean(lane?.debounceTimer); } + /** Check if a lane currently has active work executing (not counting debounce delay). */ + isActive(laneId: string): boolean { + const lane = this.lanes.get(laneId); + return lane?.active ?? false; + } + /** Get the number of pending (not yet started) items in a lane. */ queueLength(laneId: string): number { return this.lanes.get(laneId)?.queue.length ?? 0; diff --git a/src/gateway/protocol.ts b/src/gateway/protocol.ts index c57e566..5b20e6e 100644 --- a/src/gateway/protocol.ts +++ b/src/gateway/protocol.ts @@ -95,6 +95,7 @@ export type EventType = | 'tool_end' | 'context_warning' | 'attachment' + | 'run_state' | 'done' | 'error'; @@ -142,6 +143,14 @@ export interface AttachmentEventData { filename?: string; } +export type RunState = 'start' | 'cancel_requested' | 'cancelled' | 'complete' | 'error'; + +export interface RunStateEventData { + state: RunState; + timestamp: number; + message?: string; +} + export interface DoneEventData { content: string; } diff --git a/src/gateway/ui/pages/chat.js b/src/gateway/ui/pages/chat.js index 55265d8..a303421 100644 --- a/src/gateway/ui/pages/chat.js +++ b/src/gateway/ui/pages/chat.js @@ -715,6 +715,10 @@ async function sendMessage(client, overrideText) { scrollToBottom(); // Create placeholder for assistant response + const statusLine = document.createElement('div'); + statusLine.className = 'px-1 text-[11px] leading-none text-zinc-500 select-none hidden'; + statusLine.textContent = 'Run status: queued'; + _elements.messages.appendChild(statusLine); const placeholder = document.createElement('div'); placeholder.className = 'rounded-lg px-3.5 py-2.5 text-sm leading-relaxed break-words whitespace-pre-wrap bg-zinc-900 border border-zinc-800 text-zinc-50 streaming-cursor'; placeholder.innerHTML = 'Thinking...'; @@ -759,6 +763,22 @@ async function sendMessage(client, overrideText) { scrollToBottom(); }); + stream.on('run_state', (data) => { + if (!data || !data.state) { + return; + } + const labels = { + start: 'Run status: working', + cancel_requested: 'Run status: cancellation requested', + cancelled: 'Run status: cancelled', + complete: 'Run status: complete', + error: `Run status: error${data.message ? ` (${data.message})` : ''}`, + }; + statusLine.textContent = labels[data.state] || `Run status: ${data.state}`; + statusLine.classList.remove('hidden'); + scrollToBottom(); + }); + const done = await stream.result; const content = done?.content ?? done?.text ?? '(no response)'; const assistantMessage = createMessageEl('assistant', content, Date.now()); @@ -771,6 +791,7 @@ async function sendMessage(client, overrideText) { _cancelling = false; updateSendButton(); clearPendingAttachments(); + statusLine.remove(); scrollToBottom(); } } diff --git a/src/gateway/ui/pages/chat.test.ts b/src/gateway/ui/pages/chat.test.ts index 5b6a8b0..7da2334 100644 --- a/src/gateway/ui/pages/chat.test.ts +++ b/src/gateway/ui/pages/chat.test.ts @@ -185,4 +185,55 @@ describe('ChatPage wiring', () => { expect(calls.some((entry) => entry.method === 'agent.cancel')).toBe(true); }); + + it('renders run_state updates during streaming', async () => { + let resolveResult: ((value: { content: string }) => void) | undefined; + const handlers = new Map void>>(); + const stream = { + on(event: string, cb: (data: any) => void) { + if (!handlers.has(event)) { + handlers.set(event, []); + } + handlers.get(event)?.push(cb); + }, + emit(event: string, data: any) { + for (const cb of handlers.get(event) ?? []) { + cb(data); + } + }, + result: new Promise<{ content: string }>((resolve) => { + resolveResult = resolve; + }), + }; + + const client = { + async call(method: string) { + if (method === 'sessions.list') { + return { sessions: [] }; + } + return null; + }, + stream() { + return stream; + }, + }; + + await ChatPage.render(root, client); + + const input = root.querySelector('#chat-input'); + input.value = 'hello'; + root.querySelector('#chat-send').dispatchEvent(new windowObj.Event('click', { bubbles: true })); + await Promise.resolve(); + + stream.emit('run_state', { state: 'start' }); + await Promise.resolve(); + + const statusLine = Array.from(root.querySelectorAll('div')) + .find((el: any) => String(el.textContent ?? '').startsWith('Run status:')); + expect(statusLine).toBeTruthy(); + expect(statusLine.classList.contains('hidden')).toBe(false); + + resolveResult?.({ content: 'ok' }); + await Promise.resolve(); + }); });