feat(subagents): complete queue, budgets, audit, and inspection controls

This commit is contained in:
William Valentin
2026-02-26 13:28:10 -08:00
parent b679261683
commit 3cc9e16ef5
23 changed files with 741 additions and 51 deletions
+14
View File
@@ -618,6 +618,7 @@ Notes:
| `/deny [id] [reason]` | Deny latest (or specific) pending gate | | `/deny [id] [reason]` | Deny latest (or specific) pending gate |
| `/skill <list|search|install>` | In-chat skill discovery/install (`list`, `search <term>`, `install <registry-id>`) | | `/skill <list|search|install>` | In-chat skill discovery/install (`list`, `search <term>`, `install <registry-id>`) |
| `/runtime <status\|activate pi\|deactivate pi\|use config>` | Show or control global runtime backend mode (`/backend ...` alias also supported) | | `/runtime <status\|activate pi\|deactivate pi\|use config>` | Show or control global runtime backend mode (`/backend ...` alias also supported) |
| `/subagents [list\|summary <id> [limit]\|cancel <id>\|delete <id>]` | Inspect and control spawned subagent sessions |
## Web UI Dashboard ## Web UI Dashboard
@@ -661,6 +662,7 @@ pnpm tui:fs
| `/tools` | Show authoritative runtime tool list for this session | | `/tools` | Show authoritative runtime tool list for this session |
| `/research <task>` | Delegate a task to `agent_configs.research` | | `/research <task>` | Delegate a task to `agent_configs.research` |
| `/council <task>` | Run dual D/P councils pipeline with bridge+meta merge (brief in TUI, full artifacts saved to disk) | | `/council <task>` | Run dual D/P councils pipeline with bridge+meta merge (brief in TUI, full artifacts saved to disk) |
| `/subagents [list\|summary <id> [limit]\|cancel <id>\|delete <id>]` | Inspect and control spawned subagent sessions |
| `/compact` | Compact conversation context | | `/compact` | Compact conversation context |
| `/usage` | Show token usage and cost | | `/usage` | Show token usage and cost |
| `/context` | Show estimated context-window usage | | `/context` | Show estimated context-window usage |
@@ -806,6 +808,13 @@ Available tools:
- `subagent.delete` — remove a child session and clear its history - `subagent.delete` — remove a child session and clear its history
- `subagent.summary` — inspect transcript summary for a child session - `subagent.summary` — inspect transcript summary for a child session
Session controls:
- Queue modes: `followup` (FIFO) or `interrupt` (latest-wins cancellation + supersede).
- Budget guardrails: max turns, max total tokens, and per-turn timeout.
- Tool profile override per subagent spawn (`minimal|messaging|coding|full`).
- Runtime inspection command: `/subagents [list|summary <id> [limit]|cancel <id>|delete <id>]`.
Example flow: Example flow:
```json ```json
@@ -823,6 +832,11 @@ agents:
enabled: true enabled: true
max_active_sessions: 6 max_active_sessions: 6
idle_ttl_ms: 3600000 idle_ttl_ms: 3600000
queue_mode: followup
default_tool_profile: minimal
max_turns: 40
max_total_tokens: 200000
turn_timeout_ms: 120000
``` ```
## Running as Service ## Running as Service
+5
View File
@@ -302,6 +302,11 @@ agents:
enabled: true enabled: true
max_active_sessions: 6 max_active_sessions: 6
idle_ttl_ms: 3600000 idle_ttl_ms: 3600000
queue_mode: followup
default_tool_profile: minimal
max_turns: 40
max_total_tokens: 200000
turn_timeout_ms: 120000
# ── Memory / Embeddings ────────────────────────────────────────────── # ── Memory / Embeddings ──────────────────────────────────────────────
# Enable hybrid keyword + vector search using local Ollama embeddings. # Enable hybrid keyword + vector search using local Ollama embeddings.
+2 -1
View File
@@ -41,7 +41,8 @@ The gateway serialises agent work **per session**, not per WebSocket connection:
- The gateway `agent.send` command path and channel-router path use the same runtime backend-mode command service; `flynn tui` forwards `/runtime ...` through this gateway path for parity. - The gateway `agent.send` command path and channel-router path use the same runtime backend-mode command service; `flynn tui` forwards `/runtime ...` through this gateway path for parity.
- Backend routing and fallback outcomes are emitted to audit logs (`backend.route`, `backend.success`, `backend.fallback`) for rollout evaluation; this telemetry is outside JSON-RPC response payloads. - Backend routing and fallback outcomes are emitted to audit logs (`backend.route`, `backend.success`, `backend.fallback`) for rollout evaluation; this telemetry is outside JSON-RPC response payloads.
- Session-start memory injection (`user/profile` + `user/working`) is server-side and controlled by `memory.user_namespace`; it does not affect protocol payloads. - Session-start memory injection (`user/profile` + `user/working`) is server-side and controlled by `memory.user_namespace`; it does not affect protocol payloads.
- Multi-turn child agents are exposed through tool calls (`subagent.spawn/send/list/cancel/delete/summary`) inside the agent loop; they do not add new JSON-RPC methods. - Multi-turn child agents are exposed through tool calls (`subagent.spawn/send/list/cancel/delete/summary`) inside the agent loop; child sessions support per-session queue mode and budget guardrails but do not add new JSON-RPC methods.
- Session command fast-path includes `/subagents` (`list|summary|cancel|delete`) for child-session inspection/control without protocol changes.
This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`. This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`.
+3 -2
View File
@@ -137,8 +137,9 @@ Tool Calls (inside NativeAgent loop)
+---------------------------> AuditLogger (redacted) +---------------------------> AuditLogger (redacted)
Subagent sessions (multi-turn child agents) Subagent sessions (multi-turn child agents)
parent AgentOrchestrator -> subagent.* tools -> SubagentManager (TTL cleanup) parent AgentOrchestrator -> subagent.* tools -> SubagentManager (TTL cleanup + queue/budget controls)
SubagentManager -> child AgentOrchestrator (session namespace: subagent:<parent>:<id>) SubagentManager -> child AgentOrchestrator (session namespace: subagent:<parent>:<id>, trace_id)
SubagentManager -> AuditLogger (subagent.lifecycle + subagent.turn events)
child AgentOrchestrator -> NativeAgent/tool loop (same policy engine, recursion tools removed) child AgentOrchestrator -> NativeAgent/tool loop (same policy engine, recursion tools removed)
Session start (when `memory.user_namespace` is set) Session start (when `memory.user_namespace` is set)
@@ -17,7 +17,7 @@ If you only want the protocol surface, see `docs/api/PROTOCOL.md`.
- Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods. - Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods.
- Run lifecycle/cancel intent and reaction decisions are emitted to audit logs, and aggregated into `system.metrics` counters (runStates, cancelLatencyMs, reactions) for dashboards. - Run lifecycle/cancel intent and reaction decisions are emitted to audit logs, and aggregated into `system.metrics` counters (runStates, cancelLatencyMs, reactions) for dashboards.
- Reaction matching is deterministic (priority + cooldown + recursion guard) before intent/agent routing. - Reaction matching is deterministic (priority + cooldown + recursion guard) before intent/agent routing.
- `subagent.*` tools create child orchestrators scoped to the parent conversation (`subagent:<parentSessionId>:<childId>`) with idle TTL cleanup; this is tool-loop behavior, not a separate gateway RPC session lane. - `subagent.*` tools create child orchestrators scoped to the parent conversation (`subagent:<parentSessionId>:<childId>`) with idle TTL cleanup, per-child queue mode (`followup|interrupt`), and session budgets (turn/token/timeout); this is tool-loop behavior, not a separate gateway RPC session lane.
- Companion `node.*` registration is per WebSocket connection; reconnects must re-register capabilities before invoking node RPC methods. - Companion `node.*` registration is per WebSocket connection; reconnects must re-register capabilities before invoking node RPC methods.
- Canvas artifacts are persisted per session under the gateway data directory for UI recovery across restarts. - Canvas artifacts are persisted per session under the gateway data directory for UI recovery across restarts.
- TTS output is best-effort; synthesis failures fall back to text-only responses. - TTS output is best-effort; synthesis failures fall back to text-only responses.
@@ -13,6 +13,7 @@ The following were previously treated as gaps but are already implemented in Fly
3. Browser automation baseline is present (`browser.navigate/click/type/screenshot/content/eval` in `src/tools/builtin/browser/tools.ts`). 3. Browser automation baseline is present (`browser.navigate/click/type/screenshot/content/eval` in `src/tools/builtin/browser/tools.ts`).
4. Companion protocol/runtime foundation is present (`src/companion/runtimeClient.ts`, `src/companion/platformClients.ts`). 4. Companion protocol/runtime foundation is present (`src/companion/runtimeClient.ts`, `src/companion/platformClients.ts`).
5. Talk mode + wake phrase baseline is present (`src/daemon/routing.ts`, `audio.talk_mode` schema support). 5. Talk mode + wake phrase baseline is present (`src/daemon/routing.ts`, `audio.talk_mode` schema support).
6. Subagent sessions now include queue/budget controls, transcript export, and session inspection UX (`subagent.*`, `/subagents`).
## Remaining Product Gaps (Now) ## Remaining Product Gaps (Now)
@@ -20,7 +21,6 @@ The following were previously treated as gaps but are already implemented in Fly
2. Voice UX is functional but not yet a polished, end-to-end daily-driver experience across surfaces. 2. Voice UX is functional but not yet a polished, end-to-end daily-driver experience across surfaces.
3. Browser tools exist but lack task-level reliability primitives (checkpoints/retries/guardrails) for autonomous workflows. 3. Browser tools exist but lack task-level reliability primitives (checkpoints/retries/guardrails) for autonomous workflows.
4. Onboarding lacks a "first success" guided path that validates real integrations live during setup. 4. Onboarding lacks a "first success" guided path that validates real integrations live during setup.
5. Subagent sessions are now available (`subagent.*`) with idle TTL cleanup and transcript summary support, but still need budgeting/UI visibility for larger autonomous workflows.
## Product Goal ## Product Goal
+16 -13
View File
@@ -1,7 +1,7 @@
# Subagents Support Plan (Flynn) # Subagents Support Plan (Flynn)
Date: 2026-02-26 Date: 2026-02-26
Status: phase 1 implemented, phase 2 partially implemented Status: phases 1-3 implemented
Scope: add OpenClaw-style multi-turn subagent session support in Flynn without changing channel surface scope (Telegram-first) Scope: add OpenClaw-style multi-turn subagent session support in Flynn without changing channel surface scope (Telegram-first)
## Constraints ## Constraints
@@ -30,22 +30,25 @@ Scope: add OpenClaw-style multi-turn subagent session support in Flynn without c
- `max_active_sessions` - `max_active_sessions`
5. Added policy/profile support so `subagent.*` is controlled through `group:agents` and tool profiles. 5. Added policy/profile support so `subagent.*` is controlled through `group:agents` and tool profiles.
## Phase 2 (Next) ## Phase 2 (Implemented)
1. Add per-subagent TTL/idle eviction and auto-cleanup metrics. (implemented: TTL eviction) 1. Added idle TTL eviction plus audit lifecycle events for cleanup visibility.
2. Add optional transcript export/summarization (`subagent.summary`). (implemented) 2. Added transcript export/summarization via `subagent.summary`.
3. Add per-subagent tool-profile override (read-only by default for risky workloads). (pending) 3. Added per-subagent tool-profile override (`queue_mode`, `tool_profile` on spawn).
4. Add parent-child trace IDs in audit events for easier debugging. (pending) 4. Added parent-child trace IDs and subagent lifecycle/turn audit events.
## Phase 3 (Stretch) ## Phase 3 (Implemented)
1. Add queue semantics for child sessions (`followup` vs `interrupt` per subagent). 1. Added queue semantics per child session (`followup` FIFO, `interrupt` latest-wins).
2. Add explicit resource budgets (token/time) per child session. 2. Added explicit resource budgets (max turns, max total tokens, per-turn timeout).
3. Add UI affordances in gateway chat for subagent session inspection. 3. Added operator/UI affordances:
- `/subagents` command (list/summary/cancel/delete),
- gateway chat slash suggestion for `/subagents`.
## Acceptance Criteria (Phase 1) ## Final Acceptance Criteria
1. Parent agent can spawn and continue a child subagent across multiple turns. 1. Parent agent can spawn and continue child subagents across multiple turns.
2. Child session state is isolated and delete clears history. 2. Child session state is isolated and delete clears history.
3. Recursion tooling (`agent.delegate`, `council.run`, `subagent.*`) is removed from child registries. 3. Recursion tooling (`agent.delegate`, `council.run`, `subagent.*`) is removed from child registries.
4. Tests cover manager lifecycle, tool behavior, config parsing, and policy profile inclusion. 4. Child sessions support queue policy + budget guardrails + transcript inspection.
5. Tests cover manager lifecycle, tool behavior, config parsing, routing command wiring, and audit event logging.
+13 -11
View File
@@ -6800,21 +6800,23 @@
"status": "completed", "status": "completed",
"date": "2026-02-26", "date": "2026-02-26",
"updated": "2026-02-26", "updated": "2026-02-26",
"summary": "Implemented Phase 1 and partial Phase 2 subagent support: added a SubagentManager with multi-turn child sessions, idle TTL cleanup, new `subagent.*` tools (spawn/send/list/cancel/delete/summary), routing wiring, config guardrails, policy/profile integration, docs/diagram updates, and focused test coverage.", "summary": "Completed subagent phases 1-3: added queue semantics (`followup`/`interrupt`), turn/token/timeout budgets, per-subagent tool-profile overrides, parent-child trace IDs with lifecycle/turn audit events, `/subagents` runtime command surface, and updated docs/diagram coverage.",
"files_modified": [ "files_modified": [
"src/backends/native/subagents.ts", "src/backends/native/subagents.ts",
"src/backends/native/subagents.test.ts", "src/backends/native/subagents.test.ts",
"src/backends/native/index.ts", "src/audit/types.ts",
"src/backends/index.ts", "src/audit/logger.ts",
"src/audit/logger.test.ts",
"src/tools/builtin/subagents.ts", "src/tools/builtin/subagents.ts",
"src/tools/builtin/subagents.test.ts", "src/tools/builtin/subagents.test.ts",
"src/tools/builtin/index.ts", "src/commands/types.ts",
"src/tools/index.ts", "src/commands/builtin/index.ts",
"src/tools/policy.ts", "src/commands/builtin/index.test.ts",
"src/tools/policy.test.ts",
"src/config/schema.ts", "src/config/schema.ts",
"src/config/schema.test.ts", "src/config/schema.test.ts",
"src/daemon/routing.ts", "src/daemon/routing.ts",
"src/daemon/routing.test.ts",
"src/gateway/ui/pages/chat.js",
"config/default.yaml", "config/default.yaml",
"README.md", "README.md",
"docs/api/PROTOCOL.md", "docs/api/PROTOCOL.md",
@@ -6824,11 +6826,11 @@
"docs/plans/2026-02-26-personal-assistant-productization-plan.md", "docs/plans/2026-02-26-personal-assistant-productization-plan.md",
"docs/plans/state.json" "docs/plans/state.json"
], ],
"test_status": "pnpm test:run src/backends/native/subagents.test.ts src/tools/builtin/subagents.test.ts src/tools/policy.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing + pnpm typecheck" "test_status": "pnpm test:run src/backends/native/subagents.test.ts src/tools/builtin/subagents.test.ts src/commands/builtin/index.test.ts src/audit/logger.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing + pnpm typecheck"
} }
}, },
"overall_progress": { "overall_progress": {
"total_test_count": 2533, "total_test_count": 2534,
"all_tests_passing": true, "all_tests_passing": true,
"p0_completion": "3/3 (100%)", "p0_completion": "3/3 (100%)",
"p1_completion": "4/4 (100%)", "p1_completion": "4/4 (100%)",
@@ -6843,7 +6845,7 @@
"tier2_completion": "4/4 (100%) \u2014 inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier2_completion": "4/4 (100%) \u2014 inbound webhooks, vector memory search, Dockerfile, heartbeat monitor",
"tier3_completion": "5/5 (100%) \u2014 lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier3_completion": "5/5 (100%) \u2014 lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings",
"tier4_completion": "4/4 (100%) \u2014 gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", "tier4_completion": "4/4 (100%) \u2014 gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes",
"feature_gap_scorecard": "rebaselined 2026-02-26 — channel breadth, setup wizard, baseline browser automation, and partial phase-2 subagent support (`subagent.*` + idle TTL cleanup + transcript summary) are implemented; remaining high-impact personal-assistant gaps center on shipped companion apps (desktop/mobile), voice UX polish, browser workflow reliability primitives, and first-success onboarding funnel optimization.", "feature_gap_scorecard": "rebaselined 2026-02-26 — channel breadth, setup wizard, baseline browser automation, and full subagent support (`subagent.*` + queue modes + budgets + trace/audit + `/subagents` inspection) are implemented; remaining high-impact personal-assistant gaps center on shipped companion apps (desktop/mobile), voice UX polish, browser workflow reliability primitives, and first-success onboarding funnel optimization.",
"operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete \u2014 milestone done", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete \u2014 milestone done",
"dashboard_observability": "completed \u2014 service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling", "dashboard_observability": "completed \u2014 service health graphs + core service log viewer added to web UI via observability RPCs and bounded backend sampling",
"gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram",
@@ -6877,7 +6879,7 @@
"deeper_surfaces_phase4_rollout": "completed \u2014 phase 4 rollout and operator readiness plan documented: canary rollout plan by feature flag/surface, explicit rollback playbook, operator docs and architecture/protocol docs synchronized", "deeper_surfaces_phase4_rollout": "completed \u2014 phase 4 rollout and operator readiness plan documented: canary rollout plan by feature flag/surface, explicit rollback playbook, operator docs and architecture/protocol docs synchronized",
"post_phase_test_fixes": "completed \u2014 fixed 4 test failures introduced by phases 1-3: iOS/Android push listNodes (missing publishHeartbeat before platform-filtered query), server.test agent.send (run_state events now precede done; added sendAndWaitForDone helper), httpBody 413 (req.destroy() closed socket before response could be sent; replaced with Connection: close header on 413 responses)", "post_phase_test_fixes": "completed \u2014 fixed 4 test failures introduced by phases 1-3: iOS/Android push listNodes (missing publishHeartbeat before platform-filtered query), server.test agent.send (run_state events now precede done; added sendAndWaitForDone helper), httpBody 413 (req.destroy() closed socket before response could be sent; replaced with Connection: close header on 413 responses)",
"personal_assistant_productization_plan": "proposed \u2014 8-10 week phased roadmap defined (companion MVP surfaces, voice reliability hardening, browser workflow reliability layer, onboarding 2.0 first-success funnel) with measurable exit gates.", "personal_assistant_productization_plan": "proposed \u2014 8-10 week phased roadmap defined (companion MVP surfaces, voice reliability hardening, browser workflow reliability layer, onboarding 2.0 first-success funnel) with measurable exit gates.",
"subagents_support": "completed \u2014 phase-1 plus partial phase-2 subagent runtime support added with `subagent.spawn/send/list/cancel/delete/summary`, per-parent child-session orchestration, idle TTL cleanup (`agents.subagents.idle_ttl_ms`), config guardrails, and focused regression tests." "subagents_support": "completed \u2014 subagent phases 1-3 shipped with `subagent.spawn/send/list/cancel/delete/summary`, per-child queue mode (`followup|interrupt`), budgets (`max_turns`, `max_total_tokens`, `turn_timeout_ms`), tool-profile overrides, trace-linked audit events, `/subagents` inspection commands, and focused regression tests."
}, },
"soul_md_and_cron_create": { "soul_md_and_cron_create": {
"date": "2026-02-11", "date": "2026-02-11",
+26
View File
@@ -123,6 +123,26 @@ describe('AuditLogger', () => {
reason: 'no_match', reason: 'no_match',
candidate_count: 4, candidate_count: 4,
}); });
logger.subagentLifecycle({
parent_session_id: 'telegram:123',
subagent_id: 'planner',
trace_id: 'trace-planner',
action: 'spawn',
agent: 'research',
tier: 'complex',
queue_mode: 'followup',
tool_profile: 'minimal',
});
logger.subagentTurn({
parent_session_id: 'telegram:123',
subagent_id: 'planner',
trace_id: 'trace-planner',
action: 'complete',
queue_mode: 'followup',
duration_ms: 88,
input_chars: 42,
output_chars: 120,
});
await logger.close(); await logger.close();
await waitForFlush(); await waitForFlush();
@@ -135,6 +155,8 @@ describe('AuditLogger', () => {
expect(eventTypes).toContain('run.cancel'); expect(eventTypes).toContain('run.cancel');
expect(eventTypes).toContain('reaction.match'); expect(eventTypes).toContain('reaction.match');
expect(eventTypes).toContain('reaction.skip'); expect(eventTypes).toContain('reaction.skip');
expect(eventTypes).toContain('subagent.lifecycle');
expect(eventTypes).toContain('subagent.turn');
const runError = events.find((event) => ( const runError = events.find((event) => (
event.event_type === 'run.state' event.event_type === 'run.state'
@@ -145,6 +167,10 @@ describe('AuditLogger', () => {
const reactionSkip = events.find((event) => event.event_type === 'reaction.skip'); const reactionSkip = events.find((event) => event.event_type === 'reaction.skip');
expect(reactionSkip?.level).toBe('debug'); expect(reactionSkip?.level).toBe('debug');
expect(reactionSkip?.event.reason).toBe('no_match'); expect(reactionSkip?.event.reason).toBe('no_match');
const subagentLifecycle = events.find((event) => event.event_type === 'subagent.lifecycle');
expect(subagentLifecycle?.level).toBe('info');
expect(subagentLifecycle?.event.action).toBe('spawn');
} finally { } finally {
if (previousHome === undefined) { if (previousHome === undefined) {
delete process.env.HOME; delete process.env.HOME;
+13
View File
@@ -26,6 +26,8 @@ import type {
RunCancelEvent, RunCancelEvent,
ReactionMatchEvent, ReactionMatchEvent,
ReactionSkipEvent, ReactionSkipEvent,
SubagentLifecycleEvent,
SubagentTurnEvent,
BackendRouteEvent, BackendRouteEvent,
BackendSuccessEvent, BackendSuccessEvent,
BackendFallbackEvent, BackendFallbackEvent,
@@ -237,6 +239,17 @@ export class AuditLogger {
this.write({ level: 'debug', event_type: 'reaction.skip', event: event as unknown as Record<string, unknown> }); this.write({ level: 'debug', event_type: 'reaction.skip', event: event as unknown as Record<string, unknown> });
} }
subagentLifecycle(event: SubagentLifecycleEvent): void {
if (!this.shouldLog('sessions', 'info')) {return;}
this.write({ level: 'info', event_type: 'subagent.lifecycle', event: event as unknown as Record<string, unknown> });
}
subagentTurn(event: SubagentTurnEvent): void {
const level = event.action === 'error' ? 'warn' : 'debug';
if (!this.shouldLog('sessions', level)) {return;}
this.write({ level, event_type: 'subagent.turn', event: event as unknown as Record<string, unknown> });
}
backendRoute(event: BackendRouteEvent): void { backendRoute(event: BackendRouteEvent): void {
if (!this.shouldLog('sessions', 'info')) {return;} if (!this.shouldLog('sessions', 'info')) {return;}
this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> }); this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> });
+29
View File
@@ -14,6 +14,7 @@ export type AuditEventType =
| 'queue.preempt' | 'queue.preempt'
| 'run.state' | 'run.cancel' | 'run.state' | 'run.cancel'
| 'reaction.match' | 'reaction.skip' | 'reaction.match' | 'reaction.skip'
| 'subagent.lifecycle' | 'subagent.turn'
| 'backend.route' | 'backend.success' | 'backend.fallback' | 'backend.route' | 'backend.success' | 'backend.fallback'
// Automation - Cron // Automation - Cron
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove' | 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
@@ -303,6 +304,34 @@ export interface BackendFallbackEvent {
duration_ms?: number; duration_ms?: number;
} }
export interface SubagentLifecycleEvent {
parent_session_id: string;
subagent_id: string;
trace_id: string;
action: 'spawn' | 'cancel' | 'delete' | 'ttl_evict' | 'summary';
agent?: string;
tier?: string;
queue_mode?: 'followup' | 'interrupt';
tool_profile?: string;
reason?: string;
}
export interface SubagentTurnEvent {
parent_session_id: string;
subagent_id: string;
trace_id: string;
action: 'queued' | 'superseded' | 'start' | 'complete' | 'error';
request_id?: string;
queue_mode?: 'followup' | 'interrupt';
pending_count?: number;
duration_ms?: number;
error?: string;
input_chars?: number;
output_chars?: number;
turn_count?: number;
total_tokens?: number;
}
export interface CronTriggerEvent { export interface CronTriggerEvent {
job_name: string; job_name: string;
schedule: string; schedule: string;
+87
View File
@@ -8,6 +8,8 @@ const mocks = vi.hoisted(() => {
processCalls, processCalls,
cancellable: true, cancellable: true,
cancelCalls: 0, cancelCalls: 0,
usageInput: 0,
usageOutput: 0,
}; };
}); });
@@ -29,6 +31,8 @@ vi.mock('./orchestrator.js', () => {
async process(message: string): Promise<string> { async process(message: string): Promise<string> {
mocks.processCalls.push(message); mocks.processCalls.push(message);
const output = `subagent:${message}`; const output = `subagent:${message}`;
mocks.usageInput += Math.ceil(message.length / 4);
mocks.usageOutput += Math.ceil(output.length / 4);
this.session.addMessage({ role: 'user', content: message }); this.session.addMessage({ role: 'user', content: message });
this.session.addMessage({ role: 'assistant', content: output }); this.session.addMessage({ role: 'assistant', content: output });
return output; return output;
@@ -41,6 +45,23 @@ vi.mock('./orchestrator.js', () => {
cancel(): void { cancel(): void {
mocks.cancelCalls += 1; mocks.cancelCalls += 1;
} }
getUsage(): {
primary: { inputTokens: number; outputTokens: number; calls: number };
delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
} {
return {
primary: { inputTokens: mocks.usageInput, outputTokens: mocks.usageOutput, calls: mocks.processCalls.length },
delegation: {},
total: {
inputTokens: mocks.usageInput,
outputTokens: mocks.usageOutput,
calls: mocks.processCalls.length,
estimatedCost: 0,
},
};
}
} }
return { AgentOrchestrator }; return { AgentOrchestrator };
@@ -122,6 +143,8 @@ describe('SubagentManager', () => {
mocks.processCalls.length = 0; mocks.processCalls.length = 0;
mocks.cancellable = true; mocks.cancellable = true;
mocks.cancelCalls = 0; mocks.cancelCalls = 0;
mocks.usageInput = 0;
mocks.usageOutput = 0;
}); });
it('spawns, sends, lists, cancels, and deletes subagent sessions', async () => { it('spawns, sends, lists, cancels, and deletes subagent sessions', async () => {
@@ -162,8 +185,13 @@ describe('SubagentManager', () => {
}, },
maxDelegationDepth: 3, maxDelegationDepth: 3,
defaultPrimaryTier: 'default', defaultPrimaryTier: 'default',
defaultQueueMode: 'followup',
defaultToolProfile: 'minimal',
maxIterations: 12, maxIterations: 12,
maxActiveSessions: 2, maxActiveSessions: 2,
maxTurns: 40,
maxTotalTokens: 200000,
turnTimeoutMs: 120000,
idleTtlMs: 60000, idleTtlMs: 60000,
}); });
@@ -171,6 +199,7 @@ describe('SubagentManager', () => {
expect(spawned.id).toBe('planner'); expect(spawned.id).toBe('planner');
expect(spawned.agent).toBe('research'); expect(spawned.agent).toBe('research');
expect(spawned.tier).toBe('complex'); expect(spawned.tier).toBe('complex');
expect(spawned.queueMode).toBe('followup');
// verify blocked orchestration tools are not passed to child subagents // verify blocked orchestration tools are not passed to child subagents
const ctorConfig = mocks.ctorConfigs[0] as { toolRegistry: ToolRegistry }; const ctorConfig = mocks.ctorConfigs[0] as { toolRegistry: ToolRegistry };
@@ -220,7 +249,12 @@ describe('SubagentManager', () => {
}, },
maxDelegationDepth: 3, maxDelegationDepth: 3,
defaultPrimaryTier: 'default', defaultPrimaryTier: 'default',
defaultQueueMode: 'followup',
defaultToolProfile: 'minimal',
maxActiveSessions: 1, maxActiveSessions: 1,
maxTurns: 40,
maxTotalTokens: 200000,
turnTimeoutMs: 120000,
idleTtlMs: 60000, idleTtlMs: 60000,
}); });
@@ -249,7 +283,12 @@ describe('SubagentManager', () => {
}, },
maxDelegationDepth: 3, maxDelegationDepth: 3,
defaultPrimaryTier: 'default', defaultPrimaryTier: 'default',
defaultQueueMode: 'followup',
defaultToolProfile: 'minimal',
maxActiveSessions: 3, maxActiveSessions: 3,
maxTurns: 40,
maxTotalTokens: 200000,
turnTimeoutMs: 120000,
idleTtlMs: 60000, idleTtlMs: 60000,
}); });
@@ -274,7 +313,12 @@ describe('SubagentManager', () => {
}, },
maxDelegationDepth: 3, maxDelegationDepth: 3,
defaultPrimaryTier: 'default', defaultPrimaryTier: 'default',
defaultQueueMode: 'followup',
defaultToolProfile: 'minimal',
maxActiveSessions: 3, maxActiveSessions: 3,
maxTurns: 40,
maxTotalTokens: 200000,
turnTimeoutMs: 120000,
idleTtlMs: 1000, idleTtlMs: 1000,
}); });
@@ -284,4 +328,47 @@ describe('SubagentManager', () => {
expect(removed).toEqual(['ttl-one']); expect(removed).toEqual(['ttl-one']);
expect(manager.list()).toEqual([]); expect(manager.list()).toEqual([]);
}); });
it('enforces per-session turn/token budgets and interrupt latest-wins behavior', async () => {
const sessionManager = createSessionManagerMock();
const manager = new SubagentManager({
parentSessionId: 'telegram:eve',
modelRouter: {} as never,
sessionManager: sessionManager.api as never,
toolRegistry: new ToolRegistry(),
toolExecutor: {} as never,
agentConfigRegistry: createAgentRegistryMock() as never,
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
maxDelegationDepth: 3,
defaultPrimaryTier: 'default',
defaultQueueMode: 'interrupt',
defaultToolProfile: 'minimal',
maxActiveSessions: 3,
maxTurns: 2,
maxTotalTokens: 200000,
turnTimeoutMs: 120000,
idleTtlMs: 60000,
});
manager.spawn({ agent: 'helper', subagentId: 'interrupt-one' });
const p1 = manager.send('interrupt-one', 'first request');
const p2 = manager.send('interrupt-one', 'second request');
const [r1, r2] = await Promise.allSettled([p1, p2]);
expect(mocks.cancelCalls).toBeGreaterThanOrEqual(1);
if (r1.status === 'fulfilled') {
expect(r1.value.content).toContain('subagent:');
}
expect(r2.status).toBe('fulfilled');
if (r2.status === 'fulfilled') {
expect(r2.value.content).toBe('subagent:second request');
}
await expect(manager.send('interrupt-one', 'third request')).rejects.toThrow('max turns');
});
}); });
+278 -21
View File
@@ -1,15 +1,20 @@
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
import type { AgentConfigRegistry } from '../../agents/registry.js'; import type { AgentConfigRegistry } from '../../agents/registry.js';
import type { ToolProfile } from '../../config/schema.js';
import type { Message } from '../../models/types.js'; import type { Message } from '../../models/types.js';
import type { ToolPolicyContext } from '../../tools/policy.js'; import type { ToolPolicyContext } from '../../tools/policy.js';
import { PROFILE_TOOLS } from '../../tools/policy.js';
import type { ModelRouter, ModelTier } from '../../models/router.js'; import type { ModelRouter, ModelTier } from '../../models/router.js';
import type { SessionManager } from '../../session/manager.js'; import type { SessionManager } from '../../session/manager.js';
import type { ToolRegistry } from '../../tools/registry.js'; import type { ToolRegistry } from '../../tools/registry.js';
import type { ToolExecutor } from '../../tools/executor.js'; import type { ToolExecutor } from '../../tools/executor.js';
import { auditLogger } from '../../audit/index.js';
import { AgentOrchestrator, type DelegationConfig } from './orchestrator.js'; import { AgentOrchestrator, type DelegationConfig } from './orchestrator.js';
const SUBAGENT_FRONTEND = 'subagent'; const SUBAGENT_FRONTEND = 'subagent';
type SubagentQueueMode = 'followup' | 'interrupt';
const BLOCKED_SUBAGENT_TOOL_NAMES = [ const BLOCKED_SUBAGENT_TOOL_NAMES = [
'agent.delegate', 'agent.delegate',
'council.run', 'council.run',
@@ -21,6 +26,13 @@ const BLOCKED_SUBAGENT_TOOL_NAMES = [
'subagent.summary', 'subagent.summary',
]; ];
interface QueuedTurn {
requestId: string;
message: string;
resolve: (result: SubagentSendResult) => void;
reject: (error: Error) => void;
}
export interface SubagentManagerConfig { export interface SubagentManagerConfig {
parentSessionId: string; parentSessionId: string;
modelRouter: ModelRouter; modelRouter: ModelRouter;
@@ -31,8 +43,13 @@ export interface SubagentManagerConfig {
delegation: DelegationConfig; delegation: DelegationConfig;
maxDelegationDepth: number; maxDelegationDepth: number;
defaultPrimaryTier: ModelTier; defaultPrimaryTier: ModelTier;
defaultQueueMode: SubagentQueueMode;
defaultToolProfile: ToolProfile;
maxIterations?: number; maxIterations?: number;
maxActiveSessions: number; maxActiveSessions: number;
maxTurns: number;
maxTotalTokens: number;
turnTimeoutMs: number;
idleTtlMs: number; idleTtlMs: number;
toolPolicyContext?: ToolPolicyContext; toolPolicyContext?: ToolPolicyContext;
} }
@@ -42,16 +59,24 @@ export interface SpawnSubagentRequest {
subagentId?: string; subagentId?: string;
tier?: ModelTier; tier?: ModelTier;
systemPrompt?: string; systemPrompt?: string;
queueMode?: SubagentQueueMode;
toolProfile?: ToolProfile;
} }
interface ManagedSubagent { interface ManagedSubagent {
id: string; id: string;
agent: string; agent: string;
tier: ModelTier; tier: ModelTier;
queueMode: SubagentQueueMode;
toolProfile: ToolProfile;
traceId: string;
sessionUserId: string; sessionUserId: string;
createdAt: number; createdAt: number;
updatedAt: number; updatedAt: number;
busy: boolean; busy: boolean;
processing: boolean;
pending: QueuedTurn[];
completedTurns: number;
orchestrator: AgentOrchestrator; orchestrator: AgentOrchestrator;
} }
@@ -59,7 +84,12 @@ export interface SubagentSessionSummary {
id: string; id: string;
agent: string; agent: string;
tier: ModelTier; tier: ModelTier;
queueMode: SubagentQueueMode;
toolProfile: ToolProfile;
traceId: string;
messageCount: number; messageCount: number;
completedTurns: number;
pendingCount: number;
createdAt: number; createdAt: number;
updatedAt: number; updatedAt: number;
busy: boolean; busy: boolean;
@@ -101,13 +131,13 @@ export class SubagentManager {
if (!agentConfig) { if (!agentConfig) {
const available = this.config.agentConfigRegistry.list().map((entry) => entry.name); const available = this.config.agentConfigRegistry.list().map((entry) => entry.name);
throw new Error( throw new Error(
`Agent \"${agentName}\" not found. Available agents: ${available.length > 0 ? available.join(', ') : 'none'}`, `Agent "${agentName}" not found. Available agents: ${available.length > 0 ? available.join(', ') : 'none'}`,
); );
} }
const id = this.resolveSubagentId(request.subagentId); const id = this.resolveSubagentId(request.subagentId);
if (this.sessions.has(id)) { if (this.sessions.has(id)) {
throw new Error(`Subagent session \"${id}\" already exists.`); throw new Error(`Subagent session "${id}" already exists.`);
} }
if (this.sessions.size >= this.config.maxActiveSessions) { if (this.sessions.size >= this.config.maxActiveSessions) {
throw new Error( throw new Error(
@@ -118,15 +148,16 @@ export class SubagentManager {
const tier = request.tier ?? agentConfig.modelTier ?? this.config.defaultPrimaryTier; const tier = request.tier ?? agentConfig.modelTier ?? this.config.defaultPrimaryTier;
const systemPrompt = request.systemPrompt const systemPrompt = request.systemPrompt
?? agentConfig.systemPrompt ?? agentConfig.systemPrompt
?? `You are subagent \"${agentName}\". Complete assigned tasks clearly and concisely.`; ?? `You are subagent "${agentName}". Complete assigned tasks clearly and concisely.`;
const queueMode = request.queueMode ?? this.config.defaultQueueMode;
const toolProfile = request.toolProfile ?? agentConfig.toolProfile ?? this.config.defaultToolProfile;
const now = Date.now(); const now = Date.now();
const traceId = `subagent:${this.config.parentSessionId}:${id}:${randomUUID().slice(0, 8)}`;
const sessionUserId = `${this.config.parentSessionId}:${id}`; const sessionUserId = `${this.config.parentSessionId}:${id}`;
const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, sessionUserId); const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, sessionUserId);
const subagentToolRegistry = this.config.toolRegistry.clone(); const subagentToolRegistry = this.buildSubagentToolRegistry(toolProfile);
for (const toolName of BLOCKED_SUBAGENT_TOOL_NAMES) {
subagentToolRegistry.unregister(toolName);
}
const policyContext: ToolPolicyContext | undefined = this.config.toolPolicyContext const policyContext: ToolPolicyContext | undefined = this.config.toolPolicyContext
? { ? {
@@ -154,13 +185,30 @@ export class SubagentManager {
id, id,
agent: agentName, agent: agentName,
tier, tier,
queueMode,
toolProfile,
traceId,
sessionUserId, sessionUserId,
createdAt: now, createdAt: now,
updatedAt: now, updatedAt: now,
busy: false, busy: false,
processing: false,
pending: [],
completedTurns: 0,
orchestrator: subagent, orchestrator: subagent,
}); });
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: id,
trace_id: traceId,
action: 'spawn',
agent: agentName,
tier,
queue_mode: queueMode,
tool_profile: toolProfile,
});
return this.getSummaryById(id); return this.getSummaryById(id);
} }
@@ -173,18 +221,53 @@ export class SubagentManager {
throw new Error('message is required'); throw new Error('message is required');
} }
subagent.busy = true; this.assertBudgets(subagent);
subagent.updatedAt = Date.now();
try { return new Promise<SubagentSendResult>((resolve, reject) => {
const content = await subagent.orchestrator.process(trimmed); const requestId = `rq-${randomUUID().slice(0, 8)}`;
subagent.updatedAt = Date.now(); const queued: QueuedTurn = { requestId, message: trimmed, resolve, reject };
return {
content, if (subagent.queueMode === 'interrupt') {
session: this.getSummary(subagent), while (subagent.pending.length > 0) {
}; const dropped = subagent.pending.shift();
} finally { if (!dropped) {
subagent.busy = false; continue;
} }
auditLogger?.subagentTurn?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'superseded',
request_id: dropped.requestId,
queue_mode: subagent.queueMode,
});
dropped.reject(new Error(`Superseded by newer subagent request (${requestId}).`));
}
if (subagent.busy && subagent.orchestrator.isCancellable()) {
subagent.orchestrator.cancel();
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'cancel',
queue_mode: subagent.queueMode,
reason: 'interrupt_mode_latest_wins',
});
}
}
subagent.pending.push(queued);
auditLogger?.subagentTurn?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'queued',
request_id: requestId,
queue_mode: subagent.queueMode,
pending_count: subagent.pending.length,
});
this.processQueue(subagent);
});
} }
cancel(subagentId: string): boolean { cancel(subagentId: string): boolean {
@@ -196,6 +279,14 @@ export class SubagentManager {
} }
subagent.orchestrator.cancel(); subagent.orchestrator.cancel();
subagent.updatedAt = Date.now(); subagent.updatedAt = Date.now();
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'cancel',
queue_mode: subagent.queueMode,
reason: 'explicit_cancel',
});
return true; return true;
} }
@@ -209,10 +300,27 @@ export class SubagentManager {
subagent.orchestrator.cancel(); subagent.orchestrator.cancel();
} }
while (subagent.pending.length > 0) {
const queued = subagent.pending.shift();
if (!queued) {
continue;
}
queued.reject(new Error(`Subagent session "${subagent.id}" was deleted.`));
}
const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, subagent.sessionUserId); const session = this.config.sessionManager.getSession(SUBAGENT_FRONTEND, subagent.sessionUserId);
session.clear(); session.clear();
this.config.sessionManager.closeSession(SUBAGENT_FRONTEND, subagent.sessionUserId); this.config.sessionManager.closeSession(SUBAGENT_FRONTEND, subagent.sessionUserId);
this.sessions.delete(subagentId); this.sessions.delete(subagentId);
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'delete',
queue_mode: subagent.queueMode,
reason: 'explicit_delete',
});
return true; return true;
} }
@@ -235,6 +343,15 @@ export class SubagentManager {
: history.length; : history.length;
const tail = history.slice(Math.max(0, history.length - max)); const tail = history.slice(Math.max(0, history.length - max));
const messages = tail.map((entry) => this.toTranscriptEntry(entry)); const messages = tail.map((entry) => this.toTranscriptEntry(entry));
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'summary',
queue_mode: subagent.queueMode,
});
return { return {
session: this.getSummary(subagent), session: this.getSummary(subagent),
messages, messages,
@@ -247,7 +364,7 @@ export class SubagentManager {
} }
const removed: string[] = []; const removed: string[] = [];
for (const [id, session] of this.sessions.entries()) { for (const [id, session] of this.sessions.entries()) {
if (session.busy) { if (session.busy || session.pending.length > 0) {
continue; continue;
} }
if ((nowMs - session.updatedAt) <= this.config.idleTtlMs) { if ((nowMs - session.updatedAt) <= this.config.idleTtlMs) {
@@ -255,6 +372,14 @@ export class SubagentManager {
} }
this.delete(id); this.delete(id);
removed.push(id); removed.push(id);
auditLogger?.subagentLifecycle?.({
parent_session_id: this.config.parentSessionId,
subagent_id: session.id,
trace_id: session.traceId,
action: 'ttl_evict',
queue_mode: session.queueMode,
reason: 'idle_ttl_elapsed',
});
} }
return removed; return removed;
} }
@@ -271,7 +396,7 @@ export class SubagentManager {
const normalized = id.trim(); const normalized = id.trim();
const subagent = this.sessions.get(normalized); const subagent = this.sessions.get(normalized);
if (!subagent) { if (!subagent) {
throw new Error(`Subagent session \"${normalized}\" not found.`); throw new Error(`Subagent session "${normalized}" not found.`);
} }
return subagent; return subagent;
} }
@@ -287,7 +412,12 @@ export class SubagentManager {
id: subagent.id, id: subagent.id,
agent: subagent.agent, agent: subagent.agent,
tier: subagent.tier, tier: subagent.tier,
queueMode: subagent.queueMode,
toolProfile: subagent.toolProfile,
traceId: subagent.traceId,
messageCount: session.getHistory().length, messageCount: session.getHistory().length,
completedTurns: subagent.completedTurns,
pendingCount: subagent.pending.length,
createdAt: subagent.createdAt, createdAt: subagent.createdAt,
updatedAt: subagent.updatedAt, updatedAt: subagent.updatedAt,
busy: subagent.busy, busy: subagent.busy,
@@ -301,4 +431,131 @@ export class SubagentManager {
timestamp: entry.timestamp, timestamp: entry.timestamp,
}; };
} }
private async processQueue(subagent: ManagedSubagent): Promise<void> {
if (subagent.processing) {
return;
}
subagent.processing = true;
try {
while (subagent.pending.length > 0) {
const next = subagent.pending.shift();
if (!next) {
continue;
}
const startedAt = Date.now();
subagent.busy = true;
subagent.updatedAt = startedAt;
try {
this.assertBudgets(subagent);
auditLogger?.subagentTurn?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'start',
request_id: next.requestId,
queue_mode: subagent.queueMode,
input_chars: next.message.length,
pending_count: subagent.pending.length,
});
const content = await this.runTurnWithTimeout(subagent, next.message);
subagent.completedTurns += 1;
subagent.updatedAt = Date.now();
const totalTokens = this.getTotalTokens(subagent);
auditLogger?.subagentTurn?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'complete',
request_id: next.requestId,
queue_mode: subagent.queueMode,
duration_ms: subagent.updatedAt - startedAt,
output_chars: content.length,
turn_count: subagent.completedTurns,
total_tokens: totalTokens,
pending_count: subagent.pending.length,
});
next.resolve({
content,
session: this.getSummary(subagent),
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
subagent.updatedAt = Date.now();
auditLogger?.subagentTurn?.({
parent_session_id: this.config.parentSessionId,
subagent_id: subagent.id,
trace_id: subagent.traceId,
action: 'error',
request_id: next.requestId,
queue_mode: subagent.queueMode,
duration_ms: subagent.updatedAt - startedAt,
error: message,
pending_count: subagent.pending.length,
});
next.reject(error instanceof Error ? error : new Error(message));
} finally {
subagent.busy = false;
}
}
} finally {
subagent.processing = false;
}
}
private async runTurnWithTimeout(subagent: ManagedSubagent, message: string): Promise<string> {
return await new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
if (subagent.orchestrator.isCancellable()) {
subagent.orchestrator.cancel();
}
reject(new Error(`Subagent turn timed out after ${this.config.turnTimeoutMs}ms`));
}, this.config.turnTimeoutMs);
subagent.orchestrator.process(message)
.then((result) => {
clearTimeout(timer);
resolve(result);
})
.catch((error) => {
clearTimeout(timer);
reject(error);
});
});
}
private assertBudgets(subagent: ManagedSubagent): void {
if (subagent.completedTurns >= this.config.maxTurns) {
throw new Error(`Subagent session "${subagent.id}" reached max turns (${this.config.maxTurns}).`);
}
if (this.getTotalTokens(subagent) >= this.config.maxTotalTokens) {
throw new Error(`Subagent session "${subagent.id}" reached max total tokens (${this.config.maxTotalTokens}).`);
}
}
private getTotalTokens(subagent: ManagedSubagent): number {
const usage = subagent.orchestrator.getUsage();
return usage.total.inputTokens + usage.total.outputTokens;
}
private buildSubagentToolRegistry(profile: ToolProfile): ToolRegistry {
const subagentToolRegistry = this.config.toolRegistry.clone();
if (profile !== 'full') {
const allowed = PROFILE_TOOLS[profile];
for (const tool of subagentToolRegistry.list()) {
if (!allowed.has(tool.name)) {
subagentToolRegistry.unregister(tool.name);
}
}
}
for (const toolName of BLOCKED_SUBAGENT_TOOL_NAMES) {
subagentToolRegistry.unregister(toolName);
}
return subagentToolRegistry;
}
} }
+29 -1
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, vi } from 'vitest'; import { describe, it, expect, vi } from 'vitest';
import { createApproveCommand, createApprovalsCommand, createBackendCommand, createContextCommand, createCouncilCommand, createDenyCommand, createElevateCommand, createModelCommand, createQueueCommand, createResearchCommand, createSkillCommand, createStopCommand, createToolsCommand, createTransferCommand } from './index.js'; import { createApproveCommand, createApprovalsCommand, createBackendCommand, createContextCommand, createCouncilCommand, createDenyCommand, createElevateCommand, createModelCommand, createQueueCommand, createResearchCommand, createSkillCommand, createStopCommand, createSubagentsCommand, createToolsCommand, createTransferCommand } from './index.js';
describe('builtin /model command', () => { describe('builtin /model command', () => {
it('passes through the full argument string', async () => { it('passes through the full argument string', async () => {
@@ -94,6 +94,34 @@ describe('builtin /council command', () => {
}); });
}); });
describe('builtin /subagents command', () => {
it('passes through raw subcommands', async () => {
const cmd = createSubagentsCommand();
const subagentsCommand = vi.fn(() => 'subagents listed');
const result = await cmd.execute(['summary', 'planner', '10'], {
channel: 'test',
senderId: 'user',
sessionId: 's1',
rawInput: '/subagents summary planner 10',
services: { subagentsCommand },
});
expect(subagentsCommand).toHaveBeenCalledWith('summary planner 10');
expect(result).toEqual({ handled: true, text: 'subagents listed' });
});
it('returns not-available when service is missing', async () => {
const cmd = createSubagentsCommand();
const result = await cmd.execute([], {
channel: 'test',
senderId: 'user',
sessionId: 's1',
rawInput: '/subagents',
services: {},
});
expect(result).toEqual({ handled: true, text: 'Subagents command is not available in this session.' });
});
});
describe('builtin /elevate command', () => { describe('builtin /elevate command', () => {
it('passes through the full argument string', async () => { it('passes through the full argument string', async () => {
const cmd = createElevateCommand(); const cmd = createElevateCommand();
+17
View File
@@ -274,6 +274,22 @@ export function createCouncilCommand(): CommandDefinition {
}; };
} }
export function createSubagentsCommand(): CommandDefinition {
return {
name: 'subagents',
description: 'Inspect subagent sessions (list/summary/cancel/delete)',
execute: async (args, ctx) => {
if (!ctx.services?.subagentsCommand) {
return notAvailable('Subagents command');
}
return {
handled: true,
text: await ctx.services.subagentsCommand(args.join(' ').trim()),
};
},
};
}
export function createTransferCommand(): CommandDefinition { export function createTransferCommand(): CommandDefinition {
return { return {
name: 'transfer', name: 'transfer',
@@ -381,6 +397,7 @@ export function registerBuiltinCommands(registry: CommandRegistry): void {
registry.register(createContextCommand()); registry.register(createContextCommand());
registry.register(createResearchCommand()); registry.register(createResearchCommand());
registry.register(createCouncilCommand()); registry.register(createCouncilCommand());
registry.register(createSubagentsCommand());
registry.register(createModelCommand()); registry.register(createModelCommand());
registry.register(createCompactCommand()); registry.register(createCompactCommand());
registry.register(createResetCommand()); registry.register(createResetCommand());
+1
View File
@@ -29,6 +29,7 @@ export interface CommandServices {
reset?: () => Promise<string> | string; reset?: () => Promise<string> | string;
delegateAgent?: (agentName: string, task: string) => Promise<string> | string; delegateAgent?: (agentName: string, task: string) => Promise<string> | string;
runCouncil?: (task: string) => Promise<string> | string; runCouncil?: (task: string) => Promise<string> | string;
subagentsCommand?: (input: string) => Promise<string> | string;
getElevation?: () => Promise<string> | string; getElevation?: () => Promise<string> | string;
setElevation?: (input: string) => Promise<string> | string; setElevation?: (input: string) => Promise<string> | string;
+26
View File
@@ -1701,6 +1701,11 @@ describe('configSchema — agents truthfulness/autonomy', () => {
expect(result.agents.subagents.enabled).toBe(true); expect(result.agents.subagents.enabled).toBe(true);
expect(result.agents.subagents.max_active_sessions).toBe(6); expect(result.agents.subagents.max_active_sessions).toBe(6);
expect(result.agents.subagents.idle_ttl_ms).toBe(3600000); expect(result.agents.subagents.idle_ttl_ms).toBe(3600000);
expect(result.agents.subagents.queue_mode).toBe('followup');
expect(result.agents.subagents.default_tool_profile).toBe('minimal');
expect(result.agents.subagents.max_turns).toBe(40);
expect(result.agents.subagents.max_total_tokens).toBe(200000);
expect(result.agents.subagents.turn_timeout_ms).toBe(120000);
expect(result.agents.immutable_denylist).toEqual( expect(result.agents.immutable_denylist).toEqual(
expect.arrayContaining([ expect.arrayContaining([
expect.objectContaining({ tool: 'shell.exec', args_pattern: 'git push origin main' }), expect.objectContaining({ tool: 'shell.exec', args_pattern: 'git push origin main' }),
@@ -1720,6 +1725,11 @@ describe('configSchema — agents truthfulness/autonomy', () => {
enabled: false, enabled: false,
max_active_sessions: 3, max_active_sessions: 3,
idle_ttl_ms: 120000, idle_ttl_ms: 120000,
queue_mode: 'interrupt',
default_tool_profile: 'messaging',
max_turns: 12,
max_total_tokens: 50000,
turn_timeout_ms: 90000,
}, },
immutable_denylist: [ immutable_denylist: [
{ tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' }, { tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' },
@@ -1733,6 +1743,11 @@ describe('configSchema — agents truthfulness/autonomy', () => {
expect(result.agents.subagents.enabled).toBe(false); expect(result.agents.subagents.enabled).toBe(false);
expect(result.agents.subagents.max_active_sessions).toBe(3); expect(result.agents.subagents.max_active_sessions).toBe(3);
expect(result.agents.subagents.idle_ttl_ms).toBe(120000); expect(result.agents.subagents.idle_ttl_ms).toBe(120000);
expect(result.agents.subagents.queue_mode).toBe('interrupt');
expect(result.agents.subagents.default_tool_profile).toBe('messaging');
expect(result.agents.subagents.max_turns).toBe(12);
expect(result.agents.subagents.max_total_tokens).toBe(50000);
expect(result.agents.subagents.turn_timeout_ms).toBe(90000);
expect(result.agents.immutable_denylist).toEqual([ expect(result.agents.immutable_denylist).toEqual([
{ tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' }, { tool: 'shell.exec', args_pattern: 'rm -rf /', reason: 'too destructive' },
]); ]);
@@ -1760,6 +1775,17 @@ describe('configSchema — agents truthfulness/autonomy', () => {
})).toThrow(); })).toThrow();
}); });
it('rejects invalid subagent queue mode', () => {
expect(() => configSchema.parse({
...minimalConfig,
agents: {
subagents: {
queue_mode: 'latest',
},
},
})).toThrow();
});
it('rejects invalid truthfulness_mode', () => { it('rejects invalid truthfulness_mode', () => {
expect(() => configSchema.parse({ expect(() => configSchema.parse({
...minimalConfig, ...minimalConfig,
+5
View File
@@ -539,6 +539,11 @@ const agentsSchema = z.object({
enabled: z.boolean().default(true), enabled: z.boolean().default(true),
max_active_sessions: z.number().min(1).max(32).default(6), max_active_sessions: z.number().min(1).max(32).default(6),
idle_ttl_ms: z.number().min(60_000).max(86_400_000).default(3_600_000), idle_ttl_ms: z.number().min(60_000).max(86_400_000).default(3_600_000),
queue_mode: z.enum(['followup', 'interrupt']).default('followup'),
default_tool_profile: z.enum(['minimal', 'messaging', 'coding', 'full']).default('minimal'),
max_turns: z.number().min(1).max(500).default(40),
max_total_tokens: z.number().min(1000).max(5_000_000).default(200_000),
turn_timeout_ms: z.number().min(1000).max(10 * 60 * 1000).default(120_000),
}).default({}), }).default({}),
auto_escalate: z.boolean().default(false), auto_escalate: z.boolean().default(false),
max_delegation_depth: z.number().min(1).max(10).default(3), max_delegation_depth: z.number().min(1).max(10).default(3),
+71
View File
@@ -737,6 +737,77 @@ describe('daemon command fast-path integration', () => {
expect(session.setConfig).toHaveBeenCalledWith('queue.mode', 'followup'); expect(session.setConfig).toHaveBeenCalledWith('queue.mode', 'followup');
}); });
it('handles /subagents list via command fast-path', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
const session = {
id: 'telegram:user-subagents',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const commandRegistry = new CommandRegistry();
registerBuiltinCommands(commandRegistry);
const agentConfigRegistry = new AgentConfigRegistry();
agentConfigRegistry.loadFromConfig({
assistant: { model_tier: 'default', sandbox: false },
helper: { model_tier: 'fast', sandbox: false },
});
const router = createMessageRouter({
sessionManager: {
getSession: vi.fn(() => session),
} as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: {
clone() { return this; },
register: vi.fn(),
} as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
max_delegation_depth: 3,
max_iterations: 10,
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
commandRegistry,
agentConfigRegistry,
});
const reply = vi.fn(async (_message: OutboundMessage) => {});
await router.handler({
id: 'subagents-1',
channel: 'telegram',
senderId: 'user-subagents',
text: '/subagents list',
timestamp: Date.now(),
metadata: { isCommand: true, command: 'subagents', commandArgs: 'list' },
} as MessageRouterInput, reply);
expect(processSpy).not.toHaveBeenCalled();
expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'No active subagent sessions.' }));
});
it('uses intent match to override agent target', async () => { it('uses intent match to override agent target', async () => {
const session = { const session = {
id: 'telegram:user-2', id: 'telegram:user-2',
+57
View File
@@ -695,8 +695,13 @@ export function createMessageRouter(deps: {
delegation: delegationConfig, delegation: delegationConfig,
maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3,
defaultPrimaryTier: effectiveTier, defaultPrimaryTier: effectiveTier,
defaultQueueMode: deps.config.agents.subagents?.queue_mode ?? 'followup',
defaultToolProfile: deps.config.agents.subagents?.default_tool_profile ?? 'minimal',
maxIterations: deps.config.agents.max_iterations, maxIterations: deps.config.agents.max_iterations,
maxActiveSessions: maxSubagentSessions, maxActiveSessions: maxSubagentSessions,
maxTurns: deps.config.agents.subagents?.max_turns ?? 40,
maxTotalTokens: deps.config.agents.subagents?.max_total_tokens ?? 200_000,
turnTimeoutMs: deps.config.agents.subagents?.turn_timeout_ms ?? 120_000,
idleTtlMs: deps.config.agents.subagents?.idle_ttl_ms ?? 3_600_000, idleTtlMs: deps.config.agents.subagents?.idle_ttl_ms ?? 3_600_000,
}); });
for (const tool of createSubagentTools(subagentManager)) { for (const tool of createSubagentTools(subagentManager)) {
@@ -1274,6 +1279,58 @@ export function createMessageRouter(deps: {
} }
return result.output; return result.output;
}, },
subagentsCommand: async (input: string) => {
if (!subagentManager) {
return 'Subagents are not enabled for this session.';
}
const raw = input.trim();
if (!raw || raw === 'list') {
const entries = subagentManager.list();
if (entries.length === 0) {
return 'No active subagent sessions.';
}
return [
`Active subagents (${entries.length}):`,
...entries.map((entry) => (
`- ${entry.id} agent=${entry.agent} tier=${entry.tier} queue=${entry.queueMode} profile=${entry.toolProfile} ` +
`turns=${entry.completedTurns} pending=${entry.pendingCount} busy=${entry.busy ? 'yes' : 'no'}`
)),
].join('\n');
}
const [action, ...rest] = raw.split(/\s+/);
if ((action === 'summary' || action === 'show') && rest.length >= 1) {
const subagentId = rest[0];
const limitRaw = rest[1];
const parsedLimit = limitRaw ? Number.parseInt(limitRaw, 10) : undefined;
const transcript = subagentManager.getTranscript(subagentId, Number.isFinite(parsedLimit) ? parsedLimit : undefined);
return [
`Subagent ${transcript.session.id} summary:`,
`- agent=${transcript.session.agent} tier=${transcript.session.tier} queue=${transcript.session.queueMode} profile=${transcript.session.toolProfile}`,
`- turns=${transcript.session.completedTurns} messages=${transcript.session.messageCount} pending=${transcript.session.pendingCount}`,
'Transcript:',
...(transcript.messages.length > 0
? transcript.messages.map((entry, idx) => `${idx + 1}. [${entry.role}] ${entry.content.slice(0, 200)}`)
: ['(empty)']),
].join('\n');
}
if (action === 'cancel' && rest.length >= 1) {
const cancelled = subagentManager.cancel(rest[0]);
return cancelled
? `Cancellation requested for subagent \"${rest[0]}\".`
: `No active operation to cancel for subagent \"${rest[0]}\".`;
}
if ((action === 'delete' || action === 'rm') && rest.length >= 1) {
const deleted = subagentManager.delete(rest[0]);
return deleted
? `Deleted subagent session \"${rest[0]}\".`
: `Subagent session \"${rest[0]}\" not found.`;
}
return 'Usage: /subagents [list|summary <id> [limit]|cancel <id>|delete <id>]';
},
getElevation: () => { getElevation: () => {
return getElevationStatusMessage({ return getElevationStatusMessage({
+1
View File
@@ -31,6 +31,7 @@ const SLASH_COMMANDS = [
{ name: '/usage', desc: 'Show token usage' }, { name: '/usage', desc: 'Show token usage' },
{ name: '/status', desc: 'Show system health' }, { name: '/status', desc: 'Show system health' },
{ name: '/model', desc: 'Show current model' }, { name: '/model', desc: 'Show current model' },
{ name: '/subagents', desc: 'Inspect subagent sessions' },
{ name: '/stop', desc: 'Stop active response' }, { name: '/stop', desc: 'Stop active response' },
{ name: '/cancel', desc: 'Alias for /stop' }, { name: '/cancel', desc: 'Alias for /stop' },
{ name: '/approvals', desc: 'List pending guarded actions' }, { name: '/approvals', desc: 'List pending guarded actions' },
+27
View File
@@ -25,7 +25,12 @@ describe('subagent tools', () => {
id: 'planner', id: 'planner',
agent: 'research', agent: 'research',
tier: 'complex', tier: 'complex',
queueMode: 'followup',
toolProfile: 'minimal',
traceId: 'trace-planner',
messageCount: 0, messageCount: 0,
completedTurns: 0,
pendingCount: 0,
createdAt: 1, createdAt: 1,
updatedAt: 1, updatedAt: 1,
busy: false, busy: false,
@@ -36,7 +41,12 @@ describe('subagent tools', () => {
id: 'planner', id: 'planner',
agent: 'research', agent: 'research',
tier: 'complex', tier: 'complex',
queueMode: 'followup',
toolProfile: 'minimal',
traceId: 'trace-planner',
messageCount: 2, messageCount: 2,
completedTurns: 1,
pendingCount: 0,
createdAt: 1, createdAt: 1,
updatedAt: 2, updatedAt: 2,
busy: false, busy: false,
@@ -60,6 +70,8 @@ describe('subagent tools', () => {
agent: 'research', agent: 'research',
subagentId: 'planner', subagentId: 'planner',
tier: undefined, tier: undefined,
queueMode: undefined,
toolProfile: undefined,
systemPrompt: undefined, systemPrompt: undefined,
}); });
expect(mockController.send).toHaveBeenCalledWith('planner', 'Create a checklist'); expect(mockController.send).toHaveBeenCalledWith('planner', 'Create a checklist');
@@ -72,7 +84,12 @@ describe('subagent tools', () => {
id: 'planner', id: 'planner',
agent: 'research', agent: 'research',
tier: 'complex', tier: 'complex',
queueMode: 'followup',
toolProfile: 'minimal',
traceId: 'trace-planner',
messageCount: 4, messageCount: 4,
completedTurns: 2,
pendingCount: 0,
createdAt: 1, createdAt: 1,
updatedAt: 3, updatedAt: 3,
busy: false, busy: false,
@@ -83,7 +100,12 @@ describe('subagent tools', () => {
id: 'planner', id: 'planner',
agent: 'research', agent: 'research',
tier: 'complex', tier: 'complex',
queueMode: 'followup',
toolProfile: 'minimal',
traceId: 'trace-planner',
messageCount: 4, messageCount: 4,
completedTurns: 2,
pendingCount: 0,
createdAt: 1, createdAt: 1,
updatedAt: 3, updatedAt: 3,
busy: false, busy: false,
@@ -96,7 +118,12 @@ describe('subagent tools', () => {
id: 'planner', id: 'planner',
agent: 'research', agent: 'research',
tier: 'complex', tier: 'complex',
queueMode: 'followup',
toolProfile: 'minimal',
traceId: 'trace-planner',
messageCount: 4, messageCount: 4,
completedTurns: 2,
pendingCount: 0,
createdAt: 1, createdAt: 1,
updatedAt: 3, updatedAt: 3,
busy: false, busy: false,
+19
View File
@@ -1,11 +1,17 @@
import type { Tool, ToolResult } from '../types.js'; import type { Tool, ToolResult } from '../types.js';
import type { ToolProfile } from '../../config/schema.js';
import type { ModelTier } from '../../models/router.js'; import type { ModelTier } from '../../models/router.js';
interface SubagentSessionSummary { interface SubagentSessionSummary {
id: string; id: string;
agent: string; agent: string;
tier: ModelTier; tier: ModelTier;
queueMode: 'followup' | 'interrupt';
toolProfile: ToolProfile;
traceId: string;
messageCount: number; messageCount: number;
completedTurns: number;
pendingCount: number;
createdAt: number; createdAt: number;
updatedAt: number; updatedAt: number;
busy: boolean; busy: boolean;
@@ -17,6 +23,8 @@ interface SubagentController {
subagentId?: string; subagentId?: string;
tier?: ModelTier; tier?: ModelTier;
systemPrompt?: string; systemPrompt?: string;
queueMode?: 'followup' | 'interrupt';
toolProfile?: ToolProfile;
}): SubagentSessionSummary; }): SubagentSessionSummary;
send(subagentId: string, message: string): Promise<{ send(subagentId: string, message: string): Promise<{
content: string; content: string;
@@ -40,6 +48,8 @@ interface SpawnArgs {
subagent_id?: string; subagent_id?: string;
tier?: ModelTier; tier?: ModelTier;
system_prompt?: string; system_prompt?: string;
queue_mode?: 'followup' | 'interrupt';
tool_profile?: ToolProfile;
task?: string; task?: string;
} }
@@ -62,7 +72,12 @@ function formatSummary(summary: SubagentSessionSummary): string {
`id=${summary.id}`, `id=${summary.id}`,
`agent=${summary.agent}`, `agent=${summary.agent}`,
`tier=${summary.tier}`, `tier=${summary.tier}`,
`queue=${summary.queueMode}`,
`profile=${summary.toolProfile}`,
`trace=${summary.traceId}`,
`turns=${summary.completedTurns}`,
`messages=${summary.messageCount}`, `messages=${summary.messageCount}`,
`pending=${summary.pendingCount}`,
`busy=${summary.busy ? 'yes' : 'no'}`, `busy=${summary.busy ? 'yes' : 'no'}`,
].join(' '); ].join(' ');
} }
@@ -81,6 +96,8 @@ export function createSubagentTools(controller: SubagentController): Tool[] {
agent: { type: 'string', description: 'Agent profile name from agent_configs (e.g. research, coder).' }, agent: { type: 'string', description: 'Agent profile name from agent_configs (e.g. research, coder).' },
subagent_id: { type: 'string', description: 'Optional custom subagent session ID.' }, subagent_id: { type: 'string', description: 'Optional custom subagent session ID.' },
tier: { type: 'string', description: 'Optional model tier override (fast|default|complex|local).' }, tier: { type: 'string', description: 'Optional model tier override (fast|default|complex|local).' },
queue_mode: { type: 'string', description: 'Optional queue mode override (followup|interrupt).' },
tool_profile: { type: 'string', description: 'Optional tool profile override (minimal|messaging|coding|full).' },
system_prompt: { type: 'string', description: 'Optional system prompt override for this subagent session.' }, system_prompt: { type: 'string', description: 'Optional system prompt override for this subagent session.' },
task: { type: 'string', description: 'Optional initial task to run right after spawn.' }, task: { type: 'string', description: 'Optional initial task to run right after spawn.' },
}, },
@@ -93,6 +110,8 @@ export function createSubagentTools(controller: SubagentController): Tool[] {
agent: args.agent, agent: args.agent,
subagentId: args.subagent_id, subagentId: args.subagent_id,
tier: args.tier, tier: args.tier,
queueMode: args.queue_mode,
toolProfile: args.tool_profile,
systemPrompt: args.system_prompt, systemPrompt: args.system_prompt,
}); });