From f341149ac7f26faa876321199b7fec05bf4fcbf0 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Wed, 18 Feb 2026 10:26:40 -0800 Subject: [PATCH] feat: add automation reactions event-trigger layer --- README.md | 23 ++++++ config/default.yaml | 19 +++++ docs/plans/state.json | 21 ++++- src/automation/index.ts | 1 + src/automation/reactions.test.ts | 93 +++++++++++++++++++++ src/automation/reactions.ts | 122 +++++++++++++++++++++++++++ src/config/schema.test.ts | 31 +++++++ src/config/schema.ts | 21 +++++ src/daemon/routing.test.ts | 136 +++++++++++++++++++++++++++++++ src/daemon/routing.ts | 17 ++++ 10 files changed, 483 insertions(+), 1 deletion(-) create mode 100644 src/automation/reactions.test.ts create mode 100644 src/automation/reactions.ts diff --git a/README.md b/README.md index 2745c29..f71f7a8 100644 --- a/README.md +++ b/README.md @@ -677,6 +677,15 @@ Set `automation.delivery_mode` to control automation session behavior: ```yaml automation: delivery_mode: shared_session + reactions: + - name: boss-email + on: [gmail] + filter: + contains: "boss@company.com" + run: | + Summarize this email and propose next actions. + + {{text}} cron: - name: daily-summary schedule: "0 9 * * *" # 9 AM daily @@ -734,6 +743,7 @@ automation: | Field | Required | Description | |-------|----------|-------------| | `automation.delivery_mode` | no | Automation session strategy: `shared_session`, `isolated_job`, or `announce` (default: `shared_session`) | +| `automation.reactions.*` | no | Event-triggered prompt rewrite rules for inbound automation/chat events (supports channel matching + text/metadata filters) | | `name` | yes | Unique job identifier | | `schedule` | yes | Cron expression (standard 5-field) | | `message` | yes | Text sent to the agent when the job fires | @@ -746,6 +756,19 @@ automation: | `automation.daily_briefing.*` | no | Built-in daily briefing preset; generates an extra cron job when `enabled: true` and `output` is set | | `automation.minio_sync.*` | no | Scheduled MinIO prefix ingestion into memory namespaces (direct daemon automation) | +### Reactions (Event -> Action Prompting) + +Reactions let you convert inbound events into deterministic agent prompts without adding new webhook endpoints or cron jobs. + +- Matchers support: + - channel list (`on`) + - text filters (`contains`, `regex`) + - metadata path filters (`metadata` with dot paths) +- Templates support: + - `{{text}}`, `{{channel}}`, `{{sender_id}}` + - `{{metadata.some.path}}` +- First matching rule wins. + ## Backup Scheduling Daemon backups can run on a fixed interval (`backup.interval`) or a cron schedule (`backup.schedule`). If both are set, `backup.schedule` takes precedence. diff --git a/config/default.yaml b/config/default.yaml index 41faa42..a5ecd4e 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -326,6 +326,25 @@ hooks: # # isolated_job: create a fresh session per cron trigger/webhook request. # # announce: create a fresh announce-style run per trigger (no shared automation history). # delivery_mode: shared_session +# reactions: +# - name: boss-email +# on: [gmail] +# filter: +# contains: "boss@company.com" +# run: | +# Summarize this email and propose next actions. +# +# {{text}} +# +# - name: github-main-push +# on: [webhook] +# filter: +# metadata: +# webhookName: github-push +# body.ref: refs/heads/main +# run: | +# Summarize this deploy-relevant push: +# {{metadata.body.head_commit.message}} # cron: # - name: daily-summary # schedule: "0 9 * * *" diff --git a/docs/plans/state.json b/docs/plans/state.json index 5d53c47..35199db 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -5245,6 +5245,25 @@ "docs/plans/state.json" ], "test_status": "pnpm test:run src/models/tts.test.ts src/config/schema.test.ts src/daemon/routing.test.ts + pnpm typecheck passing" + }, + "reactions-event-trigger-layer-tier-a": { + "status": "completed", + "date": "2026-02-18", + "updated": "2026-02-18", + "summary": "Implemented a config-driven reactions/event-trigger layer (`automation.reactions`) with channel/text/metadata matching and templated run prompts, integrated into daemon message routing for automation and chat events, with matcher + routing + schema tests and documentation updates.", + "files_modified": [ + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/automation/reactions.ts", + "src/automation/reactions.test.ts", + "src/automation/index.ts", + "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "README.md", + "config/default.yaml", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/automation/reactions.test.ts src/config/schema.test.ts src/daemon/routing.test.ts + pnpm typecheck passing" } }, "overall_progress": { @@ -5268,7 +5287,7 @@ "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback", "remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening", - "next_up": "Implement the reactions/event-trigger automation layer from the OpenClaw roadmap (event pattern matching -> agent action execution)" + "next_up": "Implement Tier A5 model auth-profile rotation (multiple API keys per provider with session stickiness)" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/automation/index.ts b/src/automation/index.ts index 48267b9..67019f1 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -5,4 +5,5 @@ export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; export { buildPresetCronJobs } from './presets.js'; export { MinioSyncScheduler } from './minioSync.js'; +export { matchReactionPrompt } from './reactions.js'; export type { MinioSyncSchedulerDeps } from './minioSync.js'; diff --git a/src/automation/reactions.test.ts b/src/automation/reactions.test.ts new file mode 100644 index 0000000..187ee56 --- /dev/null +++ b/src/automation/reactions.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it } from 'vitest'; + +import { matchReactionPrompt } from './reactions.js'; +import type { AutomationReactionConfig } from '../config/schema.js'; + +function makeRule(overrides: Partial & Pick): AutomationReactionConfig { + return { + name: overrides.name, + enabled: overrides.enabled ?? true, + on: overrides.on ?? ['gmail'], + filter: overrides.filter, + run: overrides.run, + }; +} + +describe('matchReactionPrompt', () => { + it('matches channel + contains filter and renders text template', () => { + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'boss-email', + on: ['gmail'], + filter: { contains: 'boss@company.com' }, + run: 'Summarize this email and propose next actions:\n\n{{text}}', + }), + ]; + + const result = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'New email from boss@company.com: Q1 plan', + metadata: { from: 'boss@company.com' }, + }); + + expect(result).toEqual({ + name: 'boss-email', + prompt: 'Summarize this email and propose next actions:\n\nNew email from boss@company.com: Q1 plan', + }); + }); + + it('matches metadata paths and supports metadata templating', () => { + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'github-push', + on: ['webhook'], + filter: { metadata: { 'webhookName': 'github', 'body.repository.full_name': 'acme/app' } }, + run: 'New push on {{metadata.body.repository.full_name}} from {{metadata.body.pusher.name}}', + }), + ]; + + const result = matchReactionPrompt(rules, { + channel: 'webhook', + senderId: 'github', + text: 'raw webhook payload', + metadata: { + webhookName: 'github', + body: { + repository: { full_name: 'acme/app' }, + pusher: { name: 'will' }, + }, + }, + }); + + expect(result).toEqual({ + name: 'github-push', + prompt: 'New push on acme/app from will', + }); + }); + + it('returns null on no match or invalid regex', () => { + const rules: AutomationReactionConfig[] = [ + makeRule({ + name: 'bad', + on: ['gmail'], + filter: { regex: '[' }, + run: 'x', + }), + makeRule({ + name: 'different-channel', + on: ['webhook'], + run: 'x', + }), + ]; + + const result = matchReactionPrompt(rules, { + channel: 'gmail', + senderId: 'watcher', + text: 'hello', + metadata: {}, + }); + + expect(result).toBeNull(); + }); +}); diff --git a/src/automation/reactions.ts b/src/automation/reactions.ts new file mode 100644 index 0000000..4f6feb6 --- /dev/null +++ b/src/automation/reactions.ts @@ -0,0 +1,122 @@ +import type { AutomationReactionConfig } from '../config/schema.js'; + +export interface ReactionEvent { + channel: string; + senderId: string; + text: string; + metadata?: Record; +} + +export interface ReactionMatchResult { + name: string; + prompt: string; +} + +function getNestedValue(record: Record, path: string): unknown { + const segments = path.split('.').filter(Boolean); + let current: unknown = record; + for (const segment of segments) { + if (!current || typeof current !== 'object') { + return undefined; + } + current = (current as Record)[segment]; + } + return current; +} + +function renderTemplate(template: string, event: ReactionEvent): string { + return template.replace(/\{\{\s*([^}]+)\s*\}\}/g, (_full, rawKey: string) => { + const key = rawKey.trim(); + if (key === 'text') { + return event.text; + } + if (key === 'channel') { + return event.channel; + } + if (key === 'sender_id') { + return event.senderId; + } + if (key.startsWith('metadata.')) { + const value = getNestedValue(event.metadata ?? {}, key.slice('metadata.'.length)); + if (value === undefined || value === null) { + return ''; + } + return typeof value === 'string' ? value : JSON.stringify(value); + } + return ''; + }); +} + +function metadataMatches( + required: Record, + metadata?: Record, +): boolean { + if (!metadata) { + return false; + } + for (const [path, expected] of Object.entries(required)) { + const actual = getNestedValue(metadata, path); + if (actual === undefined || actual === null) { + return false; + } + if (String(actual) !== expected) { + return false; + } + } + return true; +} + +function ruleMatches(rule: AutomationReactionConfig, event: ReactionEvent): boolean { + if (!rule.enabled) { + return false; + } + if (rule.on.length > 0 && !rule.on.includes(event.channel)) { + return false; + } + + const filter = rule.filter; + if (!filter) { + return true; + } + + if (filter.contains) { + if (!event.text.toLowerCase().includes(filter.contains.toLowerCase())) { + return false; + } + } + + if (filter.regex) { + let regex: RegExp; + try { + regex = new RegExp(filter.regex, 'i'); + } catch { + return false; + } + if (!regex.test(event.text)) { + return false; + } + } + + if (filter.metadata && !metadataMatches(filter.metadata, event.metadata)) { + return false; + } + + return true; +} + +/** Find the first matching reaction rule and render its run template. */ +export function matchReactionPrompt( + reactions: AutomationReactionConfig[], + event: ReactionEvent, +): ReactionMatchResult | null { + for (const rule of reactions) { + if (!ruleMatches(rule, event)) { + continue; + } + return { + name: rule.name, + prompt: renderTemplate(rule.run, event), + }; + } + return null; +} diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 8cf89e7..7dee0eb 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -1082,6 +1082,7 @@ describe('configSchema automation', () => { const result = configSchema.parse(baseConfig); expect(result.automation).toBeDefined(); expect(result.automation.delivery_mode).toBe('shared_session'); + expect(result.automation.reactions).toEqual([]); expect(result.automation.cron).toEqual([]); expect(result.automation.daily_briefing.enabled).toBe(false); expect(result.automation.daily_briefing.schedule).toBe('0 8 * * *'); @@ -1129,6 +1130,36 @@ describe('configSchema automation', () => { expect(result.automation.cron[0].enabled).toBe(true); // default }); + it('accepts reactions config with filters', () => { + const result = configSchema.parse({ + ...baseConfig, + automation: { + reactions: [{ + name: 'boss-email', + on: ['gmail'], + filter: { + contains: 'boss@company.com', + regex: 'urgent|asap', + metadata: { from: 'boss@company.com' }, + }, + run: 'Summarize and propose next actions:\n\n{{text}}', + }], + }, + }); + expect(result.automation.reactions).toHaveLength(1); + expect(result.automation.reactions[0]).toMatchObject({ + name: 'boss-email', + enabled: true, + on: ['gmail'], + run: 'Summarize and propose next actions:\n\n{{text}}', + }); + expect(result.automation.reactions[0].filter).toMatchObject({ + contains: 'boss@company.com', + regex: 'urgent|asap', + metadata: { from: 'boss@company.com' }, + }); + }); + it('rejects cron job with empty name', () => { expect(() => configSchema.parse({ ...baseConfig, diff --git a/src/config/schema.ts b/src/config/schema.ts index b4a6346..0e315e9 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -257,6 +257,25 @@ const mcpSchema = z.object({ const modelTierEnum = z.enum(['fast', 'default', 'complex', 'local']); +const reactionFilterSchema = z.object({ + /** Case-insensitive substring match against inbound message text. */ + contains: z.string().optional(), + /** Case-insensitive regex match against inbound message text. */ + regex: z.string().optional(), + /** Dot-path metadata constraints (exact string comparison). */ + metadata: z.record(z.string(), z.string()).optional(), +}).optional(); + +const automationReactionSchema = z.object({ + name: z.string().min(1, 'Reaction name is required'), + enabled: z.boolean().default(true), + /** Source channels/events this rule applies to (e.g. gmail, webhook). */ + on: z.array(z.string().min(1)).default([]), + filter: reactionFilterSchema, + /** Prompt template to run when matched. Supports {{text}}, {{channel}}, {{sender_id}}, {{metadata.*}}. */ + run: z.string().min(1, 'Reaction run template is required'), +}); + const cronJobSchema = z.object({ name: z.string().min(1, 'Cron job name is required'), schedule: z.string().min(1, 'Cron schedule is required'), @@ -422,6 +441,7 @@ const automationDeliveryModeSchema = z.enum(['shared_session', 'isolated_job', ' const automationSchema = z.object({ /** Session strategy for automation-triggered runs (cron/webhooks/gmail). */ delivery_mode: automationDeliveryModeSchema.default('shared_session'), + reactions: z.array(automationReactionSchema).default([]), cron: z.array(cronJobSchema).default([]), webhooks: z.array(webhookSchema).default([]), gmail: gmailSchema, @@ -999,6 +1019,7 @@ export type DailyBriefingConfig = z.infer; export type MinioSyncTaskConfig = z.infer; export type MinioSyncAutomationConfig = z.infer; export type AutomationDeliveryMode = z.infer; +export type AutomationReactionConfig = z.infer; export type PairingCodeConfig = z.infer; export type LogLevel = z.infer; export type AuditConfig = z.infer; diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 3d8922e..a64a034 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -1272,6 +1272,142 @@ describe('daemon tts routing integration', () => { }); }); +describe('daemon reactions routing integration', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('rewrites automation event prompts when a reaction rule matches', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); + + const session = { + id: 'gmail:reaction-user-1', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['default'], + getAllLabels: () => ({ default: 'default' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'default', + memory_extraction: 'default', + classification: 'default', + tool_summarisation: 'default', + complex_reasoning: 'default', + }, + max_delegation_depth: 1, + max_iterations: 3, + }, + automation: { + reactions: [{ + name: 'boss-email', + enabled: true, + on: ['gmail'], + filter: { contains: 'boss@company.com' }, + run: 'Summarize and suggest next steps:\n\n{{text}}', + }], + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + }); + + await router.handler({ + id: 'r1', + channel: 'gmail', + senderId: 'reaction-user-1', + text: 'New email from boss@company.com: Please share timeline', + timestamp: Date.now(), + } as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {})); + + expect(processSpy).toHaveBeenCalledTimes(1); + const [prompt] = processSpy.mock.calls[0] ?? []; + expect(prompt).toBe( + 'Summarize and suggest next steps:\n\nNew email from boss@company.com: Please share timeline', + ); + }); + + it('keeps original prompt when no reaction rule matches', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); + + const session = { + id: 'gmail:reaction-user-2', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['default'], + getAllLabels: () => ({ default: 'default' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'default', + memory_extraction: 'default', + classification: 'default', + tool_summarisation: 'default', + complex_reasoning: 'default', + }, + max_delegation_depth: 1, + max_iterations: 3, + }, + automation: { + reactions: [{ + name: 'boss-email', + enabled: true, + on: ['gmail'], + filter: { contains: 'boss@company.com' }, + run: 'Summarize: {{text}}', + }], + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + }); + + await router.handler({ + id: 'r2', + channel: 'gmail', + senderId: 'reaction-user-2', + text: 'New email from teammate@company.com: FYI', + timestamp: Date.now(), + } as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {})); + + expect(processSpy).toHaveBeenCalledTimes(1); + const [prompt] = processSpy.mock.calls[0] ?? []; + expect(prompt).toBe('New email from teammate@company.com: FYI'); + }); +}); + describe('daemon auto-escalate integration', () => { afterEach(() => { vi.restoreAllMocks(); diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index ad00ff0..ee4a53e 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -21,6 +21,7 @@ import type { CommandRegistry } from '../commands/index.js'; import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; import { createClientFromConfig } from './models.js'; +import { matchReactionPrompt } from '../automation/reactions.js'; import type { SkillRegistry } from '../skills/index.js'; import { auditLogger } from '../audit/index.js'; import { randomUUID } from 'crypto'; @@ -373,6 +374,7 @@ export function createMessageRouter(deps: { const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { let incomingText = msg.text; + let matchedReactionName: string | undefined; const talkMode = deps.config.audio?.talk_mode; if (talkMode?.enabled && incomingText.trim().length > 0) { const key = `${msg.channel}:${msg.senderId}`; @@ -422,6 +424,20 @@ export function createMessageRouter(deps: { } } + const automationReactions = deps.config.automation?.reactions ?? []; + if (!msg.metadata?.isCommand && automationReactions.length > 0) { + const reactionMatch = matchReactionPrompt(automationReactions, { + channel: msg.channel, + senderId: msg.senderId, + text: incomingText, + metadata: msg.metadata, + }); + if (reactionMatch) { + matchedReactionName = reactionMatch.name; + incomingText = reactionMatch.prompt; + } + } + let intentAgentOverride: string | undefined; let intentSkillOverride: string | undefined; if (!deps.config.intents?.enabled && deps.agentConfigRegistry?.get('research')) { @@ -475,6 +491,7 @@ export function createMessageRouter(deps: { const effectiveMetadata = { ...(msg.metadata ?? {}), ...(intentSkillOverride ? { skillOverride: intentSkillOverride } : {}), + ...(matchedReactionName ? { automationReaction: matchedReactionName } : {}), }; const agentConfigName = intentAgentOverride ?? deps.agentRouter?.resolve(msg.channel, msg.senderId);