feat(automation): add CronScheduler channel adapter
Implements CronScheduler as a ChannelAdapter that fires InboundMessages on cron schedules and routes agent responses to configured output channels (e.g. Telegram). Includes 9 tests.
This commit is contained in:
@@ -0,0 +1,134 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
import { CronScheduler } from './cron.js';
|
||||
import type { CronJobConfig } from '../config/schema.js';
|
||||
import type { InboundMessage } from '../channels/types.js';
|
||||
|
||||
function makeCronJob(overrides?: Partial<CronJobConfig>): CronJobConfig {
|
||||
return {
|
||||
name: 'test-job',
|
||||
schedule: '0 9 * * *',
|
||||
message: 'Hello from cron',
|
||||
output: { channel: 'telegram', peer: '123' },
|
||||
enabled: true,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe('CronScheduler', () => {
|
||||
let scheduler: CronScheduler;
|
||||
let mockChannelRegistry: { get: ReturnType<typeof vi.fn> };
|
||||
|
||||
beforeEach(() => {
|
||||
mockChannelRegistry = {
|
||||
get: vi.fn(),
|
||||
};
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (scheduler) {
|
||||
await scheduler.disconnect();
|
||||
}
|
||||
});
|
||||
|
||||
it('implements ChannelAdapter interface', () => {
|
||||
scheduler = new CronScheduler([], mockChannelRegistry as any);
|
||||
expect(scheduler.name).toBe('cron');
|
||||
expect(scheduler.status).toBe('disconnected');
|
||||
});
|
||||
|
||||
it('status changes to connected after connect()', async () => {
|
||||
scheduler = new CronScheduler([], mockChannelRegistry as any);
|
||||
await scheduler.connect();
|
||||
expect(scheduler.status).toBe('connected');
|
||||
});
|
||||
|
||||
it('status changes to disconnected after disconnect()', async () => {
|
||||
scheduler = new CronScheduler([], mockChannelRegistry as any);
|
||||
await scheduler.connect();
|
||||
await scheduler.disconnect();
|
||||
expect(scheduler.status).toBe('disconnected');
|
||||
});
|
||||
|
||||
it('skips disabled jobs', async () => {
|
||||
const jobs = [makeCronJob({ enabled: false })];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
|
||||
const messages: InboundMessage[] = [];
|
||||
scheduler.onMessage((msg: InboundMessage) => messages.push(msg));
|
||||
|
||||
await scheduler.connect();
|
||||
// Disabled job should not fire
|
||||
expect(messages).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('fires a message when triggerJob is called', async () => {
|
||||
const jobs = [makeCronJob()];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
|
||||
const messages: InboundMessage[] = [];
|
||||
scheduler.onMessage((msg: InboundMessage) => messages.push(msg));
|
||||
await scheduler.connect();
|
||||
|
||||
// Manually trigger (simulates cron firing)
|
||||
scheduler.triggerJob('test-job');
|
||||
|
||||
expect(messages).toHaveLength(1);
|
||||
expect(messages[0].channel).toBe('cron');
|
||||
expect(messages[0].senderId).toBe('test-job');
|
||||
expect(messages[0].text).toBe('Hello from cron');
|
||||
});
|
||||
|
||||
it('forwards response to output channel on send()', async () => {
|
||||
const mockOutputAdapter = {
|
||||
send: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
mockChannelRegistry.get.mockReturnValue(mockOutputAdapter);
|
||||
|
||||
const jobs = [makeCronJob()];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
await scheduler.connect();
|
||||
|
||||
await scheduler.send('test-job', { text: 'Agent response' });
|
||||
|
||||
expect(mockChannelRegistry.get).toHaveBeenCalledWith('telegram');
|
||||
expect(mockOutputAdapter.send).toHaveBeenCalledWith('123', { text: 'Agent response' });
|
||||
});
|
||||
|
||||
it('logs warning when output channel not found', async () => {
|
||||
mockChannelRegistry.get.mockReturnValue(undefined);
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
|
||||
const jobs = [makeCronJob()];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
await scheduler.connect();
|
||||
|
||||
await scheduler.send('test-job', { text: 'Agent response' });
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('Output channel'));
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('logs warning when job name not found in send()', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
|
||||
const jobs = [makeCronJob()];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
await scheduler.connect();
|
||||
|
||||
await scheduler.send('nonexistent-job', { text: 'response' });
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('No cron job'));
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('lists registered job names', () => {
|
||||
const jobs = [
|
||||
makeCronJob({ name: 'job-a' }),
|
||||
makeCronJob({ name: 'job-b', enabled: false }),
|
||||
];
|
||||
scheduler = new CronScheduler(jobs, mockChannelRegistry as any);
|
||||
|
||||
const names = scheduler.getJobNames();
|
||||
expect(names).toEqual(['job-a', 'job-b']);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,103 @@
|
||||
import { Cron } from 'croner';
|
||||
import type { CronJobConfig } from '../config/schema.js';
|
||||
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
|
||||
|
||||
/** Minimal interface for the parts of ChannelRegistry we need. */
|
||||
interface ChannelLookup {
|
||||
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
|
||||
}
|
||||
|
||||
export class CronScheduler implements ChannelAdapter {
|
||||
readonly name = 'cron';
|
||||
private _status: ChannelStatus = 'disconnected';
|
||||
private messageHandler?: (msg: InboundMessage) => void;
|
||||
private cronInstances: Map<string, Cron> = new Map();
|
||||
private jobs: Map<string, CronJobConfig> = new Map();
|
||||
|
||||
constructor(
|
||||
private readonly jobConfigs: CronJobConfig[],
|
||||
private readonly channelLookup: ChannelLookup,
|
||||
) {
|
||||
for (const job of jobConfigs) {
|
||||
this.jobs.set(job.name, job);
|
||||
}
|
||||
}
|
||||
|
||||
get status(): ChannelStatus {
|
||||
return this._status;
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
this._status = 'connected';
|
||||
|
||||
for (const job of this.jobConfigs) {
|
||||
if (!job.enabled) continue;
|
||||
|
||||
const cronInstance = new Cron(job.schedule, {
|
||||
timezone: job.timezone,
|
||||
paused: false,
|
||||
}, () => {
|
||||
this.triggerJob(job.name);
|
||||
});
|
||||
|
||||
this.cronInstances.set(job.name, cronInstance);
|
||||
}
|
||||
|
||||
const enabledCount = this.jobConfigs.filter(j => j.enabled).length;
|
||||
if (enabledCount > 0) {
|
||||
console.log(`CronScheduler: ${enabledCount} job(s) scheduled`);
|
||||
}
|
||||
}
|
||||
|
||||
async disconnect(): Promise<void> {
|
||||
for (const [, cron] of this.cronInstances) {
|
||||
cron.stop();
|
||||
}
|
||||
this.cronInstances.clear();
|
||||
this._status = 'disconnected';
|
||||
}
|
||||
|
||||
async send(peerId: string, message: OutboundMessage): Promise<void> {
|
||||
// peerId is the cron job name — look up its output config
|
||||
const job = this.jobs.get(peerId);
|
||||
if (!job) {
|
||||
console.warn(`No cron job found for '${peerId}'`);
|
||||
return;
|
||||
}
|
||||
|
||||
const outputAdapter = this.channelLookup.get(job.output.channel);
|
||||
if (!outputAdapter) {
|
||||
console.warn(`Output channel '${job.output.channel}' not found for cron job '${peerId}'`);
|
||||
return;
|
||||
}
|
||||
|
||||
await outputAdapter.send(job.output.peer, message);
|
||||
}
|
||||
|
||||
onMessage(handler: (msg: InboundMessage) => void): void {
|
||||
this.messageHandler = handler;
|
||||
}
|
||||
|
||||
/** Manually trigger a job (also called by cron on schedule). */
|
||||
triggerJob(jobName: string): void {
|
||||
const job = this.jobs.get(jobName);
|
||||
if (!job) return;
|
||||
|
||||
const msg: InboundMessage = {
|
||||
id: `cron-${jobName}-${Date.now()}`,
|
||||
channel: 'cron',
|
||||
senderId: jobName,
|
||||
senderName: `cron:${jobName}`,
|
||||
text: job.message,
|
||||
timestamp: Date.now(),
|
||||
metadata: { cronJob: jobName, scheduled: true },
|
||||
};
|
||||
|
||||
this.messageHandler?.(msg);
|
||||
}
|
||||
|
||||
/** Get list of all job names (enabled and disabled). */
|
||||
getJobNames(): string[] {
|
||||
return Array.from(this.jobs.keys());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
export { CronScheduler } from './cron.js';
|
||||
Reference in New Issue
Block a user