diff --git a/README.md b/README.md index 60dffcf..2328871 100644 --- a/README.md +++ b/README.md @@ -996,7 +996,10 @@ Reactions let you convert inbound events into deterministic agent prompts withou - Templates support: - `{{text}}`, `{{channel}}`, `{{sender_id}}` - `{{metadata.some.path}}` -- First matching rule wins. +- Priority + cooldown semantics: + - `priority` (higher wins, default `100`; ties break by config order). + - `cooldown_ms` suppresses repeated matches per sender/session. + - `stop_on_match` defaults to `true`; set `false` to make a rule non-blocking (only chosen if no blocking rule matches). ## Backup Scheduling diff --git a/docs/architecture/AGENT_DIAGRAM.md b/docs/architecture/AGENT_DIAGRAM.md index 4e5c0b5..5f33795 100644 --- a/docs/architecture/AGENT_DIAGRAM.md +++ b/docs/architecture/AGENT_DIAGRAM.md @@ -142,6 +142,7 @@ Outbound Reply Gateway streaming UX signals: - WebSocket `agent.send` emits `run_state` lifecycle events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) for UI/state rendering. +- Routing applies reaction rules with deterministic priority/cooldown (and recursion guard) before intent routing. Key files: diff --git a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md index 392f97f..99bb730 100644 --- a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md +++ b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md @@ -15,6 +15,7 @@ If you only want the protocol surface, see `docs/api/PROTOCOL.md`. - `flynn tui` now attaches to this same gateway command path for `/runtime ...` and auto-starts/attaches daemon+gateway when needed. - Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods. - Run lifecycle/cancel intent and reaction decisions are emitted to audit logs, and aggregated into `system.metrics` counters (runStates, cancelLatencyMs, reactions) for dashboards. +- Reaction matching is deterministic (priority + cooldown + recursion guard) before intent/agent routing. ## Component Map diff --git a/docs/plans/state.json b/docs/plans/state.json index f70f6e5..99ffde5 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -6678,10 +6678,31 @@ "docs/plans/state.json" ], "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/gateway/ui/pages/chat.test.ts src/daemon/routing.test.ts passing" + }, + "deeper-surfaces-phase2-reactions-v2": { + "status": "completed", + "date": "2026-02-25", + "updated": "2026-02-25", + "summary": "Implemented Phase 2 reactions v2 with deterministic priority + cooldown semantics, non-blocking stop_on_match handling, recursion guard, and routing integration. Updated reaction schema defaults, docs, and added focused reaction/cooldown tests.", + "files_modified": [ + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/audit/types.ts", + "src/automation/reactions.ts", + "src/automation/reactions.test.ts", + "src/automation/index.ts", + "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "README.md", + "docs/architecture/AGENT_DIAGRAM.md", + "docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/automation/reactions.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing" } }, "overall_progress": { - "total_test_count": 2013, + "total_test_count": 2018, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -6724,7 +6745,8 @@ "pi_embedded_manual_mode": "completed \u2014 added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", "openclaw_gateway_first_tui_runtime_unification": "completed \u2014 shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach", "gateway_startup_eaddrinuse_hardening": "completed \u2014 gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry", - "deeper_surfaces_phase1_run_control": "completed \u2014 interrupt queue mode now enforces latest-wins semantics with channel-path preemption, and gateway emits run_state lifecycle events rendered in the web UI" + "deeper_surfaces_phase1_run_control": "completed \u2014 interrupt queue mode now enforces latest-wins semantics with channel-path preemption, and gateway emits run_state lifecycle events rendered in the web UI", + "deeper_surfaces_phase2_reactions_v2": "completed \u2014 reaction engine now uses priority/cooldown with non-blocking rules, recursion guard, and routing-level cooldown skip logging" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/audit/types.ts b/src/audit/types.ts index cbb5333..abd38fd 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -271,7 +271,7 @@ export interface ReactionSkipEvent { channel: string; sender: string; source: 'gateway' | 'channel'; - reason: 'no_rules' | 'no_match' | 'disabled' | 'channel_mismatch' | 'filter_miss'; + reason: 'no_rules' | 'no_match' | 'cooldown' | 'recursion_guard' | 'disabled' | 'channel_mismatch' | 'filter_miss'; candidate_count: number; } diff --git a/src/automation/index.ts b/src/automation/index.ts index 67019f1..c3c1445 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -5,5 +5,5 @@ export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; export { buildPresetCronJobs } from './presets.js'; export { MinioSyncScheduler } from './minioSync.js'; -export { matchReactionPrompt } from './reactions.js'; +export { matchReactionPrompt, resolveReactionDecision } from './reactions.js'; export type { MinioSyncSchedulerDeps } from './minioSync.js'; diff --git a/src/automation/reactions.test.ts b/src/automation/reactions.test.ts index 187ee56..dea4f2f 100644 --- a/src/automation/reactions.test.ts +++ b/src/automation/reactions.test.ts @@ -10,6 +10,9 @@ function makeRule(overrides: Partial & Pick { expect(result).toBeNull(); }); + + it('prefers higher priority matches with deterministic ordering', () => { + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'low-priority', + on: ['gmail'], + filter: { contains: 'update' }, + run: 'low', + priority: 10, + }), + makeRule({ + name: 'high-priority', + on: ['gmail'], + filter: { contains: 'update' }, + run: 'high', + priority: 200, + }), + ]; + + const result = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'update: status', + }); + + expect(result).toEqual({ name: 'high-priority', prompt: 'high' }); + }); + + it('skips cooldowned matches and falls back to next candidate', () => { + const cooldowns = new Map(); + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'cooldowned', + on: ['gmail'], + filter: { contains: 'alert' }, + run: 'cooldown', + priority: 200, + cooldown_ms: 1000, + }), + makeRule({ + name: 'fallback', + on: ['gmail'], + filter: { contains: 'alert' }, + run: 'fallback', + priority: 50, + }), + ]; + + const first = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'alert: first', + }, { + now: 1000, + cooldownStore: cooldowns, + cooldownScope: 'gmail:watcher', + }); + expect(first).toEqual({ name: 'cooldowned', prompt: 'cooldown' }); + + const second = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'alert: second', + }, { + now: 1500, + cooldownStore: cooldowns, + cooldownScope: 'gmail:watcher', + }); + expect(second).toEqual({ name: 'fallback', prompt: 'fallback' }); + }); + + it('allows non-blocking matches to yield to lower-priority stop rules', () => { + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'non-blocking', + on: ['gmail'], + filter: { contains: 'ping' }, + run: 'soft', + priority: 200, + stop_on_match: false, + }), + makeRule({ + name: 'blocking', + on: ['gmail'], + filter: { contains: 'ping' }, + run: 'hard', + priority: 100, + }), + ]; + + const result = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'ping', + }); + + expect(result).toEqual({ name: 'blocking', prompt: 'hard' }); + }); }); diff --git a/src/automation/reactions.ts b/src/automation/reactions.ts index 4f6feb6..4906821 100644 --- a/src/automation/reactions.ts +++ b/src/automation/reactions.ts @@ -12,6 +12,18 @@ export interface ReactionMatchResult { prompt: string; } +export interface ReactionMatchOptions { + now?: number; + cooldownStore?: Map; + cooldownScope?: string; +} + +export interface ReactionDecision { + match: ReactionMatchResult | null; + matchedRule?: AutomationReactionConfig; + cooldownSkipped: number; +} + function getNestedValue(record: Record, path: string): unknown { const segments = path.split('.').filter(Boolean); let current: unknown = record; @@ -104,19 +116,95 @@ function ruleMatches(rule: AutomationReactionConfig, event: ReactionEvent): bool return true; } -/** Find the first matching reaction rule and render its run template. */ -export function matchReactionPrompt( +function buildCooldownKey(scope: string, ruleName: string): string { + return `${scope}:${ruleName}`; +} + +/** + * Resolve the best matching reaction rule using deterministic priority and cooldown semantics. + * - Higher priority wins (default priority = 100). + * - Ties break by config order. + * - stop_on_match=false means "non-blocking": it only wins if no stop_on_match=true rules match. + */ +export function resolveReactionDecision( reactions: AutomationReactionConfig[], event: ReactionEvent, -): ReactionMatchResult | null { - for (const rule of reactions) { + options?: ReactionMatchOptions, +): ReactionDecision { + const now = options?.now ?? Date.now(); + const cooldownScope = options?.cooldownScope ?? `${event.channel}:${event.senderId}`; + const cooldownStore = options?.cooldownStore; + const candidates: Array<{ rule: AutomationReactionConfig; index: number }> = []; + let cooldownSkipped = 0; + + for (const [index, rule] of reactions.entries()) { if (!ruleMatches(rule, event)) { continue; } - return { - name: rule.name, - prompt: renderTemplate(rule.run, event), - }; + + if (cooldownStore && (rule.cooldown_ms ?? 0) > 0) { + const key = buildCooldownKey(cooldownScope, rule.name); + const lastMatchedAt = cooldownStore.get(key); + if (typeof lastMatchedAt === 'number' && now - lastMatchedAt < rule.cooldown_ms) { + cooldownSkipped += 1; + continue; + } + } + + candidates.push({ rule, index }); } - return null; + + if (candidates.length === 0) { + return { match: null, cooldownSkipped }; + } + + candidates.sort((a, b) => { + const priorityA = Number.isFinite(a.rule.priority) ? a.rule.priority : 100; + const priorityB = Number.isFinite(b.rule.priority) ? b.rule.priority : 100; + if (priorityA !== priorityB) { + return priorityB - priorityA; + } + return a.index - b.index; + }); + + let fallback: { rule: AutomationReactionConfig; index: number } | undefined; + let selected: { rule: AutomationReactionConfig; index: number } | undefined; + + for (const candidate of candidates) { + if (!fallback) { + fallback = candidate; + } + if (candidate.rule.stop_on_match !== false) { + selected = candidate; + break; + } + } + + const chosen = selected ?? fallback; + if (!chosen) { + return { match: null, cooldownSkipped }; + } + + if (cooldownStore && (chosen.rule.cooldown_ms ?? 0) > 0) { + const key = buildCooldownKey(cooldownScope, chosen.rule.name); + cooldownStore.set(key, now); + } + + return { + match: { + name: chosen.rule.name, + prompt: renderTemplate(chosen.rule.run, event), + }, + matchedRule: chosen.rule, + cooldownSkipped, + }; +} + +/** Find the best matching reaction rule and render its run template. */ +export function matchReactionPrompt( + reactions: AutomationReactionConfig[], + event: ReactionEvent, + options?: ReactionMatchOptions, +): ReactionMatchResult | null { + return resolveReactionDecision(reactions, event, options).match; } diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 60b439e..47c8018 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -1328,6 +1328,9 @@ describe('configSchema automation', () => { expect(result.automation.reactions[0]).toMatchObject({ name: 'boss-email', enabled: true, + priority: 100, + cooldown_ms: 0, + stop_on_match: true, on: ['gmail'], run: 'Summarize and propose next actions:\n\n{{text}}', }); diff --git a/src/config/schema.ts b/src/config/schema.ts index 66f627d..4b5dbcc 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -291,6 +291,9 @@ const reactionFilterSchema = z.object({ const automationReactionSchema = z.object({ name: z.string().min(1, 'Reaction name is required'), enabled: z.boolean().default(true), + priority: z.number().finite().default(100), + cooldown_ms: z.number().int().min(0).default(0), + stop_on_match: z.boolean().default(true), /** Source channels/events this rule applies to (e.g. gmail, webhook). */ on: z.array(z.string().min(1)).default([]), filter: reactionFilterSchema, diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 7bc9769..f375923 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -2484,6 +2484,160 @@ describe('daemon reactions routing integration', () => { }), ); }); + + it('skips reactions for command messages', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); + + const session = { + id: 'gmail:reaction-command', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['default'], + getAllLabels: () => ({ default: 'default' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'default', + memory_extraction: 'default', + classification: 'default', + tool_summarisation: 'default', + complex_reasoning: 'default', + }, + max_delegation_depth: 1, + max_iterations: 3, + }, + automation: { + reactions: [{ + name: 'boss-email', + enabled: true, + on: ['gmail'], + filter: { contains: 'boss@company.com' }, + run: 'Summarize: {{text}}', + }], + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + }); + + await router.handler({ + id: 'r3', + channel: 'gmail', + senderId: 'reaction-command', + text: '/status', + timestamp: Date.now(), + metadata: { isCommand: true, command: 'status' }, + } as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {})); + + expect(processSpy).toHaveBeenCalledTimes(1); + const [prompt] = processSpy.mock.calls[0] ?? []; + expect(prompt).toBe('/status'); + expect(mockAuditLogger.reactionMatch).not.toHaveBeenCalled(); + }); + + it('suppresses reactions during cooldown windows', async () => { + vi.useFakeTimers(); + try { + vi.setSystemTime(new Date('2026-02-25T00:00:00Z')); + + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); + + const session = { + id: 'gmail:reaction-cooldown', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['default'], + getAllLabels: () => ({ default: 'default' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'default', + memory_extraction: 'default', + classification: 'default', + tool_summarisation: 'default', + complex_reasoning: 'default', + }, + max_delegation_depth: 1, + max_iterations: 3, + }, + automation: { + reactions: [{ + name: 'boss-email', + enabled: true, + on: ['gmail'], + filter: { contains: 'boss@company.com' }, + run: 'Summarize: {{text}}', + cooldown_ms: 60000, + }], + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + await router.handler({ + id: 'r4', + channel: 'gmail', + senderId: 'reaction-cooldown', + text: 'boss@company.com update', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + vi.setSystemTime(new Date('2026-02-25T00:00:30Z')); + await router.handler({ + id: 'r5', + channel: 'gmail', + senderId: 'reaction-cooldown', + text: 'boss@company.com update again', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + expect(processSpy).toHaveBeenCalledTimes(2); + const secondPrompt = processSpy.mock.calls[1]?.[0]; + expect(secondPrompt).toBe('boss@company.com update again'); + expect(mockAuditLogger.reactionSkip).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'gmail:reaction-cooldown', + reason: 'cooldown', + }), + ); + } finally { + vi.useRealTimers(); + } + }); }); describe('daemon auto-escalate integration', () => { diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 63b1167..9f79586 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -27,7 +27,7 @@ import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; import type { HookEngine } from '../hooks/index.js'; import { createClientFromConfig } from './models.js'; -import { matchReactionPrompt } from '../automation/reactions.js'; +import { resolveReactionDecision } from '../automation/reactions.js'; import { loadSkillRegistryCatalog } from '../skills/index.js'; import type { SkillInstaller, SkillRegistry, SkillRegistryEntry, SkillRegistrySource } from '../skills/index.js'; import { auditLogger } from '../audit/index.js'; @@ -388,6 +388,7 @@ export function createMessageRouter(deps: { const agents = new Map(); const talkModeUntil = new Map(); const activeRuns = new Map(); + const reactionCooldowns = new Map(); function getBackendMode(): BackendRuntimeMode { return deps.getBackendMode?.() ?? 'config_default'; @@ -837,7 +838,17 @@ export function createMessageRouter(deps: { const automationReactions = deps.config.automation?.reactions ?? []; if (!msg.metadata?.isCommand) { - if (automationReactions.length === 0) { + if (msg.metadata?.automationReaction) { + auditLogger?.reactionSkip?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + reason: 'recursion_guard', + candidate_count: automationReactions.length, + }); + deps.metrics?.recordReactionDecision({ matched: false, reason: 'recursion_guard' }); + } else if (automationReactions.length === 0) { auditLogger?.reactionSkip?.({ session_id: sessionIdForRun, channel: msg.channel, @@ -848,36 +859,41 @@ export function createMessageRouter(deps: { }); deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_rules' }); } else { - const reactionMatch = matchReactionPrompt(automationReactions, { + const decision = resolveReactionDecision(automationReactions, { channel: msg.channel, senderId: msg.senderId, text: incomingText, metadata: msg.metadata, + }, { + cooldownStore: reactionCooldowns, + cooldownScope: sessionIdForRun, + now: Date.now(), }); - if (reactionMatch) { - matchedReactionName = reactionMatch.name; - incomingText = reactionMatch.prompt; - const matchedRule = automationReactions.find((rule) => rule.name === reactionMatch.name); + + if (decision.match) { + matchedReactionName = decision.match.name; + incomingText = decision.match.prompt; auditLogger?.reactionMatch?.({ session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, source: 'channel', - rule_name: reactionMatch.name, + rule_name: decision.match.name, candidate_count: automationReactions.length, - filter_summary: buildReactionFilterSummary(matchedRule), + filter_summary: buildReactionFilterSummary(decision.matchedRule), }); deps.metrics?.recordReactionDecision({ matched: true }); } else { + const reason = decision.cooldownSkipped > 0 ? 'cooldown' : 'no_match'; auditLogger?.reactionSkip?.({ session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, source: 'channel', - reason: 'no_match', + reason, candidate_count: automationReactions.length, }); - deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_match' }); + deps.metrics?.recordReactionDecision({ matched: false, reason }); } } }