From 3cc9e16ef554c34dc95e5580bf3f3e2a05c5a282 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 26 Feb 2026 13:28:10 -0800 Subject: [PATCH] feat(subagents): complete queue, budgets, audit, and inspection controls --- README.md | 14 + config/default.yaml | 5 + docs/api/PROTOCOL.md | 3 +- docs/architecture/AGENT_DIAGRAM.md | 5 +- .../GATEWAY_SESSIONS_AND_QUEUE.md | 2 +- ...-personal-assistant-productization-plan.md | 2 +- .../2026-02-26-subagents-support-plan.md | 29 +- docs/plans/state.json | 24 +- src/audit/logger.test.ts | 26 ++ src/audit/logger.ts | 13 + src/audit/types.ts | 29 ++ src/backends/native/subagents.test.ts | 87 +++++ src/backends/native/subagents.ts | 299 ++++++++++++++++-- src/commands/builtin/index.test.ts | 30 +- src/commands/builtin/index.ts | 17 + src/commands/types.ts | 1 + src/config/schema.test.ts | 26 ++ src/config/schema.ts | 5 + src/daemon/routing.test.ts | 71 +++++ src/daemon/routing.ts | 57 ++++ src/gateway/ui/pages/chat.js | 1 + src/tools/builtin/subagents.test.ts | 27 ++ src/tools/builtin/subagents.ts | 19 ++ 23 files changed, 741 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index cf83652..254ce2e 100644 --- a/README.md +++ b/README.md @@ -618,6 +618,7 @@ Notes: | `/deny [id] [reason]` | Deny latest (or specific) pending gate | | `/skill ` | In-chat skill discovery/install (`list`, `search `, `install `) | | `/runtime ` | Show or control global runtime backend mode (`/backend ...` alias also supported) | +| `/subagents [list\|summary [limit]\|cancel \|delete ]` | Inspect and control spawned subagent sessions | ## Web UI Dashboard @@ -661,6 +662,7 @@ pnpm tui:fs | `/tools` | Show authoritative runtime tool list for this session | | `/research ` | Delegate a task to `agent_configs.research` | | `/council ` | Run dual D/P councils pipeline with bridge+meta merge (brief in TUI, full artifacts saved to disk) | +| `/subagents [list\|summary [limit]\|cancel \|delete ]` | Inspect and control spawned subagent sessions | | `/compact` | Compact conversation context | | `/usage` | Show token usage and cost | | `/context` | Show estimated context-window usage | @@ -806,6 +808,13 @@ Available tools: - `subagent.delete` — remove a child session and clear its history - `subagent.summary` — inspect transcript summary for a child session +Session controls: + +- Queue modes: `followup` (FIFO) or `interrupt` (latest-wins cancellation + supersede). +- Budget guardrails: max turns, max total tokens, and per-turn timeout. +- Tool profile override per subagent spawn (`minimal|messaging|coding|full`). +- Runtime inspection command: `/subagents [list|summary [limit]|cancel |delete ]`. + Example flow: ```json @@ -823,6 +832,11 @@ agents: enabled: true max_active_sessions: 6 idle_ttl_ms: 3600000 + queue_mode: followup + default_tool_profile: minimal + max_turns: 40 + max_total_tokens: 200000 + turn_timeout_ms: 120000 ``` ## Running as Service diff --git a/config/default.yaml b/config/default.yaml index 7694a50..9d3e825 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -302,6 +302,11 @@ agents: enabled: true max_active_sessions: 6 idle_ttl_ms: 3600000 + queue_mode: followup + default_tool_profile: minimal + max_turns: 40 + max_total_tokens: 200000 + turn_timeout_ms: 120000 # ── Memory / Embeddings ────────────────────────────────────────────── # Enable hybrid keyword + vector search using local Ollama embeddings. diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index a4a9158..b01cf20 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -41,7 +41,8 @@ The gateway serialises agent work **per session**, not per WebSocket connection: - The gateway `agent.send` command path and channel-router path use the same runtime backend-mode command service; `flynn tui` forwards `/runtime ...` through this gateway path for parity. - Backend routing and fallback outcomes are emitted to audit logs (`backend.route`, `backend.success`, `backend.fallback`) for rollout evaluation; this telemetry is outside JSON-RPC response payloads. - Session-start memory injection (`user/profile` + `user/working`) is server-side and controlled by `memory.user_namespace`; it does not affect protocol payloads. -- Multi-turn child agents are exposed through tool calls (`subagent.spawn/send/list/cancel/delete/summary`) inside the agent loop; they do not add new JSON-RPC methods. +- Multi-turn child agents are exposed through tool calls (`subagent.spawn/send/list/cancel/delete/summary`) inside the agent loop; child sessions support per-session queue mode and budget guardrails but do not add new JSON-RPC methods. +- Session command fast-path includes `/subagents` (`list|summary|cancel|delete`) for child-session inspection/control without protocol changes. This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`. diff --git a/docs/architecture/AGENT_DIAGRAM.md b/docs/architecture/AGENT_DIAGRAM.md index 3ef91d4..6f241bd 100644 --- a/docs/architecture/AGENT_DIAGRAM.md +++ b/docs/architecture/AGENT_DIAGRAM.md @@ -137,8 +137,9 @@ Tool Calls (inside NativeAgent loop) +---------------------------> AuditLogger (redacted) Subagent sessions (multi-turn child agents) - parent AgentOrchestrator -> subagent.* tools -> SubagentManager (TTL cleanup) - SubagentManager -> child AgentOrchestrator (session namespace: subagent::) + parent AgentOrchestrator -> subagent.* tools -> SubagentManager (TTL cleanup + queue/budget controls) + SubagentManager -> child AgentOrchestrator (session namespace: subagent::, trace_id) + SubagentManager -> AuditLogger (subagent.lifecycle + subagent.turn events) child AgentOrchestrator -> NativeAgent/tool loop (same policy engine, recursion tools removed) Session start (when `memory.user_namespace` is set) diff --git a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md index 173bd48..ce9d1be 100644 --- a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md +++ b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md @@ -17,7 +17,7 @@ If you only want the protocol surface, see `docs/api/PROTOCOL.md`. - Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods. - Run lifecycle/cancel intent and reaction decisions are emitted to audit logs, and aggregated into `system.metrics` counters (runStates, cancelLatencyMs, reactions) for dashboards. - Reaction matching is deterministic (priority + cooldown + recursion guard) before intent/agent routing. -- `subagent.*` tools create child orchestrators scoped to the parent conversation (`subagent::`) with idle TTL cleanup; this is tool-loop behavior, not a separate gateway RPC session lane. +- `subagent.*` tools create child orchestrators scoped to the parent conversation (`subagent::`) with idle TTL cleanup, per-child queue mode (`followup|interrupt`), and session budgets (turn/token/timeout); this is tool-loop behavior, not a separate gateway RPC session lane. - Companion `node.*` registration is per WebSocket connection; reconnects must re-register capabilities before invoking node RPC methods. - Canvas artifacts are persisted per session under the gateway data directory for UI recovery across restarts. - TTS output is best-effort; synthesis failures fall back to text-only responses. diff --git a/docs/plans/2026-02-26-personal-assistant-productization-plan.md b/docs/plans/2026-02-26-personal-assistant-productization-plan.md index f1cb1b1..fa903e4 100644 --- a/docs/plans/2026-02-26-personal-assistant-productization-plan.md +++ b/docs/plans/2026-02-26-personal-assistant-productization-plan.md @@ -13,6 +13,7 @@ The following were previously treated as gaps but are already implemented in Fly 3. Browser automation baseline is present (`browser.navigate/click/type/screenshot/content/eval` in `src/tools/builtin/browser/tools.ts`). 4. Companion protocol/runtime foundation is present (`src/companion/runtimeClient.ts`, `src/companion/platformClients.ts`). 5. Talk mode + wake phrase baseline is present (`src/daemon/routing.ts`, `audio.talk_mode` schema support). +6. Subagent sessions now include queue/budget controls, transcript export, and session inspection UX (`subagent.*`, `/subagents`). ## Remaining Product Gaps (Now) @@ -20,7 +21,6 @@ The following were previously treated as gaps but are already implemented in Fly 2. Voice UX is functional but not yet a polished, end-to-end daily-driver experience across surfaces. 3. Browser tools exist but lack task-level reliability primitives (checkpoints/retries/guardrails) for autonomous workflows. 4. Onboarding lacks a "first success" guided path that validates real integrations live during setup. -5. Subagent sessions are now available (`subagent.*`) with idle TTL cleanup and transcript summary support, but still need budgeting/UI visibility for larger autonomous workflows. ## Product Goal diff --git a/docs/plans/2026-02-26-subagents-support-plan.md b/docs/plans/2026-02-26-subagents-support-plan.md index 145211b..d80176d 100644 --- a/docs/plans/2026-02-26-subagents-support-plan.md +++ b/docs/plans/2026-02-26-subagents-support-plan.md @@ -1,7 +1,7 @@ # Subagents Support Plan (Flynn) Date: 2026-02-26 -Status: phase 1 implemented, phase 2 partially implemented +Status: phases 1-3 implemented Scope: add OpenClaw-style multi-turn subagent session support in Flynn without changing channel surface scope (Telegram-first) ## Constraints @@ -30,22 +30,25 @@ Scope: add OpenClaw-style multi-turn subagent session support in Flynn without c - `max_active_sessions` 5. Added policy/profile support so `subagent.*` is controlled through `group:agents` and tool profiles. -## Phase 2 (Next) +## Phase 2 (Implemented) -1. Add per-subagent TTL/idle eviction and auto-cleanup metrics. (implemented: TTL eviction) -2. Add optional transcript export/summarization (`subagent.summary`). (implemented) -3. Add per-subagent tool-profile override (read-only by default for risky workloads). (pending) -4. Add parent-child trace IDs in audit events for easier debugging. (pending) +1. Added idle TTL eviction plus audit lifecycle events for cleanup visibility. +2. Added transcript export/summarization via `subagent.summary`. +3. Added per-subagent tool-profile override (`queue_mode`, `tool_profile` on spawn). +4. Added parent-child trace IDs and subagent lifecycle/turn audit events. -## Phase 3 (Stretch) +## Phase 3 (Implemented) -1. Add queue semantics for child sessions (`followup` vs `interrupt` per subagent). -2. Add explicit resource budgets (token/time) per child session. -3. Add UI affordances in gateway chat for subagent session inspection. +1. Added queue semantics per child session (`followup` FIFO, `interrupt` latest-wins). +2. Added explicit resource budgets (max turns, max total tokens, per-turn timeout). +3. Added operator/UI affordances: + - `/subagents` command (list/summary/cancel/delete), + - gateway chat slash suggestion for `/subagents`. -## Acceptance Criteria (Phase 1) +## Final Acceptance Criteria -1. Parent agent can spawn and continue a child subagent across multiple turns. +1. Parent agent can spawn and continue child subagents across multiple turns. 2. Child session state is isolated and delete clears history. 3. Recursion tooling (`agent.delegate`, `council.run`, `subagent.*`) is removed from child registries. -4. Tests cover manager lifecycle, tool behavior, config parsing, and policy profile inclusion. +4. Child sessions support queue policy + budget guardrails + transcript inspection. +5. Tests cover manager lifecycle, tool behavior, config parsing, routing command wiring, and audit event logging. diff --git a/docs/plans/state.json b/docs/plans/state.json index d27bc2b..9217b27 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -6800,21 +6800,23 @@ "status": "completed", "date": "2026-02-26", "updated": "2026-02-26", - "summary": "Implemented Phase 1 and partial Phase 2 subagent support: added a SubagentManager with multi-turn child sessions, idle TTL cleanup, new `subagent.*` tools (spawn/send/list/cancel/delete/summary), routing wiring, config guardrails, policy/profile integration, docs/diagram updates, and focused test coverage.", + "summary": "Completed subagent phases 1-3: added queue semantics (`followup`/`interrupt`), turn/token/timeout budgets, per-subagent tool-profile overrides, parent-child trace IDs with lifecycle/turn audit events, `/subagents` runtime command surface, and updated docs/diagram coverage.", "files_modified": [ "src/backends/native/subagents.ts", "src/backends/native/subagents.test.ts", - "src/backends/native/index.ts", - "src/backends/index.ts", + "src/audit/types.ts", + "src/audit/logger.ts", + "src/audit/logger.test.ts", "src/tools/builtin/subagents.ts", "src/tools/builtin/subagents.test.ts", - "src/tools/builtin/index.ts", - "src/tools/index.ts", - "src/tools/policy.ts", - "src/tools/policy.test.ts", + "src/commands/types.ts", + "src/commands/builtin/index.ts", + "src/commands/builtin/index.test.ts", "src/config/schema.ts", "src/config/schema.test.ts", "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "src/gateway/ui/pages/chat.js", "config/default.yaml", "README.md", "docs/api/PROTOCOL.md", @@ -6824,11 +6826,11 @@ "docs/plans/2026-02-26-personal-assistant-productization-plan.md", "docs/plans/state.json" ], - "test_status": "pnpm test:run src/backends/native/subagents.test.ts src/tools/builtin/subagents.test.ts src/tools/policy.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing + pnpm typecheck" + "test_status": "pnpm test:run src/backends/native/subagents.test.ts src/tools/builtin/subagents.test.ts src/commands/builtin/index.test.ts src/audit/logger.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing + pnpm typecheck" } }, "overall_progress": { - "total_test_count": 2533, + "total_test_count": 2534, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -6843,7 +6845,7 @@ "tier2_completion": "4/4 (100%) \u2014 inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier3_completion": "5/5 (100%) \u2014 lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier4_completion": "4/4 (100%) \u2014 gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", - "feature_gap_scorecard": "rebaselined 2026-02-26 — channel breadth, setup wizard, baseline browser automation, and partial phase-2 subagent support (`subagent.*` + idle TTL cleanup + transcript summary) are implemented; remaining high-impact personal-assistant gaps center on shipped companion apps (desktop/mobile), voice UX polish, browser workflow reliability primitives, and first-success onboarding funnel optimization.", + "feature_gap_scorecard": "rebaselined 2026-02-26 — channel breadth, setup wizard, baseline browser automation, and full subagent support (`subagent.*` + queue modes + budgets + trace/audit + `/subagents` inspection) are implemented; remaining high-impact personal-assistant gaps center on shipped companion apps (desktop/mobile), voice UX polish, browser workflow reliability primitives, and first-success onboarding funnel optimization.", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete \u2014 milestone done", "dashboard_observability": "completed \u2014 service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", @@ -6877,7 +6879,7 @@ "deeper_surfaces_phase4_rollout": "completed \u2014 phase 4 rollout and operator readiness plan documented: canary rollout plan by feature flag/surface, explicit rollback playbook, operator docs and architecture/protocol docs synchronized", "post_phase_test_fixes": "completed \u2014 fixed 4 test failures introduced by phases 1-3: iOS/Android push listNodes (missing publishHeartbeat before platform-filtered query), server.test agent.send (run_state events now precede done; added sendAndWaitForDone helper), httpBody 413 (req.destroy() closed socket before response could be sent; replaced with Connection: close header on 413 responses)", "personal_assistant_productization_plan": "proposed \u2014 8-10 week phased roadmap defined (companion MVP surfaces, voice reliability hardening, browser workflow reliability layer, onboarding 2.0 first-success funnel) with measurable exit gates.", - "subagents_support": "completed \u2014 phase-1 plus partial phase-2 subagent runtime support added with `subagent.spawn/send/list/cancel/delete/summary`, per-parent child-session orchestration, idle TTL cleanup (`agents.subagents.idle_ttl_ms`), config guardrails, and focused regression tests." + "subagents_support": "completed \u2014 subagent phases 1-3 shipped with `subagent.spawn/send/list/cancel/delete/summary`, per-child queue mode (`followup|interrupt`), budgets (`max_turns`, `max_total_tokens`, `turn_timeout_ms`), tool-profile overrides, trace-linked audit events, `/subagents` inspection commands, and focused regression tests." }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/audit/logger.test.ts b/src/audit/logger.test.ts index ce98383..49a31dd 100644 --- a/src/audit/logger.test.ts +++ b/src/audit/logger.test.ts @@ -123,6 +123,26 @@ describe('AuditLogger', () => { reason: 'no_match', candidate_count: 4, }); + logger.subagentLifecycle({ + parent_session_id: 'telegram:123', + subagent_id: 'planner', + trace_id: 'trace-planner', + action: 'spawn', + agent: 'research', + tier: 'complex', + queue_mode: 'followup', + tool_profile: 'minimal', + }); + logger.subagentTurn({ + parent_session_id: 'telegram:123', + subagent_id: 'planner', + trace_id: 'trace-planner', + action: 'complete', + queue_mode: 'followup', + duration_ms: 88, + input_chars: 42, + output_chars: 120, + }); await logger.close(); await waitForFlush(); @@ -135,6 +155,8 @@ describe('AuditLogger', () => { expect(eventTypes).toContain('run.cancel'); expect(eventTypes).toContain('reaction.match'); expect(eventTypes).toContain('reaction.skip'); + expect(eventTypes).toContain('subagent.lifecycle'); + expect(eventTypes).toContain('subagent.turn'); const runError = events.find((event) => ( event.event_type === 'run.state' @@ -145,6 +167,10 @@ describe('AuditLogger', () => { const reactionSkip = events.find((event) => event.event_type === 'reaction.skip'); expect(reactionSkip?.level).toBe('debug'); expect(reactionSkip?.event.reason).toBe('no_match'); + + const subagentLifecycle = events.find((event) => event.event_type === 'subagent.lifecycle'); + expect(subagentLifecycle?.level).toBe('info'); + expect(subagentLifecycle?.event.action).toBe('spawn'); } finally { if (previousHome === undefined) { delete process.env.HOME; diff --git a/src/audit/logger.ts b/src/audit/logger.ts index 0ae5606..b8e0d71 100644 --- a/src/audit/logger.ts +++ b/src/audit/logger.ts @@ -26,6 +26,8 @@ import type { RunCancelEvent, ReactionMatchEvent, ReactionSkipEvent, + SubagentLifecycleEvent, + SubagentTurnEvent, BackendRouteEvent, BackendSuccessEvent, BackendFallbackEvent, @@ -237,6 +239,17 @@ export class AuditLogger { this.write({ level: 'debug', event_type: 'reaction.skip', event: event as unknown as Record }); } + subagentLifecycle(event: SubagentLifecycleEvent): void { + if (!this.shouldLog('sessions', 'info')) {return;} + this.write({ level: 'info', event_type: 'subagent.lifecycle', event: event as unknown as Record }); + } + + subagentTurn(event: SubagentTurnEvent): void { + const level = event.action === 'error' ? 'warn' : 'debug'; + if (!this.shouldLog('sessions', level)) {return;} + this.write({ level, event_type: 'subagent.turn', event: event as unknown as Record }); + } + backendRoute(event: BackendRouteEvent): void { if (!this.shouldLog('sessions', 'info')) {return;} this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record }); diff --git a/src/audit/types.ts b/src/audit/types.ts index abd38fd..4039499 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -14,6 +14,7 @@ export type AuditEventType = | 'queue.preempt' | 'run.state' | 'run.cancel' | 'reaction.match' | 'reaction.skip' + | 'subagent.lifecycle' | 'subagent.turn' | 'backend.route' | 'backend.success' | 'backend.fallback' // Automation - Cron | 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove' @@ -303,6 +304,34 @@ export interface BackendFallbackEvent { duration_ms?: number; } +export interface SubagentLifecycleEvent { + parent_session_id: string; + subagent_id: string; + trace_id: string; + action: 'spawn' | 'cancel' | 'delete' | 'ttl_evict' | 'summary'; + agent?: string; + tier?: string; + queue_mode?: 'followup' | 'interrupt'; + tool_profile?: string; + reason?: string; +} + +export interface SubagentTurnEvent { + parent_session_id: string; + subagent_id: string; + trace_id: string; + action: 'queued' | 'superseded' | 'start' | 'complete' | 'error'; + request_id?: string; + queue_mode?: 'followup' | 'interrupt'; + pending_count?: number; + duration_ms?: number; + error?: string; + input_chars?: number; + output_chars?: number; + turn_count?: number; + total_tokens?: number; +} + export interface CronTriggerEvent { job_name: string; schedule: string; diff --git a/src/backends/native/subagents.test.ts b/src/backends/native/subagents.test.ts index 6ec4ebc..6e97311 100644 --- a/src/backends/native/subagents.test.ts +++ b/src/backends/native/subagents.test.ts @@ -8,6 +8,8 @@ const mocks = vi.hoisted(() => { processCalls, cancellable: true, cancelCalls: 0, + usageInput: 0, + usageOutput: 0, }; }); @@ -29,6 +31,8 @@ vi.mock('./orchestrator.js', () => { async process(message: string): Promise { mocks.processCalls.push(message); const output = `subagent:${message}`; + mocks.usageInput += Math.ceil(message.length / 4); + mocks.usageOutput += Math.ceil(output.length / 4); this.session.addMessage({ role: 'user', content: message }); this.session.addMessage({ role: 'assistant', content: output }); return output; @@ -41,6 +45,23 @@ vi.mock('./orchestrator.js', () => { cancel(): void { mocks.cancelCalls += 1; } + + getUsage(): { + primary: { inputTokens: number; outputTokens: number; calls: number }; + delegation: Record; + total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number }; + } { + return { + primary: { inputTokens: mocks.usageInput, outputTokens: mocks.usageOutput, calls: mocks.processCalls.length }, + delegation: {}, + total: { + inputTokens: mocks.usageInput, + outputTokens: mocks.usageOutput, + calls: mocks.processCalls.length, + estimatedCost: 0, + }, + }; + } } return { AgentOrchestrator }; @@ -122,6 +143,8 @@ describe('SubagentManager', () => { mocks.processCalls.length = 0; mocks.cancellable = true; mocks.cancelCalls = 0; + mocks.usageInput = 0; + mocks.usageOutput = 0; }); it('spawns, sends, lists, cancels, and deletes subagent sessions', async () => { @@ -162,8 +185,13 @@ describe('SubagentManager', () => { }, maxDelegationDepth: 3, defaultPrimaryTier: 'default', + defaultQueueMode: 'followup', + defaultToolProfile: 'minimal', maxIterations: 12, maxActiveSessions: 2, + maxTurns: 40, + maxTotalTokens: 200000, + turnTimeoutMs: 120000, idleTtlMs: 60000, }); @@ -171,6 +199,7 @@ describe('SubagentManager', () => { expect(spawned.id).toBe('planner'); expect(spawned.agent).toBe('research'); expect(spawned.tier).toBe('complex'); + expect(spawned.queueMode).toBe('followup'); // verify blocked orchestration tools are not passed to child subagents const ctorConfig = mocks.ctorConfigs[0] as { toolRegistry: ToolRegistry }; @@ -220,7 +249,12 @@ describe('SubagentManager', () => { }, maxDelegationDepth: 3, defaultPrimaryTier: 'default', + defaultQueueMode: 'followup', + defaultToolProfile: 'minimal', maxActiveSessions: 1, + maxTurns: 40, + maxTotalTokens: 200000, + turnTimeoutMs: 120000, idleTtlMs: 60000, }); @@ -249,7 +283,12 @@ describe('SubagentManager', () => { }, maxDelegationDepth: 3, defaultPrimaryTier: 'default', + defaultQueueMode: 'followup', + defaultToolProfile: 'minimal', maxActiveSessions: 3, + maxTurns: 40, + maxTotalTokens: 200000, + turnTimeoutMs: 120000, idleTtlMs: 60000, }); @@ -274,7 +313,12 @@ describe('SubagentManager', () => { }, maxDelegationDepth: 3, defaultPrimaryTier: 'default', + defaultQueueMode: 'followup', + defaultToolProfile: 'minimal', maxActiveSessions: 3, + maxTurns: 40, + maxTotalTokens: 200000, + turnTimeoutMs: 120000, idleTtlMs: 1000, }); @@ -284,4 +328,47 @@ describe('SubagentManager', () => { expect(removed).toEqual(['ttl-one']); expect(manager.list()).toEqual([]); }); + + it('enforces per-session turn/token budgets and interrupt latest-wins behavior', async () => { + const sessionManager = createSessionManagerMock(); + const manager = new SubagentManager({ + parentSessionId: 'telegram:eve', + modelRouter: {} as never, + sessionManager: sessionManager.api as never, + toolRegistry: new ToolRegistry(), + toolExecutor: {} as never, + agentConfigRegistry: createAgentRegistryMock() as never, + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + maxDelegationDepth: 3, + defaultPrimaryTier: 'default', + defaultQueueMode: 'interrupt', + defaultToolProfile: 'minimal', + maxActiveSessions: 3, + maxTurns: 2, + maxTotalTokens: 200000, + turnTimeoutMs: 120000, + idleTtlMs: 60000, + }); + + manager.spawn({ agent: 'helper', subagentId: 'interrupt-one' }); + const p1 = manager.send('interrupt-one', 'first request'); + const p2 = manager.send('interrupt-one', 'second request'); + + const [r1, r2] = await Promise.allSettled([p1, p2]); + expect(mocks.cancelCalls).toBeGreaterThanOrEqual(1); + if (r1.status === 'fulfilled') { + expect(r1.value.content).toContain('subagent:'); + } + expect(r2.status).toBe('fulfilled'); + if (r2.status === 'fulfilled') { + expect(r2.value.content).toBe('subagent:second request'); + } + await expect(manager.send('interrupt-one', 'third request')).rejects.toThrow('max turns'); + }); }); diff --git a/src/backends/native/subagents.ts b/src/backends/native/subagents.ts index 108d98d..45c90d9 100644 --- a/src/backends/native/subagents.ts +++ b/src/backends/native/subagents.ts @@ -1,15 +1,20 @@ import { randomUUID } from 'node:crypto'; import type { AgentConfigRegistry } from '../../agents/registry.js'; +import type { ToolProfile } from '../../config/schema.js'; import type { Message } from '../../models/types.js'; import type { ToolPolicyContext } from '../../tools/policy.js'; +import { PROFILE_TOOLS } from '../../tools/policy.js'; import type { ModelRouter, ModelTier } from '../../models/router.js'; import type { SessionManager } from '../../session/manager.js'; import type { ToolRegistry } from '../../tools/registry.js'; import type { ToolExecutor } from '../../tools/executor.js'; +import { auditLogger } from '../../audit/index.js'; import { AgentOrchestrator, type DelegationConfig } from './orchestrator.js'; const SUBAGENT_FRONTEND = 'subagent'; +type SubagentQueueMode = 'followup' | 'interrupt'; + const BLOCKED_SUBAGENT_TOOL_NAMES = [ 'agent.delegate', 'council.run', @@ -21,6 +26,13 @@ const BLOCKED_SUBAGENT_TOOL_NAMES = [ 'subagent.summary', ]; +interface QueuedTurn { + requestId: string; + message: string; + resolve: (result: SubagentSendResult) => void; + reject: (error: Error) => void; +} + export interface SubagentManagerConfig { parentSessionId: string; modelRouter: ModelRouter; @@ -31,8 +43,13 @@ export interface SubagentManagerConfig { delegation: DelegationConfig; maxDelegationDepth: number; defaultPrimaryTier: ModelTier; + defaultQueueMode: SubagentQueueMode; + defaultToolProfile: ToolProfile; maxIterations?: number; maxActiveSessions: number; + maxTurns: number; + maxTotalTokens: number; + turnTimeoutMs: number; idleTtlMs: number; toolPolicyContext?: ToolPolicyContext; } @@ -42,16 +59,24 @@ export interface SpawnSubagentRequest { subagentId?: string; tier?: ModelTier; systemPrompt?: string; + queueMode?: SubagentQueueMode; + toolProfile?: ToolProfile; } interface ManagedSubagent { id: string; agent: string; tier: ModelTier; + queueMode: SubagentQueueMode; + toolProfile: ToolProfile; + traceId: string; sessionUserId: string; createdAt: number; updatedAt: number; busy: boolean; + processing: boolean; + pending: QueuedTurn[]; + completedTurns: number; orchestrator: AgentOrchestrator; } @@ -59,7 +84,12 @@ export interface SubagentSessionSummary { id: string; agent: string; tier: ModelTier; + queueMode: SubagentQueueMode; + toolProfile: ToolProfile; + traceId: string; messageCount: number; + completedTurns: number; + pendingCount: number; createdAt: number; updatedAt: number; busy: boolean; @@ -101,13 +131,13 @@ export class SubagentManager { if (!agentConfig) { const available = this.config.agentConfigRegistry.list().map((entry) => entry.name); throw new Error( - `Agent \"${agentName}\" not found. Available agents: ${available.length > 0 ? available.join(', ') : 'none'}`, + `Agent "${agentName}" not found. Available agents: ${available.length > 0 ? available.join(', ') : 'none'}`, ); } const id = this.resolveSubagentId(request.subagentId); if (this.sessions.has(id)) { - throw new Error(`Subagent session \"${id}\" already exists.`); + throw new Error(`Subagent session "${id}" already exists.`); } if (this.sessions.size >= this.config.maxActiveSessions) { throw new Error( @@ -118,15 +148,16 @@ export class SubagentManager { const tier = request.tier ?? agentConfig.modelTier ?? this.config.defaultPrimaryTier; const systemPrompt = request.systemPrompt ?? agentConfig.systemPrompt - ?? `You are subagent \"${agentName}\". Complete assigned tasks clearly and concisely.`; + ?? `You are subagent "${agentName}". Complete assigned tasks clearly and concisely.`; + const queueMode = request.queueMode ?? this.config.defaultQueueMode; + const toolProfile = request.toolProfile ?? agentConfig.toolProfile ?? this.config.defaultToolProfile; + const now = Date.now(); + const traceId = `subagent:${this.config.parentSessionId}:${id}:${randomUUID().slice(0, 8)}`; const sessionUserId = `${this.config.parentSessionId}:${id}`; const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, sessionUserId); - const subagentToolRegistry = this.config.toolRegistry.clone(); - for (const toolName of BLOCKED_SUBAGENT_TOOL_NAMES) { - subagentToolRegistry.unregister(toolName); - } + const subagentToolRegistry = this.buildSubagentToolRegistry(toolProfile); const policyContext: ToolPolicyContext | undefined = this.config.toolPolicyContext ? { @@ -154,13 +185,30 @@ export class SubagentManager { id, agent: agentName, tier, + queueMode, + toolProfile, + traceId, sessionUserId, createdAt: now, updatedAt: now, busy: false, + processing: false, + pending: [], + completedTurns: 0, orchestrator: subagent, }); + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: id, + trace_id: traceId, + action: 'spawn', + agent: agentName, + tier, + queue_mode: queueMode, + tool_profile: toolProfile, + }); + return this.getSummaryById(id); } @@ -173,18 +221,53 @@ export class SubagentManager { throw new Error('message is required'); } - subagent.busy = true; - subagent.updatedAt = Date.now(); - try { - const content = await subagent.orchestrator.process(trimmed); - subagent.updatedAt = Date.now(); - return { - content, - session: this.getSummary(subagent), - }; - } finally { - subagent.busy = false; - } + this.assertBudgets(subagent); + + return new Promise((resolve, reject) => { + const requestId = `rq-${randomUUID().slice(0, 8)}`; + const queued: QueuedTurn = { requestId, message: trimmed, resolve, reject }; + + if (subagent.queueMode === 'interrupt') { + while (subagent.pending.length > 0) { + const dropped = subagent.pending.shift(); + if (!dropped) { + continue; + } + auditLogger?.subagentTurn?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'superseded', + request_id: dropped.requestId, + queue_mode: subagent.queueMode, + }); + dropped.reject(new Error(`Superseded by newer subagent request (${requestId}).`)); + } + if (subagent.busy && subagent.orchestrator.isCancellable()) { + subagent.orchestrator.cancel(); + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'cancel', + queue_mode: subagent.queueMode, + reason: 'interrupt_mode_latest_wins', + }); + } + } + + subagent.pending.push(queued); + auditLogger?.subagentTurn?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'queued', + request_id: requestId, + queue_mode: subagent.queueMode, + pending_count: subagent.pending.length, + }); + this.processQueue(subagent); + }); } cancel(subagentId: string): boolean { @@ -196,6 +279,14 @@ export class SubagentManager { } subagent.orchestrator.cancel(); subagent.updatedAt = Date.now(); + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'cancel', + queue_mode: subagent.queueMode, + reason: 'explicit_cancel', + }); return true; } @@ -209,10 +300,27 @@ export class SubagentManager { subagent.orchestrator.cancel(); } + while (subagent.pending.length > 0) { + const queued = subagent.pending.shift(); + if (!queued) { + continue; + } + queued.reject(new Error(`Subagent session "${subagent.id}" was deleted.`)); + } + const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, subagent.sessionUserId); session.clear(); this.config.sessionManager.closeSession(SUBAGENT_FRONTEND, subagent.sessionUserId); this.sessions.delete(subagentId); + + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'delete', + queue_mode: subagent.queueMode, + reason: 'explicit_delete', + }); return true; } @@ -235,6 +343,15 @@ export class SubagentManager { : history.length; const tail = history.slice(Math.max(0, history.length - max)); const messages = tail.map((entry) => this.toTranscriptEntry(entry)); + + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'summary', + queue_mode: subagent.queueMode, + }); + return { session: this.getSummary(subagent), messages, @@ -247,7 +364,7 @@ export class SubagentManager { } const removed: string[] = []; for (const [id, session] of this.sessions.entries()) { - if (session.busy) { + if (session.busy || session.pending.length > 0) { continue; } if ((nowMs - session.updatedAt) <= this.config.idleTtlMs) { @@ -255,6 +372,14 @@ export class SubagentManager { } this.delete(id); removed.push(id); + auditLogger?.subagentLifecycle?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: session.id, + trace_id: session.traceId, + action: 'ttl_evict', + queue_mode: session.queueMode, + reason: 'idle_ttl_elapsed', + }); } return removed; } @@ -271,7 +396,7 @@ export class SubagentManager { const normalized = id.trim(); const subagent = this.sessions.get(normalized); if (!subagent) { - throw new Error(`Subagent session \"${normalized}\" not found.`); + throw new Error(`Subagent session "${normalized}" not found.`); } return subagent; } @@ -287,7 +412,12 @@ export class SubagentManager { id: subagent.id, agent: subagent.agent, tier: subagent.tier, + queueMode: subagent.queueMode, + toolProfile: subagent.toolProfile, + traceId: subagent.traceId, messageCount: session.getHistory().length, + completedTurns: subagent.completedTurns, + pendingCount: subagent.pending.length, createdAt: subagent.createdAt, updatedAt: subagent.updatedAt, busy: subagent.busy, @@ -301,4 +431,131 @@ export class SubagentManager { timestamp: entry.timestamp, }; } + + private async processQueue(subagent: ManagedSubagent): Promise { + if (subagent.processing) { + return; + } + subagent.processing = true; + + try { + while (subagent.pending.length > 0) { + const next = subagent.pending.shift(); + if (!next) { + continue; + } + const startedAt = Date.now(); + subagent.busy = true; + subagent.updatedAt = startedAt; + + try { + this.assertBudgets(subagent); + auditLogger?.subagentTurn?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'start', + request_id: next.requestId, + queue_mode: subagent.queueMode, + input_chars: next.message.length, + pending_count: subagent.pending.length, + }); + + const content = await this.runTurnWithTimeout(subagent, next.message); + subagent.completedTurns += 1; + subagent.updatedAt = Date.now(); + const totalTokens = this.getTotalTokens(subagent); + + auditLogger?.subagentTurn?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'complete', + request_id: next.requestId, + queue_mode: subagent.queueMode, + duration_ms: subagent.updatedAt - startedAt, + output_chars: content.length, + turn_count: subagent.completedTurns, + total_tokens: totalTokens, + pending_count: subagent.pending.length, + }); + + next.resolve({ + content, + session: this.getSummary(subagent), + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + subagent.updatedAt = Date.now(); + auditLogger?.subagentTurn?.({ + parent_session_id: this.config.parentSessionId, + subagent_id: subagent.id, + trace_id: subagent.traceId, + action: 'error', + request_id: next.requestId, + queue_mode: subagent.queueMode, + duration_ms: subagent.updatedAt - startedAt, + error: message, + pending_count: subagent.pending.length, + }); + next.reject(error instanceof Error ? error : new Error(message)); + } finally { + subagent.busy = false; + } + } + } finally { + subagent.processing = false; + } + } + + private async runTurnWithTimeout(subagent: ManagedSubagent, message: string): Promise { + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + if (subagent.orchestrator.isCancellable()) { + subagent.orchestrator.cancel(); + } + reject(new Error(`Subagent turn timed out after ${this.config.turnTimeoutMs}ms`)); + }, this.config.turnTimeoutMs); + + subagent.orchestrator.process(message) + .then((result) => { + clearTimeout(timer); + resolve(result); + }) + .catch((error) => { + clearTimeout(timer); + reject(error); + }); + }); + } + + private assertBudgets(subagent: ManagedSubagent): void { + if (subagent.completedTurns >= this.config.maxTurns) { + throw new Error(`Subagent session "${subagent.id}" reached max turns (${this.config.maxTurns}).`); + } + if (this.getTotalTokens(subagent) >= this.config.maxTotalTokens) { + throw new Error(`Subagent session "${subagent.id}" reached max total tokens (${this.config.maxTotalTokens}).`); + } + } + + private getTotalTokens(subagent: ManagedSubagent): number { + const usage = subagent.orchestrator.getUsage(); + return usage.total.inputTokens + usage.total.outputTokens; + } + + private buildSubagentToolRegistry(profile: ToolProfile): ToolRegistry { + const subagentToolRegistry = this.config.toolRegistry.clone(); + if (profile !== 'full') { + const allowed = PROFILE_TOOLS[profile]; + for (const tool of subagentToolRegistry.list()) { + if (!allowed.has(tool.name)) { + subagentToolRegistry.unregister(tool.name); + } + } + } + for (const toolName of BLOCKED_SUBAGENT_TOOL_NAMES) { + subagentToolRegistry.unregister(toolName); + } + return subagentToolRegistry; + } } diff --git a/src/commands/builtin/index.test.ts b/src/commands/builtin/index.test.ts index c2d6f9b..6526b7b 100644 --- a/src/commands/builtin/index.test.ts +++ b/src/commands/builtin/index.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, vi } from 'vitest'; -import { createApproveCommand, createApprovalsCommand, createBackendCommand, createContextCommand, createCouncilCommand, createDenyCommand, createElevateCommand, createModelCommand, createQueueCommand, createResearchCommand, createSkillCommand, createStopCommand, createToolsCommand, createTransferCommand } from './index.js'; +import { createApproveCommand, createApprovalsCommand, createBackendCommand, createContextCommand, createCouncilCommand, createDenyCommand, createElevateCommand, createModelCommand, createQueueCommand, createResearchCommand, createSkillCommand, createStopCommand, createSubagentsCommand, createToolsCommand, createTransferCommand } from './index.js'; describe('builtin /model command', () => { it('passes through the full argument string', async () => { @@ -94,6 +94,34 @@ describe('builtin /council command', () => { }); }); +describe('builtin /subagents command', () => { + it('passes through raw subcommands', async () => { + const cmd = createSubagentsCommand(); + const subagentsCommand = vi.fn(() => 'subagents listed'); + const result = await cmd.execute(['summary', 'planner', '10'], { + channel: 'test', + senderId: 'user', + sessionId: 's1', + rawInput: '/subagents summary planner 10', + services: { subagentsCommand }, + }); + expect(subagentsCommand).toHaveBeenCalledWith('summary planner 10'); + expect(result).toEqual({ handled: true, text: 'subagents listed' }); + }); + + it('returns not-available when service is missing', async () => { + const cmd = createSubagentsCommand(); + const result = await cmd.execute([], { + channel: 'test', + senderId: 'user', + sessionId: 's1', + rawInput: '/subagents', + services: {}, + }); + expect(result).toEqual({ handled: true, text: 'Subagents command is not available in this session.' }); + }); +}); + describe('builtin /elevate command', () => { it('passes through the full argument string', async () => { const cmd = createElevateCommand(); diff --git a/src/commands/builtin/index.ts b/src/commands/builtin/index.ts index 704315e..adac401 100644 --- a/src/commands/builtin/index.ts +++ b/src/commands/builtin/index.ts @@ -274,6 +274,22 @@ export function createCouncilCommand(): CommandDefinition { }; } +export function createSubagentsCommand(): CommandDefinition { + return { + name: 'subagents', + description: 'Inspect subagent sessions (list/summary/cancel/delete)', + execute: async (args, ctx) => { + if (!ctx.services?.subagentsCommand) { + return notAvailable('Subagents command'); + } + return { + handled: true, + text: await ctx.services.subagentsCommand(args.join(' ').trim()), + }; + }, + }; +} + export function createTransferCommand(): CommandDefinition { return { name: 'transfer', @@ -381,6 +397,7 @@ export function registerBuiltinCommands(registry: CommandRegistry): void { registry.register(createContextCommand()); registry.register(createResearchCommand()); registry.register(createCouncilCommand()); + registry.register(createSubagentsCommand()); registry.register(createModelCommand()); registry.register(createCompactCommand()); registry.register(createResetCommand()); diff --git a/src/commands/types.ts b/src/commands/types.ts index 48b7ee3..8665548 100644 --- a/src/commands/types.ts +++ b/src/commands/types.ts @@ -29,6 +29,7 @@ export interface CommandServices { reset?: () => Promise | string; delegateAgent?: (agentName: string, task: string) => Promise | string; runCouncil?: (task: string) => Promise | string; + subagentsCommand?: (input: string) => Promise | string; getElevation?: () => Promise | string; setElevation?: (input: string) => Promise | string; diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index dfe6714..11050d7 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -1701,6 +1701,11 @@ describe('configSchema — agents truthfulness/autonomy', () => { expect(result.agents.subagents.enabled).toBe(true); expect(result.agents.subagents.max_active_sessions).toBe(6); expect(result.agents.subagents.idle_ttl_ms).toBe(3600000); + expect(result.agents.subagents.queue_mode).toBe('followup'); + expect(result.agents.subagents.default_tool_profile).toBe('minimal'); + expect(result.agents.subagents.max_turns).toBe(40); + expect(result.agents.subagents.max_total_tokens).toBe(200000); + expect(result.agents.subagents.turn_timeout_ms).toBe(120000); expect(result.agents.immutable_denylist).toEqual( expect.arrayContaining([ expect.objectContaining({ tool: 'shell.exec', args_pattern: 'git push origin main' }), @@ -1720,6 +1725,11 @@ describe('configSchema — agents truthfulness/autonomy', () => { enabled: false, max_active_sessions: 3, idle_ttl_ms: 120000, + queue_mode: 'interrupt', + default_tool_profile: 'messaging', + max_turns: 12, + max_total_tokens: 50000, + turn_timeout_ms: 90000, }, immutable_denylist: [ { tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' }, @@ -1733,6 +1743,11 @@ describe('configSchema — agents truthfulness/autonomy', () => { expect(result.agents.subagents.enabled).toBe(false); expect(result.agents.subagents.max_active_sessions).toBe(3); expect(result.agents.subagents.idle_ttl_ms).toBe(120000); + expect(result.agents.subagents.queue_mode).toBe('interrupt'); + expect(result.agents.subagents.default_tool_profile).toBe('messaging'); + expect(result.agents.subagents.max_turns).toBe(12); + expect(result.agents.subagents.max_total_tokens).toBe(50000); + expect(result.agents.subagents.turn_timeout_ms).toBe(90000); expect(result.agents.immutable_denylist).toEqual([ { tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' }, ]); @@ -1760,6 +1775,17 @@ describe('configSchema — agents truthfulness/autonomy', () => { })).toThrow(); }); + it('rejects invalid subagent queue mode', () => { + expect(() => configSchema.parse({ + ...minimalConfig, + agents: { + subagents: { + queue_mode: 'latest', + }, + }, + })).toThrow(); + }); + it('rejects invalid truthfulness_mode', () => { expect(() => configSchema.parse({ ...minimalConfig, diff --git a/src/config/schema.ts b/src/config/schema.ts index 99f8eff..4552d66 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -539,6 +539,11 @@ const agentsSchema = z.object({ enabled: z.boolean().default(true), max_active_sessions: z.number().min(1).max(32).default(6), idle_ttl_ms: z.number().min(60_000).max(86_400_000).default(3_600_000), + queue_mode: z.enum(['followup', 'interrupt']).default('followup'), + default_tool_profile: z.enum(['minimal', 'messaging', 'coding', 'full']).default('minimal'), + max_turns: z.number().min(1).max(500).default(40), + max_total_tokens: z.number().min(1000).max(5_000_000).default(200_000), + turn_timeout_ms: z.number().min(1000).max(10 * 60 * 1000).default(120_000), }).default({}), auto_escalate: z.boolean().default(false), max_delegation_depth: z.number().min(1).max(10).default(3), diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 4393d03..120f450 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -737,6 +737,77 @@ describe('daemon command fast-path integration', () => { expect(session.setConfig).toHaveBeenCalledWith('queue.mode', 'followup'); }); + it('handles /subagents list via command fast-path', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); + const session = { + id: 'telegram:user-subagents', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const commandRegistry = new CommandRegistry(); + registerBuiltinCommands(commandRegistry); + + const agentConfigRegistry = new AgentConfigRegistry(); + agentConfigRegistry.loadFromConfig({ + assistant: { model_tier: 'default', sandbox: false }, + helper: { model_tier: 'fast', sandbox: false }, + }); + + const router = createMessageRouter({ + sessionManager: { + getSession: vi.fn(() => session), + } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + commandRegistry, + agentConfigRegistry, + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + await router.handler({ + id: 'subagents-1', + channel: 'telegram', + senderId: 'user-subagents', + text: '/subagents list', + timestamp: Date.now(), + metadata: { isCommand: true, command: 'subagents', commandArgs: 'list' }, + } as MessageRouterInput, reply); + + expect(processSpy).not.toHaveBeenCalled(); + expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'No active subagent sessions.' })); + }); + it('uses intent match to override agent target', async () => { const session = { id: 'telegram:user-2', diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 1c47cb4..84a4b78 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -695,8 +695,13 @@ export function createMessageRouter(deps: { delegation: delegationConfig, maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, defaultPrimaryTier: effectiveTier, + defaultQueueMode: deps.config.agents.subagents?.queue_mode ?? 'followup', + defaultToolProfile: deps.config.agents.subagents?.default_tool_profile ?? 'minimal', maxIterations: deps.config.agents.max_iterations, maxActiveSessions: maxSubagentSessions, + maxTurns: deps.config.agents.subagents?.max_turns ?? 40, + maxTotalTokens: deps.config.agents.subagents?.max_total_tokens ?? 200_000, + turnTimeoutMs: deps.config.agents.subagents?.turn_timeout_ms ?? 120_000, idleTtlMs: deps.config.agents.subagents?.idle_ttl_ms ?? 3_600_000, }); for (const tool of createSubagentTools(subagentManager)) { @@ -1274,6 +1279,58 @@ export function createMessageRouter(deps: { } return result.output; }, + subagentsCommand: async (input: string) => { + if (!subagentManager) { + return 'Subagents are not enabled for this session.'; + } + const raw = input.trim(); + if (!raw || raw === 'list') { + const entries = subagentManager.list(); + if (entries.length === 0) { + return 'No active subagent sessions.'; + } + return [ + `Active subagents (${entries.length}):`, + ...entries.map((entry) => ( + `- ${entry.id} agent=${entry.agent} tier=${entry.tier} queue=${entry.queueMode} profile=${entry.toolProfile} ` + + `turns=${entry.completedTurns} pending=${entry.pendingCount} busy=${entry.busy ? 'yes' : 'no'}` + )), + ].join('\n'); + } + + const [action, ...rest] = raw.split(/\s+/); + if ((action === 'summary' || action === 'show') && rest.length >= 1) { + const subagentId = rest[0]; + const limitRaw = rest[1]; + const parsedLimit = limitRaw ? Number.parseInt(limitRaw, 10) : undefined; + const transcript = subagentManager.getTranscript(subagentId, Number.isFinite(parsedLimit) ? parsedLimit : undefined); + return [ + `Subagent ${transcript.session.id} summary:`, + `- agent=${transcript.session.agent} tier=${transcript.session.tier} queue=${transcript.session.queueMode} profile=${transcript.session.toolProfile}`, + `- turns=${transcript.session.completedTurns} messages=${transcript.session.messageCount} pending=${transcript.session.pendingCount}`, + 'Transcript:', + ...(transcript.messages.length > 0 + ? transcript.messages.map((entry, idx) => `${idx + 1}. [${entry.role}] ${entry.content.slice(0, 200)}`) + : ['(empty)']), + ].join('\n'); + } + + if (action === 'cancel' && rest.length >= 1) { + const cancelled = subagentManager.cancel(rest[0]); + return cancelled + ? `Cancellation requested for subagent \"${rest[0]}\".` + : `No active operation to cancel for subagent \"${rest[0]}\".`; + } + + if ((action === 'delete' || action === 'rm') && rest.length >= 1) { + const deleted = subagentManager.delete(rest[0]); + return deleted + ? `Deleted subagent session \"${rest[0]}\".` + : `Subagent session \"${rest[0]}\" not found.`; + } + + return 'Usage: /subagents [list|summary [limit]|cancel |delete ]'; + }, getElevation: () => { return getElevationStatusMessage({ diff --git a/src/gateway/ui/pages/chat.js b/src/gateway/ui/pages/chat.js index 490b6e8..e874afa 100644 --- a/src/gateway/ui/pages/chat.js +++ b/src/gateway/ui/pages/chat.js @@ -31,6 +31,7 @@ const SLASH_COMMANDS = [ { name: '/usage', desc: 'Show token usage' }, { name: '/status', desc: 'Show system health' }, { name: '/model', desc: 'Show current model' }, + { name: '/subagents', desc: 'Inspect subagent sessions' }, { name: '/stop', desc: 'Stop active response' }, { name: '/cancel', desc: 'Alias for /stop' }, { name: '/approvals', desc: 'List pending guarded actions' }, diff --git a/src/tools/builtin/subagents.test.ts b/src/tools/builtin/subagents.test.ts index 65a949d..90900db 100644 --- a/src/tools/builtin/subagents.test.ts +++ b/src/tools/builtin/subagents.test.ts @@ -25,7 +25,12 @@ describe('subagent tools', () => { id: 'planner', agent: 'research', tier: 'complex', + queueMode: 'followup', + toolProfile: 'minimal', + traceId: 'trace-planner', messageCount: 0, + completedTurns: 0, + pendingCount: 0, createdAt: 1, updatedAt: 1, busy: false, @@ -36,7 +41,12 @@ describe('subagent tools', () => { id: 'planner', agent: 'research', tier: 'complex', + queueMode: 'followup', + toolProfile: 'minimal', + traceId: 'trace-planner', messageCount: 2, + completedTurns: 1, + pendingCount: 0, createdAt: 1, updatedAt: 2, busy: false, @@ -60,6 +70,8 @@ describe('subagent tools', () => { agent: 'research', subagentId: 'planner', tier: undefined, + queueMode: undefined, + toolProfile: undefined, systemPrompt: undefined, }); expect(mockController.send).toHaveBeenCalledWith('planner', 'Create a checklist'); @@ -72,7 +84,12 @@ describe('subagent tools', () => { id: 'planner', agent: 'research', tier: 'complex', + queueMode: 'followup', + toolProfile: 'minimal', + traceId: 'trace-planner', messageCount: 4, + completedTurns: 2, + pendingCount: 0, createdAt: 1, updatedAt: 3, busy: false, @@ -83,7 +100,12 @@ describe('subagent tools', () => { id: 'planner', agent: 'research', tier: 'complex', + queueMode: 'followup', + toolProfile: 'minimal', + traceId: 'trace-planner', messageCount: 4, + completedTurns: 2, + pendingCount: 0, createdAt: 1, updatedAt: 3, busy: false, @@ -96,7 +118,12 @@ describe('subagent tools', () => { id: 'planner', agent: 'research', tier: 'complex', + queueMode: 'followup', + toolProfile: 'minimal', + traceId: 'trace-planner', messageCount: 4, + completedTurns: 2, + pendingCount: 0, createdAt: 1, updatedAt: 3, busy: false, diff --git a/src/tools/builtin/subagents.ts b/src/tools/builtin/subagents.ts index c655eb7..6ba596b 100644 --- a/src/tools/builtin/subagents.ts +++ b/src/tools/builtin/subagents.ts @@ -1,11 +1,17 @@ import type { Tool, ToolResult } from '../types.js'; +import type { ToolProfile } from '../../config/schema.js'; import type { ModelTier } from '../../models/router.js'; interface SubagentSessionSummary { id: string; agent: string; tier: ModelTier; + queueMode: 'followup' | 'interrupt'; + toolProfile: ToolProfile; + traceId: string; messageCount: number; + completedTurns: number; + pendingCount: number; createdAt: number; updatedAt: number; busy: boolean; @@ -17,6 +23,8 @@ interface SubagentController { subagentId?: string; tier?: ModelTier; systemPrompt?: string; + queueMode?: 'followup' | 'interrupt'; + toolProfile?: ToolProfile; }): SubagentSessionSummary; send(subagentId: string, message: string): Promise<{ content: string; @@ -40,6 +48,8 @@ interface SpawnArgs { subagent_id?: string; tier?: ModelTier; system_prompt?: string; + queue_mode?: 'followup' | 'interrupt'; + tool_profile?: ToolProfile; task?: string; } @@ -62,7 +72,12 @@ function formatSummary(summary: SubagentSessionSummary): string { `id=${summary.id}`, `agent=${summary.agent}`, `tier=${summary.tier}`, + `queue=${summary.queueMode}`, + `profile=${summary.toolProfile}`, + `trace=${summary.traceId}`, + `turns=${summary.completedTurns}`, `messages=${summary.messageCount}`, + `pending=${summary.pendingCount}`, `busy=${summary.busy ? 'yes' : 'no'}`, ].join(' '); } @@ -81,6 +96,8 @@ export function createSubagentTools(controller: SubagentController): Tool[] { agent: { type: 'string', description: 'Agent profile name from agent_configs (e.g. research, coder).' }, subagent_id: { type: 'string', description: 'Optional custom subagent session ID.' }, tier: { type: 'string', description: 'Optional model tier override (fast|default|complex|local).' }, + queue_mode: { type: 'string', description: 'Optional queue mode override (followup|interrupt).' }, + tool_profile: { type: 'string', description: 'Optional tool profile override (minimal|messaging|coding|full).' }, system_prompt: { type: 'string', description: 'Optional system prompt override for this subagent session.' }, task: { type: 'string', description: 'Optional initial task to run right after spawn.' }, }, @@ -93,6 +110,8 @@ export function createSubagentTools(controller: SubagentController): Tool[] { agent: args.agent, subagentId: args.subagent_id, tier: args.tier, + queueMode: args.queue_mode, + toolProfile: args.tool_profile, systemPrompt: args.system_prompt, });