feat: add automation reactions event-trigger layer

This commit is contained in:
William Valentin
2026-02-18 10:26:40 -08:00
parent a71aa5992d
commit f341149ac7
10 changed files with 483 additions and 1 deletions
+1
View File
@@ -5,4 +5,5 @@ export { HeartbeatMonitor, parseInterval } from './heartbeat.js';
export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js';
export { buildPresetCronJobs } from './presets.js';
export { MinioSyncScheduler } from './minioSync.js';
export { matchReactionPrompt } from './reactions.js';
export type { MinioSyncSchedulerDeps } from './minioSync.js';
+93
View File
@@ -0,0 +1,93 @@
import { describe, expect, it } from 'vitest';
import { matchReactionPrompt } from './reactions.js';
import type { AutomationReactionConfig } from '../config/schema.js';
function makeRule(overrides: Partial<AutomationReactionConfig> & Pick<AutomationReactionConfig, 'name' | 'run'>): AutomationReactionConfig {
return {
name: overrides.name,
enabled: overrides.enabled ?? true,
on: overrides.on ?? ['gmail'],
filter: overrides.filter,
run: overrides.run,
};
}
describe('matchReactionPrompt', () => {
it('matches channel + contains filter and renders text template', () => {
const rules: AutomationReactionConfig[] = [
makeRule({
name: 'boss-email',
on: ['gmail'],
filter: { contains: 'boss@company.com' },
run: 'Summarize this email and propose next actions:\n\n{{text}}',
}),
];
const result = matchReactionPrompt(rules, {
channel: 'gmail',
senderId: 'watcher',
text: 'New email from boss@company.com: Q1 plan',
metadata: { from: 'boss@company.com' },
});
expect(result).toEqual({
name: 'boss-email',
prompt: 'Summarize this email and propose next actions:\n\nNew email from boss@company.com: Q1 plan',
});
});
it('matches metadata paths and supports metadata templating', () => {
const rules: AutomationReactionConfig[] = [
makeRule({
name: 'github-push',
on: ['webhook'],
filter: { metadata: { 'webhookName': 'github', 'body.repository.full_name': 'acme/app' } },
run: 'New push on {{metadata.body.repository.full_name}} from {{metadata.body.pusher.name}}',
}),
];
const result = matchReactionPrompt(rules, {
channel: 'webhook',
senderId: 'github',
text: 'raw webhook payload',
metadata: {
webhookName: 'github',
body: {
repository: { full_name: 'acme/app' },
pusher: { name: 'will' },
},
},
});
expect(result).toEqual({
name: 'github-push',
prompt: 'New push on acme/app from will',
});
});
it('returns null on no match or invalid regex', () => {
const rules: AutomationReactionConfig[] = [
makeRule({
name: 'bad',
on: ['gmail'],
filter: { regex: '[' },
run: 'x',
}),
makeRule({
name: 'different-channel',
on: ['webhook'],
run: 'x',
}),
];
const result = matchReactionPrompt(rules, {
channel: 'gmail',
senderId: 'watcher',
text: 'hello',
metadata: {},
});
expect(result).toBeNull();
});
});
+122
View File
@@ -0,0 +1,122 @@
import type { AutomationReactionConfig } from '../config/schema.js';
export interface ReactionEvent {
channel: string;
senderId: string;
text: string;
metadata?: Record<string, unknown>;
}
export interface ReactionMatchResult {
name: string;
prompt: string;
}
function getNestedValue(record: Record<string, unknown>, path: string): unknown {
const segments = path.split('.').filter(Boolean);
let current: unknown = record;
for (const segment of segments) {
if (!current || typeof current !== 'object') {
return undefined;
}
current = (current as Record<string, unknown>)[segment];
}
return current;
}
function renderTemplate(template: string, event: ReactionEvent): string {
return template.replace(/\{\{\s*([^}]+)\s*\}\}/g, (_full, rawKey: string) => {
const key = rawKey.trim();
if (key === 'text') {
return event.text;
}
if (key === 'channel') {
return event.channel;
}
if (key === 'sender_id') {
return event.senderId;
}
if (key.startsWith('metadata.')) {
const value = getNestedValue(event.metadata ?? {}, key.slice('metadata.'.length));
if (value === undefined || value === null) {
return '';
}
return typeof value === 'string' ? value : JSON.stringify(value);
}
return '';
});
}
function metadataMatches(
required: Record<string, string>,
metadata?: Record<string, unknown>,
): boolean {
if (!metadata) {
return false;
}
for (const [path, expected] of Object.entries(required)) {
const actual = getNestedValue(metadata, path);
if (actual === undefined || actual === null) {
return false;
}
if (String(actual) !== expected) {
return false;
}
}
return true;
}
function ruleMatches(rule: AutomationReactionConfig, event: ReactionEvent): boolean {
if (!rule.enabled) {
return false;
}
if (rule.on.length > 0 && !rule.on.includes(event.channel)) {
return false;
}
const filter = rule.filter;
if (!filter) {
return true;
}
if (filter.contains) {
if (!event.text.toLowerCase().includes(filter.contains.toLowerCase())) {
return false;
}
}
if (filter.regex) {
let regex: RegExp;
try {
regex = new RegExp(filter.regex, 'i');
} catch {
return false;
}
if (!regex.test(event.text)) {
return false;
}
}
if (filter.metadata && !metadataMatches(filter.metadata, event.metadata)) {
return false;
}
return true;
}
/** Find the first matching reaction rule and render its run template. */
export function matchReactionPrompt(
reactions: AutomationReactionConfig[],
event: ReactionEvent,
): ReactionMatchResult | null {
for (const rule of reactions) {
if (!ruleMatches(rule, event)) {
continue;
}
return {
name: rule.name,
prompt: renderTemplate(rule.run, event),
};
}
return null;
}
+31
View File
@@ -1082,6 +1082,7 @@ describe('configSchema automation', () => {
const result = configSchema.parse(baseConfig);
expect(result.automation).toBeDefined();
expect(result.automation.delivery_mode).toBe('shared_session');
expect(result.automation.reactions).toEqual([]);
expect(result.automation.cron).toEqual([]);
expect(result.automation.daily_briefing.enabled).toBe(false);
expect(result.automation.daily_briefing.schedule).toBe('0 8 * * *');
@@ -1129,6 +1130,36 @@ describe('configSchema automation', () => {
expect(result.automation.cron[0].enabled).toBe(true); // default
});
it('accepts reactions config with filters', () => {
const result = configSchema.parse({
...baseConfig,
automation: {
reactions: [{
name: 'boss-email',
on: ['gmail'],
filter: {
contains: 'boss@company.com',
regex: 'urgent|asap',
metadata: { from: 'boss@company.com' },
},
run: 'Summarize and propose next actions:\n\n{{text}}',
}],
},
});
expect(result.automation.reactions).toHaveLength(1);
expect(result.automation.reactions[0]).toMatchObject({
name: 'boss-email',
enabled: true,
on: ['gmail'],
run: 'Summarize and propose next actions:\n\n{{text}}',
});
expect(result.automation.reactions[0].filter).toMatchObject({
contains: 'boss@company.com',
regex: 'urgent|asap',
metadata: { from: 'boss@company.com' },
});
});
it('rejects cron job with empty name', () => {
expect(() => configSchema.parse({
...baseConfig,
+21
View File
@@ -257,6 +257,25 @@ const mcpSchema = z.object({
const modelTierEnum = z.enum(['fast', 'default', 'complex', 'local']);
const reactionFilterSchema = z.object({
/** Case-insensitive substring match against inbound message text. */
contains: z.string().optional(),
/** Case-insensitive regex match against inbound message text. */
regex: z.string().optional(),
/** Dot-path metadata constraints (exact string comparison). */
metadata: z.record(z.string(), z.string()).optional(),
}).optional();
const automationReactionSchema = z.object({
name: z.string().min(1, 'Reaction name is required'),
enabled: z.boolean().default(true),
/** Source channels/events this rule applies to (e.g. gmail, webhook). */
on: z.array(z.string().min(1)).default([]),
filter: reactionFilterSchema,
/** Prompt template to run when matched. Supports {{text}}, {{channel}}, {{sender_id}}, {{metadata.*}}. */
run: z.string().min(1, 'Reaction run template is required'),
});
const cronJobSchema = z.object({
name: z.string().min(1, 'Cron job name is required'),
schedule: z.string().min(1, 'Cron schedule is required'),
@@ -422,6 +441,7 @@ const automationDeliveryModeSchema = z.enum(['shared_session', 'isolated_job', '
const automationSchema = z.object({
/** Session strategy for automation-triggered runs (cron/webhooks/gmail). */
delivery_mode: automationDeliveryModeSchema.default('shared_session'),
reactions: z.array(automationReactionSchema).default([]),
cron: z.array(cronJobSchema).default([]),
webhooks: z.array(webhookSchema).default([]),
gmail: gmailSchema,
@@ -999,6 +1019,7 @@ export type DailyBriefingConfig = z.infer<typeof dailyBriefingSchema>;
export type MinioSyncTaskConfig = z.infer<typeof minioSyncTaskSchema>;
export type MinioSyncAutomationConfig = z.infer<typeof minioSyncAutomationSchema>;
export type AutomationDeliveryMode = z.infer<typeof automationDeliveryModeSchema>;
export type AutomationReactionConfig = z.infer<typeof automationReactionSchema>;
export type PairingCodeConfig = z.infer<typeof pairingSchema>;
export type LogLevel = z.infer<typeof logLevelSchema>;
export type AuditConfig = z.infer<typeof auditSchema>;
+136
View File
@@ -1272,6 +1272,142 @@ describe('daemon tts routing integration', () => {
});
});
describe('daemon reactions routing integration', () => {
afterEach(() => {
vi.restoreAllMocks();
});
it('rewrites automation event prompts when a reaction rule matches', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok');
const session = {
id: 'gmail:reaction-user-1',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const router = createMessageRouter({
sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['default'],
getAllLabels: () => ({ default: 'default' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'default',
memory_extraction: 'default',
classification: 'default',
tool_summarisation: 'default',
complex_reasoning: 'default',
},
max_delegation_depth: 1,
max_iterations: 3,
},
automation: {
reactions: [{
name: 'boss-email',
enabled: true,
on: ['gmail'],
filter: { contains: 'boss@company.com' },
run: 'Summarize and suggest next steps:\n\n{{text}}',
}],
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
});
await router.handler({
id: 'r1',
channel: 'gmail',
senderId: 'reaction-user-1',
text: 'New email from boss@company.com: Please share timeline',
timestamp: Date.now(),
} as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {}));
expect(processSpy).toHaveBeenCalledTimes(1);
const [prompt] = processSpy.mock.calls[0] ?? [];
expect(prompt).toBe(
'Summarize and suggest next steps:\n\nNew email from boss@company.com: Please share timeline',
);
});
it('keeps original prompt when no reaction rule matches', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok');
const session = {
id: 'gmail:reaction-user-2',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const router = createMessageRouter({
sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['default'],
getAllLabels: () => ({ default: 'default' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: { clone() { return this; }, register: vi.fn() } as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'default',
memory_extraction: 'default',
classification: 'default',
tool_summarisation: 'default',
complex_reasoning: 'default',
},
max_delegation_depth: 1,
max_iterations: 3,
},
automation: {
reactions: [{
name: 'boss-email',
enabled: true,
on: ['gmail'],
filter: { contains: 'boss@company.com' },
run: 'Summarize: {{text}}',
}],
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
});
await router.handler({
id: 'r2',
channel: 'gmail',
senderId: 'reaction-user-2',
text: 'New email from teammate@company.com: FYI',
timestamp: Date.now(),
} as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {}));
expect(processSpy).toHaveBeenCalledTimes(1);
const [prompt] = processSpy.mock.calls[0] ?? [];
expect(prompt).toBe('New email from teammate@company.com: FYI');
});
});
describe('daemon auto-escalate integration', () => {
afterEach(() => {
vi.restoreAllMocks();
+17
View File
@@ -21,6 +21,7 @@ import type { CommandRegistry } from '../commands/index.js';
import type { ComponentRegistry } from '../intents/index.js';
import type { RoutingPolicy } from '../routing/index.js';
import { createClientFromConfig } from './models.js';
import { matchReactionPrompt } from '../automation/reactions.js';
import type { SkillRegistry } from '../skills/index.js';
import { auditLogger } from '../audit/index.js';
import { randomUUID } from 'crypto';
@@ -373,6 +374,7 @@ export function createMessageRouter(deps: {
const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
let incomingText = msg.text;
let matchedReactionName: string | undefined;
const talkMode = deps.config.audio?.talk_mode;
if (talkMode?.enabled && incomingText.trim().length > 0) {
const key = `${msg.channel}:${msg.senderId}`;
@@ -422,6 +424,20 @@ export function createMessageRouter(deps: {
}
}
const automationReactions = deps.config.automation?.reactions ?? [];
if (!msg.metadata?.isCommand && automationReactions.length > 0) {
const reactionMatch = matchReactionPrompt(automationReactions, {
channel: msg.channel,
senderId: msg.senderId,
text: incomingText,
metadata: msg.metadata,
});
if (reactionMatch) {
matchedReactionName = reactionMatch.name;
incomingText = reactionMatch.prompt;
}
}
let intentAgentOverride: string | undefined;
let intentSkillOverride: string | undefined;
if (!deps.config.intents?.enabled && deps.agentConfigRegistry?.get('research')) {
@@ -475,6 +491,7 @@ export function createMessageRouter(deps: {
const effectiveMetadata = {
...(msg.metadata ?? {}),
...(intentSkillOverride ? { skillOverride: intentSkillOverride } : {}),
...(matchedReactionName ? { automationReaction: matchedReactionName } : {}),
};
const agentConfigName = intentAgentOverride ?? deps.agentRouter?.resolve(msg.channel, msg.senderId);