diff --git a/README.md b/README.md index 0830833..c47c913 100644 --- a/README.md +++ b/README.md @@ -373,8 +373,13 @@ hooks: Schedule automated messages on cron schedules. Each job fires an inbound message through the agent pipeline and routes the response to a configured output channel. +Set `automation.delivery_mode` to control automation session behavior: +- `shared_session` (default): reuse one session per cron job/webhook name. +- `isolated_job`: create a fresh session per cron trigger/webhook request. + ```yaml automation: + delivery_mode: shared_session cron: - name: daily-summary schedule: "0 9 * * *" # 9 AM daily @@ -399,6 +404,7 @@ automation: | Field | Required | Description | |-------|----------|-------------| +| `automation.delivery_mode` | no | Automation session strategy: `shared_session` or `isolated_job` (default: `shared_session`) | | `name` | yes | Unique job identifier | | `schedule` | yes | Cron expression (standard 5-field) | | `message` | yes | Text sent to the agent when the job fires | @@ -414,6 +420,7 @@ HTTP endpoints that trigger agent processing. Each webhook accepts POST requests ```yaml automation: + delivery_mode: shared_session webhooks: - name: github-push secret: "whsec_..." # HMAC secret for signature verification @@ -435,6 +442,7 @@ Webhooks are available at `POST /webhooks/:name` on the gateway HTTP server. The | Field | Required | Description | |-------|----------|-------------| +| `automation.delivery_mode` | no | Automation session strategy: `shared_session` or `isolated_job` (default: `shared_session`) | | `name` | yes | Unique webhook identifier (used in URL path) | | `secret` | no | HMAC secret for `X-Webhook-Signature` header verification (SHA-256) | | `message` | no | Template for the message sent to the agent (default: `{{body}}`) | diff --git a/config/default.yaml b/config/default.yaml index b5eb68f..4273c12 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -119,6 +119,9 @@ hooks: # Uncomment and configure any automation sources you need. # automation: +# # shared_session: keep one session per cron job/webhook name. +# # isolated_job: create a fresh session per cron trigger/webhook request. +# delivery_mode: shared_session # cron: # - name: daily-summary # schedule: "0 9 * * *" diff --git a/docs/plans/2026-02-16-announce-delivery-mode-checklist.md b/docs/plans/2026-02-16-announce-delivery-mode-checklist.md new file mode 100644 index 0000000..e283173 --- /dev/null +++ b/docs/plans/2026-02-16-announce-delivery-mode-checklist.md @@ -0,0 +1,36 @@ +# Announce Delivery Mode Checklist + +Date: 2026-02-16 +Status: completed + +## Scope + +- Add a first-class automation delivery mode for cron/webhook runs. +- Support isolated per-run sessions while preserving outbound reply routing. + +## Completed + +- Added `automation.delivery_mode` config enum in `src/config/schema.ts`: + - `shared_session` (default) + - `isolated_job` +- Implemented reply routing override in `src/channels/registry.ts` via `metadata.replyPeerId`. +- Updated `CronScheduler` and `WebhookHandler` to: + - emit unique sender IDs per run when `delivery_mode=isolated_job` + - include `metadata.replyPeerId` to keep output routing stable + - include delivery metadata (`deliveryMode`, `runId`) for traceability +- Wired delivery mode through channel registration in `src/daemon/channels.ts`. +- Updated docs: + - `README.md` automation sections + - `config/default.yaml` commented template +- Added tests: + - `src/channels/registry.test.ts` + - `src/automation/cron.test.ts` + - `src/automation/webhooks.test.ts` + - `src/config/schema.test.ts` + +## Verification + +- `pnpm test:run src/channels/registry.test.ts` +- `pnpm test:run src/automation/cron.test.ts src/automation/webhooks.test.ts` +- `pnpm test:run src/config/schema.test.ts` +- `pnpm typecheck` diff --git a/docs/plans/state.json b/docs/plans/state.json index e17c3dc..5f8aa7c 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -179,6 +179,30 @@ ], "test_status": "pnpm test:run src/config/schema.test.ts src/daemon/clientFactory.test.ts src/cli/setup/providers.test.ts src/cli/doctor.test.ts + pnpm typecheck passing" }, + "announce-delivery-mode": { + "file": "2026-02-16-announce-delivery-mode-checklist.md", + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Implemented automation delivery mode with isolated job sessions for cron/webhooks (`automation.delivery_mode=isolated_job`) while preserving outbound routing via metadata reply peer IDs.", + "files_created": [ + "docs/plans/2026-02-16-announce-delivery-mode-checklist.md" + ], + "files_modified": [ + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/channels/registry.ts", + "src/channels/registry.test.ts", + "src/automation/cron.ts", + "src/automation/cron.test.ts", + "src/automation/webhooks.ts", + "src/automation/webhooks.test.ts", + "src/daemon/channels.ts", + "README.md", + "config/default.yaml" + ], + "test_status": "pnpm test:run src/channels/registry.test.ts src/automation/cron.test.ts src/automation/webhooks.test.ts src/config/schema.test.ts + pnpm typecheck passing" + }, "skill-safety-scanner": { "file": "2026-02-15-skill-safety-scanner-checklist.md", "status": "completed", @@ -2208,7 +2232,7 @@ }, "overall_progress": { - "total_test_count": 1694, + "total_test_count": 1698, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -2223,12 +2247,12 @@ "tier2_completion": "4/4 (100%) — inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier3_completion": "5/5 (100%) — lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier4_completion": "4/4 (100%) — gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", - "feature_gap_scorecard": "105/128 match (82%), 0 partial (0%), 23 missing (18%)", + "feature_gap_scorecard": "106/128 match (83%), 0 partial (0%), 22 missing (17%)", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete — milestone done", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback", "remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening", - "next_up": "Pick the next OpenClaw gap milestone and create a scoped checklist (candidates: announce delivery mode, presence tracking, QMD backend)" + "next_up": "Pick the next OpenClaw gap milestone and create a scoped checklist (candidates: presence tracking, QMD backend, ClawHub registry)" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/automation/cron.test.ts b/src/automation/cron.test.ts index a543e21..cc52911 100644 --- a/src/automation/cron.test.ts +++ b/src/automation/cron.test.ts @@ -78,6 +78,21 @@ describe('CronScheduler', () => { expect(messages[0].text).toBe('Hello from cron'); }); + it('uses isolated sender IDs when delivery mode is isolated_job', async () => { + const jobs = [makeCronJob()]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any, 'isolated_job'); + + const messages: InboundMessage[] = []; + scheduler.onMessage((msg: InboundMessage) => messages.push(msg)); + await scheduler.connect(); + scheduler.triggerJob('test-job'); + + expect(messages).toHaveLength(1); + expect(messages[0].senderId).toMatch(/^test-job:run-/); + expect(messages[0].metadata?.replyPeerId).toBe('test-job'); + expect(messages[0].metadata?.deliveryMode).toBe('isolated_job'); + }); + it('forwards response to output channel on send()', async () => { const mockOutputAdapter = { send: vi.fn().mockResolvedValue(undefined), diff --git a/src/automation/cron.ts b/src/automation/cron.ts index 4b33754..a25f49e 100644 --- a/src/automation/cron.ts +++ b/src/automation/cron.ts @@ -2,12 +2,15 @@ import { Cron } from 'croner'; import type { CronJobConfig } from '../config/schema.js'; import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; import { auditLogger } from '../audit/index.js'; +import { randomUUID } from 'crypto'; /** Minimal interface for the parts of ChannelRegistry we need. */ interface ChannelLookup { get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; } +type DeliveryMode = 'shared_session' | 'isolated_job'; + export class CronScheduler implements ChannelAdapter { readonly name = 'cron'; private _status: ChannelStatus = 'disconnected'; @@ -18,6 +21,7 @@ export class CronScheduler implements ChannelAdapter { constructor( private readonly jobConfigs: CronJobConfig[], private readonly channelLookup: ChannelLookup, + private readonly deliveryMode: DeliveryMode = 'shared_session', ) { for (const job of jobConfigs) { this.jobs.set(job.name, job); @@ -85,15 +89,24 @@ export class CronScheduler implements ChannelAdapter { triggerJob(jobName: string): void { const job = this.jobs.get(jobName); if (!job) {return;} + const runId = `run-${randomUUID()}`; + const senderId = this.deliveryMode === 'isolated_job' ? `${jobName}:${runId}` : jobName; const msg: InboundMessage = { id: `cron-${jobName}-${Date.now()}`, channel: 'cron', - senderId: jobName, + senderId, senderName: `cron:${jobName}`, text: job.message, timestamp: Date.now(), - metadata: { cronJob: jobName, scheduled: true, modelTier: job.model_tier }, + metadata: { + cronJob: jobName, + scheduled: true, + modelTier: job.model_tier, + deliveryMode: this.deliveryMode, + runId, + replyPeerId: jobName, + }, }; auditLogger?.cronTrigger({ diff --git a/src/automation/webhooks.test.ts b/src/automation/webhooks.test.ts index 0f2c07b..c9165b9 100644 --- a/src/automation/webhooks.test.ts +++ b/src/automation/webhooks.test.ts @@ -114,6 +114,25 @@ describe('WebhookHandler', () => { expect(messages[0].text).toBe('hello world'); }); + it('handleRequest uses isolated sender IDs when delivery mode is isolated_job', async () => { + const webhooks = [makeWebhook()]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any, 'isolated_job'); + + const messages: InboundMessage[] = []; + handler.onMessage((msg: InboundMessage) => messages.push(msg)); + await handler.connect(); + + const req = mockRequest('hello world'); + const res = mockResponse(); + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(true); + expect(messages).toHaveLength(1); + expect(messages[0].senderId).toMatch(/^test-hook:run-/); + expect(messages[0].metadata?.replyPeerId).toBe('test-hook'); + expect(messages[0].metadata?.deliveryMode).toBe('isolated_job'); + }); + it('returns false for unknown webhook', async () => { handler = new WebhookHandler([], mockChannelRegistry as any); await handler.connect(); diff --git a/src/automation/webhooks.ts b/src/automation/webhooks.ts index 52549c7..4a52762 100644 --- a/src/automation/webhooks.ts +++ b/src/automation/webhooks.ts @@ -1,4 +1,4 @@ -import { createHmac, timingSafeEqual } from 'crypto'; +import { createHmac, randomUUID, timingSafeEqual } from 'crypto'; import type { IncomingMessage, ServerResponse } from 'http'; import type { WebhookConfig } from '../config/schema.js'; import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; @@ -9,6 +9,8 @@ interface ChannelLookup { get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; } +type DeliveryMode = 'shared_session' | 'isolated_job'; + /** Read the full request body as a string. */ function readBody(req: IncomingMessage): Promise { return new Promise((resolve, reject) => { @@ -68,6 +70,7 @@ export class WebhookHandler implements ChannelAdapter { constructor( private readonly webhookConfigs: WebhookConfig[], private readonly channelLookup: ChannelLookup, + private readonly deliveryMode: DeliveryMode = 'shared_session', ) { for (const webhook of webhookConfigs) { this.webhooks.set(webhook.name, webhook); @@ -151,15 +154,23 @@ export class WebhookHandler implements ChannelAdapter { // Render message template const text = renderTemplate(webhook.message, body); + const runId = `run-${randomUUID()}`; + const senderId = this.deliveryMode === 'isolated_job' ? `${webhookName}:${runId}` : webhookName; const msg: InboundMessage = { id: `webhook-${webhookName}-${Date.now()}`, channel: 'webhook', - senderId: webhookName, + senderId, senderName: `webhook:${webhookName}`, text, timestamp: Date.now(), - metadata: { webhookName, body }, + metadata: { + webhookName, + body, + deliveryMode: this.deliveryMode, + runId, + replyPeerId: webhookName, + }, }; auditLogger?.webhookReceive({ diff --git a/src/channels/registry.test.ts b/src/channels/registry.test.ts index 1355b20..3bc1546 100644 --- a/src/channels/registry.test.ts +++ b/src/channels/registry.test.ts @@ -132,6 +132,29 @@ describe('ChannelRegistry', () => { expect(adapter.sendFn).toHaveBeenCalledWith('user-42', { text: 'pong' }); }); + it('routes reply using metadata.replyPeerId when provided', async () => { + const adapter = createMockAdapter('test-channel'); + registry.register(adapter); + + const handler = vi.fn(async (_msg: InboundMessage, reply: (r: OutboundMessage) => Promise) => { + await reply({ text: 'pong' }); + }); + registry.setMessageHandler(handler); + + const msg = { + ...makeMessage('test-channel'), + senderId: 'isolated-run-1', + metadata: { replyPeerId: 'job-a' }, + }; + adapter.triggerMessage(msg); + + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledOnce(); + }); + + expect(adapter.sendFn).toHaveBeenCalledWith('job-a', { text: 'pong' }); + }); + it('unregisters adapter', () => { const adapter = createMockAdapter('removeme'); registry.register(adapter); diff --git a/src/channels/registry.ts b/src/channels/registry.ts index 8eb0fe7..22afd29 100644 --- a/src/channels/registry.ts +++ b/src/channels/registry.ts @@ -105,7 +105,11 @@ export class ChannelRegistry { // Create a reply function bound to this message's channel and sender const reply = async (response: OutboundMessage): Promise => { - await adapter.send(msg.senderId, response); + const metadata = msg.metadata as Record | undefined; + const replyPeerId = typeof metadata?.replyPeerId === 'string' && metadata.replyPeerId.length > 0 + ? metadata.replyPeerId + : msg.senderId; + await adapter.send(replyPeerId, response); }; // Fire and forget — errors are logged, not propagated diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 82324d3..b0d498b 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -306,9 +306,20 @@ describe('configSchema automation', () => { it('accepts config without automation section', () => { const result = configSchema.parse(baseConfig); expect(result.automation).toBeDefined(); + expect(result.automation.delivery_mode).toBe('shared_session'); expect(result.automation.cron).toEqual([]); }); + it('accepts isolated automation delivery mode', () => { + const result = configSchema.parse({ + ...baseConfig, + automation: { + delivery_mode: 'isolated_job', + }, + }); + expect(result.automation.delivery_mode).toBe('isolated_job'); + }); + it('accepts config with cron jobs', () => { const result = configSchema.parse({ ...baseConfig, diff --git a/src/config/schema.ts b/src/config/schema.ts index 18118bd..829a68b 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -255,7 +255,11 @@ const gtasksSchema = z.object({ token_file: z.string().default('~/.config/flynn/gtasks-token.json'), }).optional(); +const automationDeliveryModeSchema = z.enum(['shared_session', 'isolated_job']); + const automationSchema = z.object({ + /** Session strategy for automation-triggered runs (cron/webhooks/gmail). */ + delivery_mode: automationDeliveryModeSchema.default('shared_session'), cron: z.array(cronJobSchema).default([]), webhooks: z.array(webhookSchema).default([]), gmail: gmailSchema, @@ -593,6 +597,7 @@ export type GcalConfig = z.infer; export type GdocsConfig = z.infer; export type GdriveConfig = z.infer; export type GtasksConfig = z.infer; +export type AutomationDeliveryMode = z.infer; export type PairingCodeConfig = z.infer; export type LogLevel = z.infer; export type AuditConfig = z.infer; diff --git a/src/daemon/channels.ts b/src/daemon/channels.ts index e49b69e..1cd1e98 100644 --- a/src/daemon/channels.ts +++ b/src/daemon/channels.ts @@ -91,7 +91,7 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult { // Register cron scheduler adapter (if any cron jobs configured) let cronScheduler: CronScheduler | undefined; if (config.automation.cron.length > 0) { - cronScheduler = new CronScheduler(config.automation.cron, channelRegistry); + cronScheduler = new CronScheduler(config.automation.cron, channelRegistry, config.automation.delivery_mode); channelRegistry.register(cronScheduler); console.log(`Registered ${config.automation.cron.length} cron job(s)`); } @@ -99,7 +99,7 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult { // Register webhook handler adapter (if any webhooks configured) let webhookHandler: WebhookHandler | undefined; if (config.automation.webhooks.length > 0) { - webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry); + webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry, config.automation.delivery_mode); channelRegistry.register(webhookHandler); gateway.setWebhookHandler(webhookHandler); console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);