feat(automation): add isolated job delivery mode

This commit is contained in:
William Valentin
2026-02-15 19:23:15 -08:00
parent 0470647ee7
commit 421942f66d
13 changed files with 183 additions and 11 deletions
+15
View File
@@ -78,6 +78,21 @@ describe('CronScheduler', () => {
expect(messages[0].text).toBe('Hello from cron');
});
it('uses isolated sender IDs when delivery mode is isolated_job', async () => {
const jobs = [makeCronJob()];
scheduler = new CronScheduler(jobs, mockChannelRegistry as any, 'isolated_job');
const messages: InboundMessage[] = [];
scheduler.onMessage((msg: InboundMessage) => messages.push(msg));
await scheduler.connect();
scheduler.triggerJob('test-job');
expect(messages).toHaveLength(1);
expect(messages[0].senderId).toMatch(/^test-job:run-/);
expect(messages[0].metadata?.replyPeerId).toBe('test-job');
expect(messages[0].metadata?.deliveryMode).toBe('isolated_job');
});
it('forwards response to output channel on send()', async () => {
const mockOutputAdapter = {
send: vi.fn().mockResolvedValue(undefined),
+15 -2
View File
@@ -2,12 +2,15 @@ import { Cron } from 'croner';
import type { CronJobConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { auditLogger } from '../audit/index.js';
import { randomUUID } from 'crypto';
/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
type DeliveryMode = 'shared_session' | 'isolated_job';
export class CronScheduler implements ChannelAdapter {
readonly name = 'cron';
private _status: ChannelStatus = 'disconnected';
@@ -18,6 +21,7 @@ export class CronScheduler implements ChannelAdapter {
constructor(
private readonly jobConfigs: CronJobConfig[],
private readonly channelLookup: ChannelLookup,
private readonly deliveryMode: DeliveryMode = 'shared_session',
) {
for (const job of jobConfigs) {
this.jobs.set(job.name, job);
@@ -85,15 +89,24 @@ export class CronScheduler implements ChannelAdapter {
triggerJob(jobName: string): void {
const job = this.jobs.get(jobName);
if (!job) {return;}
const runId = `run-${randomUUID()}`;
const senderId = this.deliveryMode === 'isolated_job' ? `${jobName}:${runId}` : jobName;
const msg: InboundMessage = {
id: `cron-${jobName}-${Date.now()}`,
channel: 'cron',
senderId: jobName,
senderId,
senderName: `cron:${jobName}`,
text: job.message,
timestamp: Date.now(),
metadata: { cronJob: jobName, scheduled: true, modelTier: job.model_tier },
metadata: {
cronJob: jobName,
scheduled: true,
modelTier: job.model_tier,
deliveryMode: this.deliveryMode,
runId,
replyPeerId: jobName,
},
};
auditLogger?.cronTrigger({
+19
View File
@@ -114,6 +114,25 @@ describe('WebhookHandler', () => {
expect(messages[0].text).toBe('hello world');
});
it('handleRequest uses isolated sender IDs when delivery mode is isolated_job', async () => {
const webhooks = [makeWebhook()];
handler = new WebhookHandler(webhooks, mockChannelRegistry as any, 'isolated_job');
const messages: InboundMessage[] = [];
handler.onMessage((msg: InboundMessage) => messages.push(msg));
await handler.connect();
const req = mockRequest('hello world');
const res = mockResponse();
const result = await handler.handleRequest('test-hook', req, res);
expect(result).toBe(true);
expect(messages).toHaveLength(1);
expect(messages[0].senderId).toMatch(/^test-hook:run-/);
expect(messages[0].metadata?.replyPeerId).toBe('test-hook');
expect(messages[0].metadata?.deliveryMode).toBe('isolated_job');
});
it('returns false for unknown webhook', async () => {
handler = new WebhookHandler([], mockChannelRegistry as any);
await handler.connect();
+14 -3
View File
@@ -1,4 +1,4 @@
import { createHmac, timingSafeEqual } from 'crypto';
import { createHmac, randomUUID, timingSafeEqual } from 'crypto';
import type { IncomingMessage, ServerResponse } from 'http';
import type { WebhookConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
@@ -9,6 +9,8 @@ interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
type DeliveryMode = 'shared_session' | 'isolated_job';
/** Read the full request body as a string. */
function readBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
@@ -68,6 +70,7 @@ export class WebhookHandler implements ChannelAdapter {
constructor(
private readonly webhookConfigs: WebhookConfig[],
private readonly channelLookup: ChannelLookup,
private readonly deliveryMode: DeliveryMode = 'shared_session',
) {
for (const webhook of webhookConfigs) {
this.webhooks.set(webhook.name, webhook);
@@ -151,15 +154,23 @@ export class WebhookHandler implements ChannelAdapter {
// Render message template
const text = renderTemplate(webhook.message, body);
const runId = `run-${randomUUID()}`;
const senderId = this.deliveryMode === 'isolated_job' ? `${webhookName}:${runId}` : webhookName;
const msg: InboundMessage = {
id: `webhook-${webhookName}-${Date.now()}`,
channel: 'webhook',
senderId: webhookName,
senderId,
senderName: `webhook:${webhookName}`,
text,
timestamp: Date.now(),
metadata: { webhookName, body },
metadata: {
webhookName,
body,
deliveryMode: this.deliveryMode,
runId,
replyPeerId: webhookName,
},
};
auditLogger?.webhookReceive({
+23
View File
@@ -132,6 +132,29 @@ describe('ChannelRegistry', () => {
expect(adapter.sendFn).toHaveBeenCalledWith('user-42', { text: 'pong' });
});
it('routes reply using metadata.replyPeerId when provided', async () => {
const adapter = createMockAdapter('test-channel');
registry.register(adapter);
const handler = vi.fn(async (_msg: InboundMessage, reply: (r: OutboundMessage) => Promise<void>) => {
await reply({ text: 'pong' });
});
registry.setMessageHandler(handler);
const msg = {
...makeMessage('test-channel'),
senderId: 'isolated-run-1',
metadata: { replyPeerId: 'job-a' },
};
adapter.triggerMessage(msg);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledOnce();
});
expect(adapter.sendFn).toHaveBeenCalledWith('job-a', { text: 'pong' });
});
it('unregisters adapter', () => {
const adapter = createMockAdapter('removeme');
registry.register(adapter);
+5 -1
View File
@@ -105,7 +105,11 @@ export class ChannelRegistry {
// Create a reply function bound to this message's channel and sender
const reply = async (response: OutboundMessage): Promise<void> => {
await adapter.send(msg.senderId, response);
const metadata = msg.metadata as Record<string, unknown> | undefined;
const replyPeerId = typeof metadata?.replyPeerId === 'string' && metadata.replyPeerId.length > 0
? metadata.replyPeerId
: msg.senderId;
await adapter.send(replyPeerId, response);
};
// Fire and forget — errors are logged, not propagated
+11
View File
@@ -306,9 +306,20 @@ describe('configSchema automation', () => {
it('accepts config without automation section', () => {
const result = configSchema.parse(baseConfig);
expect(result.automation).toBeDefined();
expect(result.automation.delivery_mode).toBe('shared_session');
expect(result.automation.cron).toEqual([]);
});
it('accepts isolated automation delivery mode', () => {
const result = configSchema.parse({
...baseConfig,
automation: {
delivery_mode: 'isolated_job',
},
});
expect(result.automation.delivery_mode).toBe('isolated_job');
});
it('accepts config with cron jobs', () => {
const result = configSchema.parse({
...baseConfig,
+5
View File
@@ -255,7 +255,11 @@ const gtasksSchema = z.object({
token_file: z.string().default('~/.config/flynn/gtasks-token.json'),
}).optional();
const automationDeliveryModeSchema = z.enum(['shared_session', 'isolated_job']);
const automationSchema = z.object({
/** Session strategy for automation-triggered runs (cron/webhooks/gmail). */
delivery_mode: automationDeliveryModeSchema.default('shared_session'),
cron: z.array(cronJobSchema).default([]),
webhooks: z.array(webhookSchema).default([]),
gmail: gmailSchema,
@@ -593,6 +597,7 @@ export type GcalConfig = z.infer<typeof gcalSchema>;
export type GdocsConfig = z.infer<typeof gdocsSchema>;
export type GdriveConfig = z.infer<typeof gdriveSchema>;
export type GtasksConfig = z.infer<typeof gtasksSchema>;
export type AutomationDeliveryMode = z.infer<typeof automationDeliveryModeSchema>;
export type PairingCodeConfig = z.infer<typeof pairingSchema>;
export type LogLevel = z.infer<typeof logLevelSchema>;
export type AuditConfig = z.infer<typeof auditSchema>;
+2 -2
View File
@@ -91,7 +91,7 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
// Register cron scheduler adapter (if any cron jobs configured)
let cronScheduler: CronScheduler | undefined;
if (config.automation.cron.length > 0) {
cronScheduler = new CronScheduler(config.automation.cron, channelRegistry);
cronScheduler = new CronScheduler(config.automation.cron, channelRegistry, config.automation.delivery_mode);
channelRegistry.register(cronScheduler);
console.log(`Registered ${config.automation.cron.length} cron job(s)`);
}
@@ -99,7 +99,7 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
// Register webhook handler adapter (if any webhooks configured)
let webhookHandler: WebhookHandler | undefined;
if (config.automation.webhooks.length > 0) {
webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry);
webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry, config.automation.delivery_mode);
channelRegistry.register(webhookHandler);
gateway.setWebhookHandler(webhookHandler);
console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);