feat: add channel adapter abstraction with Telegram and WebChat adapters

Implement Phase 3 channel adapters that decouple message sources from
the agent via a uniform ChannelAdapter interface and ChannelRegistry.

- Add ChannelAdapter/InboundMessage/OutboundMessage types
- Add ChannelRegistry for adapter lifecycle and message routing
- Add TelegramAdapter (grammy bot, auth middleware, confirmations, chunking)
- Add WebChatAdapter (thin shim over GatewayServer)
- Refactor daemon to use ChannelRegistry with per-channel-per-user agents
- Add config.get/config.patch gateway handlers (Phase 2 loose end)
- Add system.restart gateway handler (Phase 2 loose end)
- Add implementation plans and design docs

Tests: 225 passing (33 new channel adapter + gateway handler tests)
This commit is contained in:
William Valentin
2026-02-05 20:00:36 -08:00
parent 282a15d2b9
commit aa95f2132c
19 changed files with 4123 additions and 37 deletions
+11
View File
@@ -0,0 +1,11 @@
export type {
ChannelAdapter,
ChannelStatus,
InboundMessage,
OutboundMessage,
ToolStatusEvent,
MessageHandler,
} from './types.js';
export { ChannelRegistry } from './registry.js';
export { TelegramAdapter, type TelegramAdapterConfig } from './telegram/index.js';
export { WebChatAdapter, type WebChatAdapterConfig } from './webchat/index.js';
+196
View File
@@ -0,0 +1,196 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { ChannelRegistry } from './registry.js';
import type { ChannelAdapter, InboundMessage, OutboundMessage } from './types.js';
/** Create a mock adapter with spy functions and a triggerMessage helper. */
function createMockAdapter(name: string): ChannelAdapter & {
connectFn: ReturnType<typeof vi.fn>;
disconnectFn: ReturnType<typeof vi.fn>;
sendFn: ReturnType<typeof vi.fn>;
triggerMessage: (msg: InboundMessage) => void;
} {
let messageHandler: ((msg: InboundMessage) => void) | undefined;
let _status: 'disconnected' | 'connecting' | 'connected' | 'error' = 'disconnected';
const connectFn = vi.fn(async () => { _status = 'connected'; });
const disconnectFn = vi.fn(async () => { _status = 'disconnected'; });
const sendFn = vi.fn(async () => {});
return {
name,
get status() { return _status; },
connect: connectFn,
disconnect: disconnectFn,
send: sendFn,
onMessage: (handler: (msg: InboundMessage) => void) => { messageHandler = handler; },
triggerMessage: (msg: InboundMessage) => { messageHandler?.(msg); },
connectFn,
disconnectFn,
sendFn,
};
}
/** Create a sample inbound message for a given channel. */
function makeMessage(channel: string): InboundMessage {
return {
id: 'msg-1',
channel,
senderId: 'user-42',
senderName: 'Alice',
text: 'Hello',
timestamp: Date.now(),
};
}
describe('ChannelRegistry', () => {
let registry: ChannelRegistry;
beforeEach(() => {
registry = new ChannelRegistry();
});
it('registers and lists adapters', () => {
const a1 = createMockAdapter('alpha');
const a2 = createMockAdapter('beta');
registry.register(a1);
registry.register(a2);
const listed = registry.list();
expect(listed).toHaveLength(2);
expect(listed.map((a) => a.name)).toContain('alpha');
expect(listed.map((a) => a.name)).toContain('beta');
});
it('throws on duplicate registration', () => {
const a1 = createMockAdapter('dup');
registry.register(a1);
const a2 = createMockAdapter('dup');
expect(() => registry.register(a2)).toThrow('already registered');
});
it('gets adapter by name', () => {
const adapter = createMockAdapter('test');
registry.register(adapter);
expect(registry.get('test')).toBe(adapter);
expect(registry.get('unknown')).toBeUndefined();
});
it('starts all adapters', async () => {
const a1 = createMockAdapter('one');
const a2 = createMockAdapter('two');
registry.register(a1);
registry.register(a2);
await registry.startAll();
expect(a1.connectFn).toHaveBeenCalledOnce();
expect(a2.connectFn).toHaveBeenCalledOnce();
});
it('stops all adapters', async () => {
const a1 = createMockAdapter('one');
const a2 = createMockAdapter('two');
registry.register(a1);
registry.register(a2);
// Connect first so they are in connected state
await a1.connect();
await a2.connect();
await registry.stopAll();
expect(a1.disconnectFn).toHaveBeenCalled();
expect(a2.disconnectFn).toHaveBeenCalled();
});
it('routes inbound messages to handler', async () => {
const adapter = createMockAdapter('test-channel');
registry.register(adapter);
const handler = vi.fn(async (_msg: InboundMessage, reply: (r: OutboundMessage) => Promise<void>) => {
await reply({ text: 'pong' });
});
registry.setMessageHandler(handler);
const msg = makeMessage('test-channel');
adapter.triggerMessage(msg);
// Allow the async handler to settle
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledOnce();
});
// Handler receives the original inbound message
expect(handler.mock.calls[0][0]).toBe(msg);
// The reply function should have called adapter.send with the sender's peerId
expect(adapter.sendFn).toHaveBeenCalledWith('user-42', { text: 'pong' });
});
it('unregisters adapter', () => {
const adapter = createMockAdapter('removeme');
registry.register(adapter);
registry.unregister('removeme');
expect(registry.list()).toHaveLength(0);
expect(registry.get('removeme')).toBeUndefined();
});
it('unregister disconnects connected adapter', async () => {
const adapter = createMockAdapter('connected-one');
registry.register(adapter);
await adapter.connect();
expect(adapter.status).toBe('connected');
await registry.unregister('connected-one');
expect(adapter.disconnectFn).toHaveBeenCalled();
});
it('logs warning when no message handler set', () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
const adapter = createMockAdapter('no-handler');
registry.register(adapter);
// Trigger a message WITHOUT calling setMessageHandler
adapter.triggerMessage(makeMessage('no-handler'));
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('No message handler set'),
);
warnSpy.mockRestore();
});
it('handles errors in message handler gracefully', async () => {
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
const adapter = createMockAdapter('err-channel');
registry.register(adapter);
registry.setMessageHandler(async () => {
throw new Error('handler exploded');
});
// Trigger a message — should not throw
adapter.triggerMessage(makeMessage('err-channel'));
// Allow the async error path to settle
await vi.waitFor(() => {
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining('Error handling message'),
expect.any(Error),
);
});
errorSpy.mockRestore();
});
});
+116
View File
@@ -0,0 +1,116 @@
/**
* Channel registry — manages adapter lifecycle and message routing.
*
* The ChannelRegistry holds all registered channel adapters and routes
* inbound messages through a single MessageHandler. Each adapter's
* onMessage callback is wired at registration time so that messages
* flow through handleInbound → messageHandler → reply.
*/
import type {
ChannelAdapter,
InboundMessage,
MessageHandler,
OutboundMessage,
} from './types.js';
export class ChannelRegistry {
private adapters: Map<string, ChannelAdapter> = new Map();
private messageHandler?: MessageHandler;
/** Register an adapter. Throws if name already registered. */
register(adapter: ChannelAdapter): void {
if (this.adapters.has(adapter.name)) {
throw new Error(`Channel adapter '${adapter.name}' is already registered`);
}
// Wire the adapter's onMessage to route through our messageHandler
adapter.onMessage((msg) => this.handleInbound(msg));
this.adapters.set(adapter.name, adapter);
}
/** Unregister an adapter by name. Calls disconnect() if connected. */
async unregister(name: string): Promise<void> {
const adapter = this.adapters.get(name);
if (!adapter) return;
if (adapter.status === 'connected' || adapter.status === 'connecting') {
await adapter.disconnect();
}
this.adapters.delete(name);
}
/** Get an adapter by name. */
get(name: string): ChannelAdapter | undefined {
return this.adapters.get(name);
}
/** List all registered adapters. */
list(): ChannelAdapter[] {
return Array.from(this.adapters.values());
}
/** Set the message handler that all adapters route inbound messages to. */
setMessageHandler(handler: MessageHandler): void {
this.messageHandler = handler;
}
/** Start all registered adapters. Logs errors per adapter, doesn't throw. */
async startAll(): Promise<void> {
const adapters = Array.from(this.adapters.values());
const results = await Promise.allSettled(
adapters.map((a) => a.connect()),
);
for (const [i, result] of results.entries()) {
if (result.status === 'rejected') {
console.error(
`Failed to start channel '${adapters[i].name}':`,
result.reason,
);
}
}
}
/** Stop all registered adapters. */
async stopAll(): Promise<void> {
const adapters = Array.from(this.adapters.values());
const results = await Promise.allSettled(
adapters.map((a) => a.disconnect()),
);
for (const [i, result] of results.entries()) {
if (result.status === 'rejected') {
console.error(
`Failed to stop channel '${adapters[i].name}':`,
result.reason,
);
}
}
}
/** Internal: route an inbound message to the message handler. */
private handleInbound(msg: InboundMessage): void {
if (!this.messageHandler) {
console.warn(`No message handler set, dropping message from '${msg.channel}'`);
return;
}
const adapter = this.adapters.get(msg.channel);
if (!adapter) {
console.warn(`Unknown channel '${msg.channel}' in inbound message`);
return;
}
// Create a reply function bound to this message's channel and sender
const reply = async (response: OutboundMessage): Promise<void> => {
await adapter.send(msg.senderId, response);
};
// Fire and forget — errors are logged, not propagated
this.messageHandler(msg, reply).catch((err: unknown) => {
console.error(`Error handling message from '${msg.channel}':`, err);
});
}
}
+256
View File
@@ -0,0 +1,256 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
// ── Mock grammy before importing adapter ──────────────────────────
const mockUse = vi.fn();
const mockOn = vi.fn();
const mockCommand = vi.fn();
const mockStart = vi.fn();
const mockStop = vi.fn();
const mockSendMessage = vi.fn();
vi.mock('grammy', () => ({
Bot: vi.fn().mockImplementation(() => ({
use: mockUse,
on: mockOn,
command: mockCommand,
start: mockStart,
stop: mockStop,
api: { sendMessage: mockSendMessage },
})),
}));
import { TelegramAdapter, type TelegramAdapterConfig } from './adapter.js';
import type { InboundMessage } from '../types.js';
const baseConfig: TelegramAdapterConfig = {
botToken: 'test-token-123',
allowedChatIds: [100, 200],
};
describe('TelegramAdapter', () => {
let adapter: TelegramAdapter;
beforeEach(() => {
vi.clearAllMocks();
adapter = new TelegramAdapter(baseConfig);
});
// ── Basic properties ──────────────────────────────────────────
it('has name "telegram"', () => {
expect(adapter.name).toBe('telegram');
});
it('starts as disconnected', () => {
expect(adapter.status).toBe('disconnected');
});
// ── connect / disconnect ──────────────────────────────────────
it('connect creates a bot and sets status to connected', async () => {
await adapter.connect();
expect(adapter.status).toBe('connected');
// Bot constructor called with the token
const { Bot } = await import('grammy');
expect(Bot).toHaveBeenCalledWith('test-token-123');
});
it('connect registers auth middleware, commands, and message handler', async () => {
await adapter.connect();
// .use() for auth middleware
expect(mockUse).toHaveBeenCalledTimes(1);
// .command() for /start and /reset
expect(mockCommand).toHaveBeenCalledTimes(2);
expect(mockCommand.mock.calls[0][0]).toBe('start');
expect(mockCommand.mock.calls[1][0]).toBe('reset');
// .on('message:text', ...) for text handler
expect(mockOn).toHaveBeenCalledWith('message:text', expect.any(Function));
// .start() to begin long polling
expect(mockStart).toHaveBeenCalledTimes(1);
});
it('connect registers callback_query handler when hookEngine is provided', async () => {
const hookEngine = { resolveConfirmation: vi.fn() };
const adapterWithHooks = new TelegramAdapter({
...baseConfig,
hookEngine: hookEngine as never,
});
await adapterWithHooks.connect();
// Should have .on('callback_query:data', ...) plus .on('message:text', ...)
expect(mockOn).toHaveBeenCalledWith('callback_query:data', expect.any(Function));
expect(mockOn).toHaveBeenCalledWith('message:text', expect.any(Function));
});
it('disconnect stops the bot and sets status to disconnected', async () => {
await adapter.connect();
expect(adapter.status).toBe('connected');
await adapter.disconnect();
expect(mockStop).toHaveBeenCalledTimes(1);
expect(adapter.status).toBe('disconnected');
});
it('disconnect is safe to call when not connected', async () => {
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
expect(mockStop).not.toHaveBeenCalled();
});
// ── send ──────────────────────────────────────────────────────
it('send throws when adapter is not connected', async () => {
await expect(adapter.send('100', { text: 'hello' })).rejects.toThrow(
'Telegram adapter not connected',
);
});
it('send delivers a short message in a single API call', async () => {
await adapter.connect();
await adapter.send('100', { text: 'Hello there' });
expect(mockSendMessage).toHaveBeenCalledTimes(1);
expect(mockSendMessage).toHaveBeenCalledWith(100, 'Hello there', { parse_mode: 'Markdown' });
});
it('send chunks a long message that exceeds 4096 chars', async () => {
await adapter.connect();
// Create a message that is longer than 4096 chars — two halves joined by a newline
const half = 'A'.repeat(3000);
const longMessage = `${half}\n${'B'.repeat(3000)}`;
await adapter.send('200', { text: longMessage });
// Should have been split into 2 chunks
expect(mockSendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
// Each call uses numeric chatId and parse_mode
for (const call of mockSendMessage.mock.calls) {
expect(call[0]).toBe(200);
expect(call[2]).toEqual({ parse_mode: 'Markdown' });
}
});
// ── onMessage / inbound handling ──────────────────────────────
it('onMessage registers a handler that receives text messages', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
await adapter.connect();
// Get the registered message:text handler from mockOn
const textHandlerCall = mockOn.mock.calls.find(
(call) => call[0] === 'message:text',
);
expect(textHandlerCall).toBeDefined();
const textHandler = textHandlerCall![1];
// Simulate a grammy context object
const ctx = {
message: { message_id: 42, text: 'Hello Flynn' },
chat: { id: 100 },
from: { first_name: 'Will' },
replyWithChatAction: vi.fn(),
};
await textHandler(ctx);
expect(ctx.replyWithChatAction).toHaveBeenCalledWith('typing');
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.channel).toBe('telegram');
expect(msg.senderId).toBe('100');
expect(msg.senderName).toBe('Will');
expect(msg.text).toBe('Hello Flynn');
expect(msg.id).toBe('42');
});
it('text handler does nothing when no message handler is registered', async () => {
// Don't call onMessage — no handler
await adapter.connect();
const textHandlerCall = mockOn.mock.calls.find(
(call) => call[0] === 'message:text',
);
const textHandler = textHandlerCall![1];
const ctx = {
message: { message_id: 1, text: 'test' },
chat: { id: 100 },
from: { first_name: 'Will' },
replyWithChatAction: vi.fn(),
};
// Should not throw
await textHandler(ctx);
expect(ctx.replyWithChatAction).not.toHaveBeenCalled();
});
// ── /reset command ────────────────────────────────────────────
it('/reset command delivers a reset inbound message', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
await adapter.connect();
// Find the /reset command handler
const resetCall = mockCommand.mock.calls.find((call) => call[0] === 'reset');
expect(resetCall).toBeDefined();
const resetHandler = resetCall![1];
const ctx = {
message: { message_id: 99 },
chat: { id: 100 },
from: { first_name: 'Will' },
reply: vi.fn(),
};
await resetHandler(ctx);
expect(ctx.reply).toHaveBeenCalledWith('Conversation reset.');
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('/reset');
expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' });
});
// ── Auth middleware ───────────────────────────────────────────
it('auth middleware blocks unauthorized chat IDs', async () => {
await adapter.connect();
// The first .use() call is the auth middleware
const authMiddleware = mockUse.mock.calls[0][0];
const next = vi.fn();
const ctx = { chat: { id: 999 } }; // Not in allowedChatIds
await authMiddleware(ctx, next);
expect(next).not.toHaveBeenCalled();
});
it('auth middleware allows authorized chat IDs', async () => {
await adapter.connect();
const authMiddleware = mockUse.mock.calls[0][0];
const next = vi.fn();
const ctx = { chat: { id: 100 } }; // In allowedChatIds
await authMiddleware(ctx, next);
expect(next).toHaveBeenCalledTimes(1);
});
});
+208
View File
@@ -0,0 +1,208 @@
import { Bot } from 'grammy';
import type { HookEngine } from '../../hooks/index.js';
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { isAllowedChat } from '../../frontends/telegram/handlers.js';
import { parseConfirmationCallback } from '../../frontends/telegram/confirmations.js';
/** Configuration for the Telegram channel adapter. */
export interface TelegramAdapterConfig {
botToken: string;
allowedChatIds: number[];
hookEngine?: HookEngine;
}
/**
* Split a long message into chunks that respect Telegram's 4096 char limit.
* Prefers splitting at newlines, then spaces, then hard-cuts.
*/
function splitMessage(text: string, maxLength: number): string[] {
const chunks: string[] = [];
let remaining = text;
while (remaining.length > 0) {
if (remaining.length <= maxLength) {
chunks.push(remaining);
break;
}
// Try to split at a newline within the allowed window
let splitIndex = remaining.lastIndexOf('\n', maxLength);
if (splitIndex === -1 || splitIndex < maxLength / 2) {
splitIndex = remaining.lastIndexOf(' ', maxLength);
}
if (splitIndex === -1 || splitIndex < maxLength / 2) {
splitIndex = maxLength;
}
chunks.push(remaining.slice(0, splitIndex));
remaining = remaining.slice(splitIndex).trimStart();
}
return chunks;
}
/**
* Telegram channel adapter backed by grammy.
*
* Handles authentication via allowed-chat-id filtering,
* confirmation callbacks (when a HookEngine is provided),
* and message chunking for Telegram's 4096-char limit.
*/
export class TelegramAdapter implements ChannelAdapter {
readonly name = 'telegram';
private _status: ChannelStatus = 'disconnected';
private bot: Bot | null = null;
private messageHandler?: (msg: InboundMessage) => void;
private config: TelegramAdapterConfig;
get status(): ChannelStatus {
return this._status;
}
constructor(config: TelegramAdapterConfig) {
this.config = config;
}
/** Register the inbound message handler. Called by the registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/** Create the grammy bot, wire up middleware & handlers, and start long-polling. */
async connect(): Promise<void> {
this.bot = new Bot(this.config.botToken);
this._status = 'connecting';
// ── Auth middleware — reject messages from unknown chats ──
this.bot.use(async (ctx, next) => {
const chatId = ctx.chat?.id;
if (chatId === undefined || !isAllowedChat(chatId, this.config.allowedChatIds)) {
console.log(`Rejected message from unauthorized chat: ${chatId}`);
return;
}
await next();
});
// ── Confirmation callback handler (requires hookEngine) ──
if (this.config.hookEngine) {
const hookEngine = this.config.hookEngine;
this.bot.on('callback_query:data', async (ctx) => {
const data = ctx.callbackQuery.data;
const parsed = parseConfirmationCallback(data);
if (!parsed) {
await ctx.answerCallbackQuery({ text: 'Invalid action' });
return;
}
const resolved = hookEngine.resolveConfirmation(parsed.id, {
approved: parsed.approved,
reason: parsed.approved ? undefined : 'Denied by user',
});
if (resolved) {
await ctx.answerCallbackQuery({
text: parsed.approved ? '✅ Approved' : '❌ Denied',
});
await ctx.editMessageText(
ctx.callbackQuery.message?.text + `\n\n${parsed.approved ? '✅ Approved' : '❌ Denied'}`,
{ parse_mode: 'Markdown' },
);
} else {
await ctx.answerCallbackQuery({ text: 'Confirmation expired or not found' });
}
});
}
// ── Command handlers ──
this.bot.command('start', async (ctx) => {
await ctx.reply('Flynn is ready. Send me a message!');
});
this.bot.command('reset', async (ctx) => {
// Deliver a special reset message through the channel
if (this.messageHandler) {
this.messageHandler({
id: String(ctx.message?.message_id ?? Date.now()),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: '/reset',
timestamp: Date.now(),
metadata: { isCommand: true, command: 'reset' },
});
}
await ctx.reply('Conversation reset.');
});
// ── Text message handler ──
this.bot.on('message:text', async (ctx) => {
if (!this.messageHandler) return;
const text = ctx.message.text;
// Show typing indicator while processing
await ctx.replyWithChatAction('typing');
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text,
timestamp: Date.now(),
});
});
// ── Start long polling ──
this.bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot started: @${botInfo.username}`);
this._status = 'connected';
},
});
// bot.start() returns immediately for long polling.
// The onStart callback sets connected above; also set here for safety
// in case the callback fires before this line is reached.
this._status = 'connected';
}
/** Stop the bot and clean up. */
async disconnect(): Promise<void> {
if (this.bot) {
await this.bot.stop();
this.bot = null;
}
this._status = 'disconnected';
}
/** Send an outbound message, automatically chunking if it exceeds Telegram's limit. */
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (!this.bot) throw new Error('Telegram adapter not connected');
const chatId = Number(peerId);
const text = message.text;
// Telegram enforces a 4096-character limit per message
if (text.length <= 4096) {
await this.bot.api.sendMessage(chatId, text, { parse_mode: 'Markdown' });
} else {
const chunks = splitMessage(text, 4096);
for (const chunk of chunks) {
await this.bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' });
}
}
}
}
+1
View File
@@ -0,0 +1 @@
export { TelegramAdapter, type TelegramAdapterConfig } from './adapter.js';
+75
View File
@@ -0,0 +1,75 @@
/**
* Channel adapter type definitions.
*
* Pure type definitions for the channel abstraction layer.
* Each channel adapter (Telegram, webchat, etc.) implements
* the ChannelAdapter interface to provide a uniform messaging API.
*/
/** Inbound message received from a channel platform. */
export interface InboundMessage {
/** Platform message ID. */
id: string;
/** Adapter name: "telegram", "webchat", etc. */
channel: string;
/** Platform user ID. */
senderId: string;
/** Display name (optional). */
senderName?: string;
/** Message text. */
text: string;
/** ID of message being replied to. */
replyTo?: string;
/** Unix ms. */
timestamp: number;
/** Platform-specific extras. */
metadata?: Record<string, unknown>;
}
/** Outbound message to send via a channel adapter. */
export interface OutboundMessage {
/** Response text (markdown). */
text: string;
/** Original message ID to reply to. */
replyTo?: string;
/** Platform-specific extras. */
metadata?: Record<string, unknown>;
}
/** Tool execution status event for streaming feedback. */
export interface ToolStatusEvent {
type: 'start' | 'end';
tool: string;
args?: unknown;
result?: { success: boolean; output: string; error?: string };
}
/** Connection status of a channel adapter. */
export type ChannelStatus = 'disconnected' | 'connecting' | 'connected' | 'error';
/** Uniform interface that every channel adapter must implement. */
export interface ChannelAdapter {
/** Unique channel name (e.g. "telegram", "webchat"). */
readonly name: string;
/** Current connection status. */
readonly status: ChannelStatus;
/** Start the adapter (connect to platform, begin listening). */
connect(): Promise<void>;
/** Stop the adapter (disconnect, clean up). */
disconnect(): Promise<void>;
/** Send a message to a specific peer on this channel. */
send(peerId: string, message: OutboundMessage): Promise<void>;
/** Register the inbound message handler. Called by registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void;
}
/** Callback type for the registry's message handler. */
export type MessageHandler = (
msg: InboundMessage,
reply: (response: OutboundMessage) => Promise<void>,
) => Promise<void>;
+63
View File
@@ -0,0 +1,63 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { WebChatAdapter } from './adapter.js';
import type { GatewayServer } from '../../gateway/index.js';
/** Mock GatewayServer — the adapter wraps this but doesn't manage its lifecycle. */
const mockGateway = {
start: vi.fn(),
stop: vi.fn(),
getWss: vi.fn(() => null),
getHttpServer: vi.fn(() => null),
getSessionBridge: vi.fn(),
getMethods: vi.fn(() => []),
} as unknown as GatewayServer;
describe('WebChatAdapter', () => {
let adapter: WebChatAdapter;
beforeEach(() => {
vi.clearAllMocks();
adapter = new WebChatAdapter({ gateway: mockGateway });
});
it('has correct name', () => {
expect(adapter.name).toBe('webchat');
});
it('starts as disconnected', () => {
expect(adapter.status).toBe('disconnected');
});
it('connect sets status to connected', async () => {
await adapter.connect();
expect(adapter.status).toBe('connected');
});
it('disconnect sets status to disconnected', async () => {
await adapter.connect();
expect(adapter.status).toBe('connected');
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
it('connect does not call gateway.start', async () => {
await adapter.connect();
expect(mockGateway.start).not.toHaveBeenCalled();
});
it('disconnect does not call gateway.stop', async () => {
await adapter.connect();
await adapter.disconnect();
expect(mockGateway.stop).not.toHaveBeenCalled();
});
it('send is a no-op', async () => {
// Should not throw
await adapter.send('peer1', { text: 'hello' });
});
it('getGateway returns the gateway instance', () => {
expect(adapter.getGateway()).toBe(mockGateway);
});
});
+81
View File
@@ -0,0 +1,81 @@
/**
* WebChat channel adapter.
*
* Thin wrapper around the existing GatewayServer. The gateway already
* handles WebSocket connections, sessions, and agent routing. This adapter
* exposes the gateway as a ChannelAdapter so the ChannelRegistry has a
* uniform interface for all channels.
*/
import type { GatewayServer } from '../../gateway/index.js';
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
/** Configuration for the WebChat adapter. */
export interface WebChatAdapterConfig {
gateway: GatewayServer;
}
/**
* WebChatAdapter wraps a GatewayServer to satisfy the ChannelAdapter interface.
*
* The gateway's lifecycle (start/stop) is managed by the daemon, not by
* this adapter. Connect/disconnect only track the adapter's logical status.
*/
export class WebChatAdapter implements ChannelAdapter {
readonly name = 'webchat';
private _status: ChannelStatus = 'disconnected';
private gateway: GatewayServer;
private messageHandler?: (msg: InboundMessage) => void;
get status(): ChannelStatus {
return this._status;
}
constructor(config: WebChatAdapterConfig) {
this.gateway = config.gateway;
}
/** Register the inbound message handler. Called by registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/**
* Connect the adapter. The gateway's lifecycle is managed by the daemon,
* so this just marks the adapter as connected. The gateway should already
* be started (or will be started) by the daemon.
*/
async connect(): Promise<void> {
this._status = 'connected';
}
/**
* Disconnect the adapter. Does NOT stop the gateway — that's managed
* by the daemon lifecycle. Just marks this adapter as disconnected.
*/
async disconnect(): Promise<void> {
this._status = 'disconnected';
}
/**
* Send a message to a WebSocket peer. This is a no-op placeholder —
* the gateway handles outbound messages directly via its own WS connections.
* This method exists to satisfy the ChannelAdapter interface.
*/
async send(_peerId: string, _message: OutboundMessage): Promise<void> {
// Gateway handles outbound via its own WS event system (GatewayEvent).
// This adapter doesn't need to implement send() because the gateway's
// agent.send handler already streams responses back to the WS client.
}
/** Get the underlying gateway server. */
getGateway(): GatewayServer {
return this.gateway;
}
}
+1
View File
@@ -0,0 +1 @@
export { WebChatAdapter, type WebChatAdapterConfig } from './adapter.js';
+111 -36
View File
@@ -1,13 +1,13 @@
import { Bot } from 'grammy';
import { Lifecycle } from './lifecycle.js';
import type { Config } from '../config/index.js';
import { AnthropicClient, OpenAIClient, OllamaClient, LlamaCppClient, ModelRouter } from '../models/index.js';
import { NativeAgent } from '../backends/index.js';
import { createTelegramBot } from '../frontends/telegram/index.js';
import { SessionStore, SessionManager } from '../session/index.js';
import { HookEngine } from '../hooks/index.js';
import { ToolRegistry, ToolExecutor, allBuiltinTools } from '../tools/index.js';
import { GatewayServer } from '../gateway/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter } from '../channels/index.js';
import type { InboundMessage, OutboundMessage } from '../channels/index.js';
import { resolve } from 'path';
import { homedir } from 'os';
import { mkdirSync, readFileSync, existsSync } from 'fs';
@@ -15,8 +15,6 @@ import { mkdirSync, readFileSync, existsSync } from 'fs';
export interface DaemonContext {
config: Config;
lifecycle: Lifecycle;
bot: Bot;
agent: NativeAgent;
sessionStore: SessionStore;
sessionManager: SessionManager;
hookEngine: HookEngine;
@@ -24,6 +22,7 @@ export interface DaemonContext {
toolRegistry: ToolRegistry;
toolExecutor: ToolExecutor;
gateway: GatewayServer;
channelRegistry: ChannelRegistry;
}
function loadSystemPrompt(): string {
@@ -106,6 +105,59 @@ function createModelRouter(config: Config): ModelRouter {
});
}
/**
* Create the unified message handler for the channel registry.
* Each channel+sender pair gets its own NativeAgent backed by a persistent session.
*/
function createMessageRouter(deps: {
sessionManager: SessionManager;
modelRouter: ModelRouter;
systemPrompt: string;
toolRegistry: ToolRegistry;
toolExecutor: ToolExecutor;
}) {
// Cache agents by session ID to avoid recreating on every message
const agents = new Map<string, NativeAgent>();
function getOrCreateAgent(channel: string, senderId: string): NativeAgent {
const sessionId = `${channel}:${senderId}`;
let agent = agents.get(sessionId);
if (!agent) {
const session = deps.sessionManager.getSession(channel, senderId);
agent = new NativeAgent({
modelClient: deps.modelRouter,
systemPrompt: deps.systemPrompt,
session,
toolRegistry: deps.toolRegistry,
toolExecutor: deps.toolExecutor,
});
agents.set(sessionId, agent);
}
return agent;
}
return async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
const agent = getOrCreateAgent(msg.channel, msg.senderId);
// Handle special commands
if (msg.metadata?.isCommand && msg.metadata.command === 'reset') {
agent.reset();
return;
}
try {
const response = await agent.process(msg.text);
await reply({ text: response, replyTo: msg.id });
} catch (error) {
console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
await reply({
text: 'Sorry, an error occurred while processing your message.',
replyTo: msg.id,
});
}
};
}
export async function startDaemon(config: Config): Promise<DaemonContext> {
const lifecycle = new Lifecycle();
@@ -138,26 +190,6 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
// Load system prompt once for reuse
const systemPrompt = loadSystemPrompt();
// Get Telegram session
const telegramUserId = String(config.telegram.allowed_chat_ids[0]);
const session = sessionManager.getSession('telegram', telegramUserId);
// Initialize native agent with session and tools
const agent = new NativeAgent({
modelClient: modelRouter,
systemPrompt,
session,
toolRegistry,
toolExecutor,
});
// Initialize Telegram bot with hook engine
const bot = createTelegramBot({
telegram: config.telegram,
agent,
hookEngine,
});
// Initialize gateway WebSocket server
const gateway = new GatewayServer({
port: config.server.port,
@@ -167,9 +199,43 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
systemPrompt,
toolRegistry,
toolExecutor,
uiDir: resolve(import.meta.dirname, '../gateway/ui'),
config,
restart: async () => {
console.log('Restart requested via gateway');
await lifecycle.shutdown();
// Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart
process.exit(75);
},
});
// Register signal handlers
// ── Channel Registry ──────────────────────────────────────────
const channelRegistry = new ChannelRegistry();
// Set up the unified message handler
channelRegistry.setMessageHandler(createMessageRouter({
sessionManager,
modelRouter,
systemPrompt,
toolRegistry,
toolExecutor,
}));
// Register Telegram adapter
const telegramAdapter = new TelegramAdapter({
botToken: config.telegram.bot_token,
allowedChatIds: config.telegram.allowed_chat_ids,
hookEngine,
});
channelRegistry.register(telegramAdapter);
// Register WebChat adapter (wraps the gateway)
const webChatAdapter = new WebChatAdapter({ gateway });
channelRegistry.register(webChatAdapter);
// ── Signal Handlers ───────────────────────────────────────────
const signalHandler = () => {
lifecycle.shutdown().then(() => process.exit(0));
};
@@ -182,20 +248,18 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
process.off('SIGTERM', signalHandler);
});
// Start bot
// ── Start Services ────────────────────────────────────────────
// Register shutdown handler for channels (stops Telegram bot etc.)
lifecycle.onShutdown(async () => {
await bot.stop();
console.log('Telegram bot stopped');
await channelRegistry.stopAll();
console.log('Channel adapters stopped');
});
// Use long polling (no webhook, no internet exposure)
bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot started: @${botInfo.username}`);
},
});
// Start all channel adapters (Telegram long polling, WebChat status)
await channelRegistry.startAll();
// Start gateway
// Start gateway (HTTP + WS server)
lifecycle.onShutdown(async () => {
await gateway.stop();
console.log('Gateway server stopped');
@@ -205,7 +269,18 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
console.log('Flynn daemon started');
return { config, lifecycle, bot, agent, sessionStore, sessionManager, hookEngine, modelRouter, toolRegistry, toolExecutor, gateway };
return {
config,
lifecycle,
sessionStore,
sessionManager,
hookEngine,
modelRouter,
toolRegistry,
toolExecutor,
gateway,
channelRegistry,
};
}
export { Lifecycle } from './lifecycle.js';
+98
View File
@@ -0,0 +1,98 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse, makeError, ErrorCode } from '../protocol.js';
import type { Config } from '../../config/index.js';
export interface ConfigHandlerDeps {
config: Config;
}
/**
* Redact sensitive values from config before returning.
* Replaces API keys, tokens, and passwords with "***".
*/
function redactConfig(config: Config): Record<string, unknown> {
const raw = JSON.parse(JSON.stringify(config)) as Record<string, unknown>;
// Redact telegram bot token
const telegram = raw.telegram as Record<string, unknown> | undefined;
if (telegram?.bot_token) {
telegram.bot_token = '***';
}
// Redact model keys/tokens
const models = raw.models as Record<string, unknown> | undefined;
if (models) {
for (const tier of ['default', 'fast', 'complex', 'local'] as const) {
const m = models[tier] as Record<string, unknown> | undefined;
if (m?.api_key) m.api_key = '***';
if (m?.auth_token) m.auth_token = '***';
}
const localProviders = models.local_providers as Record<string, Record<string, unknown>> | undefined;
if (localProviders) {
for (const provider of Object.values(localProviders)) {
if (provider.api_key) provider.api_key = '***';
if (provider.auth_token) provider.auth_token = '***';
}
}
}
return raw;
}
/** Keys that are safe to update at runtime via config.patch. */
const PATCHABLE_KEYS: Record<string, (config: Config, value: unknown) => boolean> = {
'hooks.confirm': (config, value) => {
if (!Array.isArray(value) || !value.every((v) => typeof v === 'string')) return false;
config.hooks.confirm = value as string[];
return true;
},
'hooks.log': (config, value) => {
if (!Array.isArray(value) || !value.every((v) => typeof v === 'string')) return false;
config.hooks.log = value as string[];
return true;
},
'hooks.silent': (config, value) => {
if (!Array.isArray(value) || !value.every((v) => typeof v === 'string')) return false;
config.hooks.silent = value as string[];
return true;
},
'server.localhost': (config, value) => {
if (typeof value !== 'boolean') return false;
config.server.localhost = value;
return true;
},
};
export function createConfigHandlers(deps: ConfigHandlerDeps) {
return {
'config.get': async (request: GatewayRequest): Promise<OutboundMessage> => {
return makeResponse(request.id, redactConfig(deps.config));
},
'config.patch': async (request: GatewayRequest): Promise<OutboundMessage> => {
const patches = request.params?.patches;
if (!patches || typeof patches !== 'object' || Array.isArray(patches)) {
return makeError(request.id, ErrorCode.InvalidRequest, 'params.patches must be an object of { key: value } pairs');
}
const applied: string[] = [];
const rejected: string[] = [];
for (const [key, value] of Object.entries(patches as Record<string, unknown>)) {
const patcher = PATCHABLE_KEYS[key];
if (!patcher) {
rejected.push(key);
continue;
}
const ok = patcher(deps.config, value);
if (ok) {
applied.push(key);
} else {
rejected.push(key);
}
}
return makeResponse(request.id, { applied, rejected });
},
};
}
+142
View File
@@ -3,6 +3,7 @@ import { createSystemHandlers } from './system.js';
import { createSessionHandlers } from './sessions.js';
import { createToolHandlers } from './tools.js';
import { createAgentHandlers } from './agent.js';
import { createConfigHandlers } from './config.js';
import { ErrorCode } from '../protocol.js';
import type { GatewayRequest, GatewayResponse, GatewayError, GatewayEvent, OutboundMessage } from '../protocol.js';
@@ -269,3 +270,144 @@ describe('agent handlers', () => {
expect((result.result as any).cancelled).toBe(true);
});
});
describe('system.restart handler', () => {
it('returns restarting:true and calls restart callback', async () => {
const restartFn = vi.fn(async () => {});
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 0,
getToolCount: () => 0,
getConnectionCount: () => 0,
restart: restartFn,
});
const req: GatewayRequest = { id: 1, method: 'system.restart' };
const result = await handlers['system.restart'](req) as GatewayResponse;
expect(result.id).toBe(1);
expect((result.result as any).restarting).toBe(true);
// Restart is called asynchronously via queueMicrotask
await new Promise<void>((resolve) => queueMicrotask(resolve));
expect(restartFn).toHaveBeenCalledOnce();
});
it('returns error when restart is not available', async () => {
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 0,
getToolCount: () => 0,
getConnectionCount: () => 0,
});
const req: GatewayRequest = { id: 2, method: 'system.restart' };
const result = await handlers['system.restart'](req) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InternalError);
expect(result.error.message).toContain('not available');
});
});
describe('config handlers', () => {
function makeConfig() {
return {
telegram: { bot_token: 'secret-token-123', allowed_chat_ids: [12345] },
server: { tailscale_only: true, localhost: true, port: 18800 },
models: {
default: { provider: 'anthropic' as const, model: 'claude-3-haiku', api_key: 'sk-secret-key' },
fallback_chain: ['anthropic'],
},
backends: { claude_code: { enabled: false }, opencode: { enabled: false }, native: { enabled: true } },
hooks: { confirm: ['shell.exec'], log: [], silent: [] },
mcp: { servers: [] },
};
}
it('config.get returns redacted config', async () => {
const config = makeConfig();
const handlers = createConfigHandlers({ config: config as any });
const req: GatewayRequest = { id: 1, method: 'config.get' };
const result = await handlers['config.get'](req) as GatewayResponse;
const r = result.result as Record<string, any>;
expect(r.telegram.bot_token).toBe('***');
expect(r.models.default.api_key).toBe('***');
// Non-secret values are preserved
expect(r.server.port).toBe(18800);
expect(r.hooks.confirm).toEqual(['shell.exec']);
});
it('config.patch applies valid patches', async () => {
const config = makeConfig();
const handlers = createConfigHandlers({ config: config as any });
const req: GatewayRequest = {
id: 2,
method: 'config.patch',
params: {
patches: {
'hooks.confirm': ['shell.exec', 'file.write'],
'hooks.log': ['file.read'],
},
},
};
const result = await handlers['config.patch'](req) as GatewayResponse;
const r = result.result as { applied: string[]; rejected: string[] };
expect(r.applied).toEqual(['hooks.confirm', 'hooks.log']);
expect(r.rejected).toEqual([]);
// Verify the config was actually mutated
expect(config.hooks.confirm).toEqual(['shell.exec', 'file.write']);
expect(config.hooks.log).toEqual(['file.read']);
});
it('config.patch rejects unknown keys', async () => {
const config = makeConfig();
const handlers = createConfigHandlers({ config: config as any });
const req: GatewayRequest = {
id: 3,
method: 'config.patch',
params: {
patches: {
'telegram.bot_token': 'hacked',
'hooks.confirm': [],
},
},
};
const result = await handlers['config.patch'](req) as GatewayResponse;
const r = result.result as { applied: string[]; rejected: string[] };
expect(r.applied).toEqual(['hooks.confirm']);
expect(r.rejected).toEqual(['telegram.bot_token']);
});
it('config.patch rejects invalid value types', async () => {
const config = makeConfig();
const handlers = createConfigHandlers({ config: config as any });
const req: GatewayRequest = {
id: 4,
method: 'config.patch',
params: {
patches: {
'hooks.confirm': 'not-an-array',
},
},
};
const result = await handlers['config.patch'](req) as GatewayResponse;
const r = result.result as { applied: string[]; rejected: string[] };
expect(r.applied).toEqual([]);
expect(r.rejected).toEqual(['hooks.confirm']);
});
it('config.patch requires patches object', async () => {
const config = makeConfig();
const handlers = createConfigHandlers({ config: config as any });
const req: GatewayRequest = { id: 5, method: 'config.patch', params: {} };
const result = await handlers['config.patch'](req) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
});
+2
View File
@@ -6,3 +6,5 @@ export { createToolHandlers } from './tools.js';
export type { ToolHandlerDeps } from './tools.js';
export { createAgentHandlers } from './agent.js';
export type { AgentHandlerDeps } from './agent.js';
export { createConfigHandlers } from './config.js';
export type { ConfigHandlerDeps } from './config.js';
+21 -1
View File
@@ -1,5 +1,5 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse } from '../protocol.js';
import { makeResponse, makeError, ErrorCode } from '../protocol.js';
export interface SystemHandlerDeps {
startTime: number;
@@ -7,6 +7,8 @@ export interface SystemHandlerDeps {
getSessionCount: () => number;
getToolCount: () => number;
getConnectionCount: () => number;
/** Optional callback to trigger a graceful restart. If not provided, system.restart returns an error. */
restart?: () => Promise<void>;
}
export function createSystemHandlers(deps: SystemHandlerDeps) {
@@ -21,5 +23,23 @@ export function createSystemHandlers(deps: SystemHandlerDeps) {
connections: deps.getConnectionCount(),
});
},
'system.restart': async (request: GatewayRequest): Promise<OutboundMessage> => {
if (!deps.restart) {
return makeError(request.id, ErrorCode.InternalError, 'Restart not available in this environment');
}
// Send response before initiating restart (client receives confirmation)
const response = makeResponse(request.id, { restarting: true });
// Schedule restart after response is sent (next tick)
queueMicrotask(() => {
deps.restart!().catch((err) => {
console.error('Restart failed:', err);
});
});
return response;
},
};
}
+14
View File
@@ -18,8 +18,10 @@ import {
createSessionHandlers,
createToolHandlers,
createAgentHandlers,
createConfigHandlers,
} from './handlers/index.js';
import type { SessionManager } from '../session/manager.js';
import type { Config } from '../config/index.js';
import type { ToolRegistry } from '../tools/registry.js';
import type { ToolExecutor } from '../tools/executor.js';
@@ -34,6 +36,9 @@ export interface GatewayServerConfig {
version?: string;
auth?: AuthConfig;
uiDir?: string;
config?: Config;
/** Optional callback for system.restart. Should trigger graceful shutdown + process restart. */
restart?: () => Promise<void>;
}
export class GatewayServer {
@@ -67,6 +72,7 @@ export class GatewayServer {
getSessionCount: () => this.sessionBridge.listSessions().length,
getToolCount: () => this.config.toolRegistry.list().length,
getConnectionCount: () => this.sessionBridge.connectionCount,
restart: this.config.restart,
});
const sessionHandlers = createSessionHandlers({
@@ -82,6 +88,14 @@ export class GatewayServer {
sessionBridge: this.sessionBridge,
});
// Config handlers (only if config object is provided)
if (this.config.config) {
const configHandlers = createConfigHandlers({ config: this.config.config });
for (const [method, handler] of Object.entries(configHandlers)) {
this.router.register(method, handler);
}
}
// Register all methods
for (const [method, handler] of Object.entries(systemHandlers)) {
this.router.register(method, handler);