From b9e008ea237c51f733bee6ae0cc972b2aca952c2 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 5 Feb 2026 22:22:13 -0800 Subject: [PATCH] feat(automation): add CronScheduler channel adapter Implements CronScheduler as a ChannelAdapter that fires InboundMessages on cron schedules and routes agent responses to configured output channels (e.g. Telegram). Includes 9 tests. --- src/automation/cron.test.ts | 134 ++++++++++++++++++++++++++++++++++++ src/automation/cron.ts | 103 +++++++++++++++++++++++++++ src/automation/index.ts | 1 + 3 files changed, 238 insertions(+) create mode 100644 src/automation/cron.test.ts create mode 100644 src/automation/cron.ts create mode 100644 src/automation/index.ts diff --git a/src/automation/cron.test.ts b/src/automation/cron.test.ts new file mode 100644 index 0000000..2a3bf2e --- /dev/null +++ b/src/automation/cron.test.ts @@ -0,0 +1,134 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { CronScheduler } from './cron.js'; +import type { CronJobConfig } from '../config/schema.js'; +import type { InboundMessage } from '../channels/types.js'; + +function makeCronJob(overrides?: Partial): CronJobConfig { + return { + name: 'test-job', + schedule: '0 9 * * *', + message: 'Hello from cron', + output: { channel: 'telegram', peer: '123' }, + enabled: true, + ...overrides, + }; +} + +describe('CronScheduler', () => { + let scheduler: CronScheduler; + let mockChannelRegistry: { get: ReturnType }; + + beforeEach(() => { + mockChannelRegistry = { + get: vi.fn(), + }; + }); + + afterEach(async () => { + if (scheduler) { + await scheduler.disconnect(); + } + }); + + it('implements ChannelAdapter interface', () => { + scheduler = new CronScheduler([], mockChannelRegistry as any); + expect(scheduler.name).toBe('cron'); + expect(scheduler.status).toBe('disconnected'); + }); + + it('status changes to connected after connect()', async () => { + scheduler = new CronScheduler([], mockChannelRegistry as any); + await scheduler.connect(); + expect(scheduler.status).toBe('connected'); + }); + + it('status changes to disconnected after disconnect()', async () => { + scheduler = new CronScheduler([], mockChannelRegistry as any); + await scheduler.connect(); + await scheduler.disconnect(); + expect(scheduler.status).toBe('disconnected'); + }); + + it('skips disabled jobs', async () => { + const jobs = [makeCronJob({ enabled: false })]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + scheduler.onMessage((msg: InboundMessage) => messages.push(msg)); + + await scheduler.connect(); + // Disabled job should not fire + expect(messages).toHaveLength(0); + }); + + it('fires a message when triggerJob is called', async () => { + const jobs = [makeCronJob()]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + scheduler.onMessage((msg: InboundMessage) => messages.push(msg)); + await scheduler.connect(); + + // Manually trigger (simulates cron firing) + scheduler.triggerJob('test-job'); + + expect(messages).toHaveLength(1); + expect(messages[0].channel).toBe('cron'); + expect(messages[0].senderId).toBe('test-job'); + expect(messages[0].text).toBe('Hello from cron'); + }); + + it('forwards response to output channel on send()', async () => { + const mockOutputAdapter = { + send: vi.fn().mockResolvedValue(undefined), + }; + mockChannelRegistry.get.mockReturnValue(mockOutputAdapter); + + const jobs = [makeCronJob()]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + await scheduler.connect(); + + await scheduler.send('test-job', { text: 'Agent response' }); + + expect(mockChannelRegistry.get).toHaveBeenCalledWith('telegram'); + expect(mockOutputAdapter.send).toHaveBeenCalledWith('123', { text: 'Agent response' }); + }); + + it('logs warning when output channel not found', async () => { + mockChannelRegistry.get.mockReturnValue(undefined); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const jobs = [makeCronJob()]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + await scheduler.connect(); + + await scheduler.send('test-job', { text: 'Agent response' }); + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('Output channel')); + warnSpy.mockRestore(); + }); + + it('logs warning when job name not found in send()', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const jobs = [makeCronJob()]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + await scheduler.connect(); + + await scheduler.send('nonexistent-job', { text: 'response' }); + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('No cron job')); + warnSpy.mockRestore(); + }); + + it('lists registered job names', () => { + const jobs = [ + makeCronJob({ name: 'job-a' }), + makeCronJob({ name: 'job-b', enabled: false }), + ]; + scheduler = new CronScheduler(jobs, mockChannelRegistry as any); + + const names = scheduler.getJobNames(); + expect(names).toEqual(['job-a', 'job-b']); + }); +}); diff --git a/src/automation/cron.ts b/src/automation/cron.ts new file mode 100644 index 0000000..a34a4ee --- /dev/null +++ b/src/automation/cron.ts @@ -0,0 +1,103 @@ +import { Cron } from 'croner'; +import type { CronJobConfig } from '../config/schema.js'; +import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; + +/** Minimal interface for the parts of ChannelRegistry we need. */ +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; +} + +export class CronScheduler implements ChannelAdapter { + readonly name = 'cron'; + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private cronInstances: Map = new Map(); + private jobs: Map = new Map(); + + constructor( + private readonly jobConfigs: CronJobConfig[], + private readonly channelLookup: ChannelLookup, + ) { + for (const job of jobConfigs) { + this.jobs.set(job.name, job); + } + } + + get status(): ChannelStatus { + return this._status; + } + + async connect(): Promise { + this._status = 'connected'; + + for (const job of this.jobConfigs) { + if (!job.enabled) continue; + + const cronInstance = new Cron(job.schedule, { + timezone: job.timezone, + paused: false, + }, () => { + this.triggerJob(job.name); + }); + + this.cronInstances.set(job.name, cronInstance); + } + + const enabledCount = this.jobConfigs.filter(j => j.enabled).length; + if (enabledCount > 0) { + console.log(`CronScheduler: ${enabledCount} job(s) scheduled`); + } + } + + async disconnect(): Promise { + for (const [, cron] of this.cronInstances) { + cron.stop(); + } + this.cronInstances.clear(); + this._status = 'disconnected'; + } + + async send(peerId: string, message: OutboundMessage): Promise { + // peerId is the cron job name — look up its output config + const job = this.jobs.get(peerId); + if (!job) { + console.warn(`No cron job found for '${peerId}'`); + return; + } + + const outputAdapter = this.channelLookup.get(job.output.channel); + if (!outputAdapter) { + console.warn(`Output channel '${job.output.channel}' not found for cron job '${peerId}'`); + return; + } + + await outputAdapter.send(job.output.peer, message); + } + + onMessage(handler: (msg: InboundMessage) => void): void { + this.messageHandler = handler; + } + + /** Manually trigger a job (also called by cron on schedule). */ + triggerJob(jobName: string): void { + const job = this.jobs.get(jobName); + if (!job) return; + + const msg: InboundMessage = { + id: `cron-${jobName}-${Date.now()}`, + channel: 'cron', + senderId: jobName, + senderName: `cron:${jobName}`, + text: job.message, + timestamp: Date.now(), + metadata: { cronJob: jobName, scheduled: true }, + }; + + this.messageHandler?.(msg); + } + + /** Get list of all job names (enabled and disabled). */ + getJobNames(): string[] { + return Array.from(this.jobs.keys()); + } +} diff --git a/src/automation/index.ts b/src/automation/index.ts new file mode 100644 index 0000000..b751da3 --- /dev/null +++ b/src/automation/index.ts @@ -0,0 +1 @@ +export { CronScheduler } from './cron.js';