Add LINE channel adapter with webhook ingress and gating

This commit is contained in:
William Valentin
2026-02-16 13:02:26 -08:00
parent a954d7e136
commit 76d44a74bf
15 changed files with 584 additions and 5 deletions
+1
View File
@@ -21,4 +21,5 @@ export { MattermostAdapter, type MattermostAdapterConfig } from './mattermost/in
export { TeamsAdapter, type TeamsAdapterConfig } from './teams/index.js';
export { GoogleChatAdapter, type GoogleChatAdapterConfig } from './googleChat/index.js';
export { BlueBubblesAdapter, type BlueBubblesAdapterConfig } from './bluebubbles/index.js';
export { LineAdapter, type LineAdapterConfig } from './line/index.js';
export { PairingManager, type PairingConfig, type PairingStore, type ApprovedSender } from './pairing.js';
+146
View File
@@ -0,0 +1,146 @@
import { createHmac } from 'crypto';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import type { IncomingMessage, ServerResponse } from 'http';
import { LineAdapter } from './adapter.js';
const mockFetch = vi.fn();
vi.stubGlobal('fetch', mockFetch);
function mockReq(body: string, secret: string): IncomingMessage {
const signature = createHmac('sha256', secret).update(body).digest('base64');
const req = {
headers: { 'x-line-signature': signature },
on(event: string, handler: (...args: unknown[]) => void) {
if (event === 'data') {
handler(Buffer.from(body, 'utf8'));
}
if (event === 'end') {
handler();
}
return this;
},
off: () => req,
destroy: () => undefined,
} as unknown as IncomingMessage;
return req;
}
function mockRes() {
const state = {
statusCode: 0,
body: '',
};
const res = {
writeHead: (code: number) => {
state.statusCode = code;
},
end: (chunk?: string) => {
state.body = chunk ?? '';
},
} as unknown as ServerResponse;
return { res, state };
}
describe('LineAdapter', () => {
beforeEach(() => {
vi.clearAllMocks();
mockFetch.mockReset();
});
it('has name line and starts disconnected', () => {
const adapter = new LineAdapter({
channelAccessToken: 'token',
channelSecret: 'secret',
});
expect(adapter.name).toBe('line');
expect(adapter.status).toBe('disconnected');
});
it('send posts LINE push API', async () => {
const adapter = new LineAdapter({
channelAccessToken: 'token',
channelSecret: 'secret',
});
await adapter.connect();
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => '',
} as Response);
await adapter.send('U123', { text: 'hello line' });
expect(mockFetch).toHaveBeenCalledTimes(1);
expect(mockFetch.mock.calls[0]?.[0]).toBe('https://api.line.me/v2/bot/message/push');
});
it('handleRequest validates signature and dispatches text event', async () => {
const adapter = new LineAdapter({
channelAccessToken: 'token',
channelSecret: 'secret',
requireMention: false,
});
const inbound: Array<{ channel: string; text: string; senderId: string }> = [];
adapter.onMessage((msg) => inbound.push({ channel: msg.channel, text: msg.text, senderId: msg.senderId }));
const body = JSON.stringify({
events: [{
type: 'message',
timestamp: 1,
source: { type: 'user', userId: 'U123' },
message: { id: 'm1', type: 'text', text: 'ping' },
}],
});
const req = mockReq(body, 'secret');
const { res, state } = mockRes();
await adapter.handleRequest(req, res);
expect(state.statusCode).toBe(200);
expect(inbound).toEqual([{ channel: 'line', text: 'ping', senderId: 'U123' }]);
});
it('drops group messages without mention when require_mention=true', async () => {
const adapter = new LineAdapter({
channelAccessToken: 'token',
channelSecret: 'secret',
requireMention: true,
mentionName: 'flynn',
});
const handler = vi.fn();
adapter.onMessage(handler);
await adapter.handleEvent({
type: 'message',
source: { type: 'group', groupId: 'G123', userId: 'U123' },
message: { id: 'm1', type: 'text', text: 'hello there' },
});
expect(handler).not.toHaveBeenCalled();
});
it('rejects invalid signature', async () => {
const adapter = new LineAdapter({
channelAccessToken: 'token',
channelSecret: 'secret',
});
const body = JSON.stringify({ events: [] });
const req = {
headers: { 'x-line-signature': 'invalid' },
on(event: string, handler: (...args: unknown[]) => void) {
if (event === 'data') {
handler(Buffer.from(body, 'utf8'));
}
if (event === 'end') {
handler();
}
return this;
},
off: () => req,
destroy: () => undefined,
} as unknown as IncomingMessage;
const { res, state } = mockRes();
await adapter.handleRequest(req, res);
expect(state.statusCode).toBe(401);
});
});
+228
View File
@@ -0,0 +1,228 @@
import { createHmac, timingSafeEqual } from 'crypto';
import type { IncomingMessage, ServerResponse } from 'http';
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js';
import { readRequestBody } from '../../utils/httpBody.js';
export interface LineAdapterConfig {
channelAccessToken: string;
channelSecret: string;
allowedSourceIds?: string[];
requireMention?: boolean;
mentionName?: string;
}
interface LineWebhookBody {
events?: LineEvent[];
}
interface LineEvent {
type?: string;
replyToken?: string;
timestamp?: number;
source?: {
type?: 'user' | 'group' | 'room';
userId?: string;
groupId?: string;
roomId?: string;
};
message?: {
id?: string;
type?: string;
text?: string;
};
}
const MAX_MESSAGE_LENGTH = 4500;
export class LineAdapter implements ChannelAdapter {
readonly name = 'line';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
constructor(private readonly config: LineAdapterConfig) {}
get status(): ChannelStatus {
return this._status;
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
async connect(): Promise<void> {
this._status = 'connected';
}
async disconnect(): Promise<void> {
this._status = 'disconnected';
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (this._status !== 'connected') {
throw new Error('LINE adapter not connected');
}
const text = message.text.trim();
if (!text) {
return;
}
const chunks = text.length > MAX_MESSAGE_LENGTH ? splitMessage(text, MAX_MESSAGE_LENGTH) : [text];
for (const chunk of chunks) {
await this.sendPush(peerId, chunk);
}
}
async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
let body = '';
try {
body = await readRequestBody(req, { maxBytes: 1_048_576 });
} catch {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid request body' }));
return;
}
if (!this.verifySignature(body, req.headers['x-line-signature'])) {
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid signature' }));
return;
}
let payload: LineWebhookBody;
try {
payload = JSON.parse(body) as LineWebhookBody;
} catch {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid JSON' }));
return;
}
const events = Array.isArray(payload.events) ? payload.events : [];
for (const event of events) {
await this.handleEvent(event);
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
}
async handleEvent(event: LineEvent): Promise<void> {
if (!this.messageHandler) {
return;
}
if (event.type !== 'message' || event.message?.type !== 'text') {
return;
}
const source = event.source;
const sourceType = source?.type ?? 'user';
const sourceId = this.resolveSourceId(source);
if (!sourceId) {
return;
}
if (this.config.allowedSourceIds && this.config.allowedSourceIds.length > 0) {
if (!this.config.allowedSourceIds.includes(sourceId)) {
return;
}
}
const text = (event.message?.text ?? '').trim();
if (!text) {
return;
}
const mentionName = this.config.mentionName ?? 'flynn';
const mentionRegex = new RegExp(`(?:^|\\s)@?${escapeRegex(mentionName)}(?:\\b|:)`, 'i');
const mentionsBot = mentionRegex.test(text);
const isDm = sourceType === 'user';
if (shouldIgnoreForMissingMention({
requireMention: this.config.requireMention,
defaultRequireMention: true,
mentionsBot: isDm || mentionsBot,
})) {
return;
}
const cleaned = text.replace(new RegExp(`^\\s*@?${escapeRegex(mentionName)}(?:\\b|:)\\s*`, 'i'), '').trim();
if (!cleaned) {
return;
}
this.messageHandler({
id: event.message?.id ?? `line-${Date.now()}`,
channel: 'line',
senderId: source?.userId?.trim() || sourceId,
text: cleaned,
timestamp: typeof event.timestamp === 'number' ? event.timestamp : Date.now(),
metadata: {
sourceType,
sourceId,
replyPeerId: sourceId,
replyToken: event.replyToken,
},
});
}
private resolveSourceId(source?: LineEvent['source']): string | undefined {
if (!source) {
return undefined;
}
if (source.type === 'group') {
return source.groupId?.trim() || undefined;
}
if (source.type === 'room') {
return source.roomId?.trim() || undefined;
}
return source.userId?.trim() || undefined;
}
private verifySignature(body: string, signatureHeader: string | string[] | undefined): boolean {
const signature = typeof signatureHeader === 'string' ? signatureHeader : signatureHeader?.[0];
if (!signature) {
return false;
}
const expected = createHmac('sha256', this.config.channelSecret).update(body).digest('base64');
const sigBuf = Buffer.from(signature);
const expectedBuf = Buffer.from(expected);
if (sigBuf.length !== expectedBuf.length) {
return false;
}
return timingSafeEqual(sigBuf, expectedBuf);
}
private async sendPush(to: string, text: string): Promise<void> {
const response = await fetch('https://api.line.me/v2/bot/message/push', {
method: 'POST',
headers: {
Authorization: `Bearer ${this.config.channelAccessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
to,
messages: [
{
type: 'text',
text,
},
],
}),
});
if (!response.ok) {
const body = await response.text().catch(() => '');
throw new Error(`LINE send failed (${response.status}): ${body}`);
}
}
}
function escapeRegex(value: string): string {
return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
+1
View File
@@ -0,0 +1 @@
export { LineAdapter, type LineAdapterConfig } from './adapter.js';
+27
View File
@@ -581,6 +581,33 @@ describe('configSchema — bluebubbles', () => {
});
});
describe('configSchema — line', () => {
const minimalConfig = {
telegram: { bot_token: 'test', allowed_chat_ids: [1] },
models: { default: { provider: 'anthropic', model: 'claude-3' } },
};
it('accepts line config and defaults optional fields', () => {
const result = configSchema.parse({
...minimalConfig,
line: {
channel_access_token: 'line-token',
channel_secret: 'line-secret',
},
});
expect(result.line).toBeDefined();
if (!result.line) {
throw new Error('Expected line config');
}
expect(result.line.channel_access_token).toBe('line-token');
expect(result.line.channel_secret).toBe('line-secret');
expect(result.line.allowed_source_ids).toEqual([]);
expect(result.line.require_mention).toBe(true);
expect(result.line.mention_name).toBe('flynn');
});
});
describe('configSchema — whatsapp', () => {
const minimalConfig = {
telegram: { bot_token: 'test', allowed_chat_ids: [1] },
+9
View File
@@ -503,6 +503,14 @@ const bluebubblesSchema = z.object({
mention_name: z.string().default('flynn'),
}).optional();
const lineSchema = z.object({
channel_access_token: z.string().min(1, 'LINE channel_access_token is required'),
channel_secret: z.string().min(1, 'LINE channel_secret is required'),
allowed_source_ids: z.array(z.string()).default([]),
require_mention: z.boolean().default(true),
mention_name: z.string().default('flynn'),
}).optional();
const browserSchema = z.object({
enabled: z.boolean().default(false),
executable_path: z.string().optional(),
@@ -682,6 +690,7 @@ export const configSchema = z.object({
teams: teamsSchema,
google_chat: googleChatSchema,
bluebubbles: bluebubblesSchema,
line: lineSchema,
server: serverSchema.default({}),
models: modelsSchema,
backends: backendsSchema.default({}),
+34
View File
@@ -23,6 +23,7 @@ describe('registerChannels', () => {
setTeamsHandler: vi.fn(),
setGoogleChatHandler: vi.fn(),
setBlueBubblesHandler: vi.fn(),
setLineHandler: vi.fn(),
};
registerChannels({
@@ -36,4 +37,37 @@ describe('registerChannels', () => {
expect(names).toContain('mattermost');
expect(names).toContain('webchat');
});
it('registers LINE adapter when configured', () => {
const config = configSchema.parse({
telegram: { bot_token: 'test-token', allowed_chat_ids: [1] },
models: { default: { provider: 'anthropic', model: 'claude-3' } },
line: {
channel_access_token: 'line-token',
channel_secret: 'line-secret',
allowed_source_ids: ['U123'],
},
});
const channelRegistry = new ChannelRegistry();
const gateway = {
setWebhookHandler: vi.fn(),
setGmailHandler: vi.fn(),
setTeamsHandler: vi.fn(),
setGoogleChatHandler: vi.fn(),
setBlueBubblesHandler: vi.fn(),
setLineHandler: vi.fn(),
};
registerChannels({
config,
channelRegistry,
hookEngine: new HookEngine(config.hooks),
gateway: gateway as unknown as Parameters<typeof registerChannels>[0]['gateway'],
});
const names = channelRegistry.list().map((adapter) => adapter.name);
expect(names).toContain('line');
expect(gateway.setLineHandler).toHaveBeenCalledTimes(1);
});
});
+14 -1
View File
@@ -1,6 +1,6 @@
import type { Config } from '../config/index.js';
import type { HookEngine } from '../hooks/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, SignalAdapter, MattermostAdapter, TeamsAdapter, GoogleChatAdapter, BlueBubblesAdapter, PairingManager } from '../channels/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, SignalAdapter, MattermostAdapter, TeamsAdapter, GoogleChatAdapter, BlueBubblesAdapter, LineAdapter, PairingManager } from '../channels/index.js';
import { CronScheduler, WebhookHandler, GmailWatcher } from '../automation/index.js';
import type { GatewayServer } from '../gateway/index.js';
@@ -154,6 +154,19 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
gateway.setBlueBubblesHandler(blueBubblesAdapter);
}
// Register LINE adapter (if configured)
if (config.line) {
const lineAdapter = new LineAdapter({
channelAccessToken: config.line.channel_access_token,
channelSecret: config.line.channel_secret,
allowedSourceIds: config.line.allowed_source_ids.length > 0 ? config.line.allowed_source_ids : undefined,
requireMention: config.line.require_mention,
mentionName: config.line.mention_name,
});
channelRegistry.register(lineAdapter);
gateway.setLineHandler(lineAdapter);
}
// Register WebChat adapter (wraps the gateway)
const webChatAdapter = new WebChatAdapter({ gateway });
channelRegistry.register(webChatAdapter);
+2
View File
@@ -35,6 +35,7 @@ function makeBaseConfig(): Config {
teams: undefined,
google_chat: undefined,
bluebubbles: undefined,
line: undefined,
} as unknown as Config;
}
@@ -53,6 +54,7 @@ describe('discoverServices', () => {
expect.objectContaining({ name: 'teams', status: 'not_configured' }),
expect.objectContaining({ name: 'google_chat', status: 'not_configured' }),
expect.objectContaining({ name: 'bluebubbles', status: 'not_configured' }),
expect.objectContaining({ name: 'line', status: 'not_configured' }),
expect.objectContaining({ name: 'cron', status: 'not_configured' }),
expect.objectContaining({ name: 'mcp', status: 'not_configured' }),
expect.objectContaining({ name: 'web_search', status: 'configured' }),
+1
View File
@@ -58,6 +58,7 @@ export function discoverServices(
{ key: 'teams', name: 'teams', description: 'Microsoft Teams bot' },
{ key: 'google_chat', name: 'google_chat', description: 'Google Chat bot' },
{ key: 'bluebubbles', name: 'bluebubbles', description: 'iMessage via BlueBubbles' },
{ key: 'line', name: 'line', description: 'LINE Messaging API bot' },
];
for (const { key, name, description } of channelConfigs) {
+14
View File
@@ -51,6 +51,7 @@ import { RequestBodyTooLargeError, readRequestBody } from '../utils/httpBody.js'
import type { TeamsAdapter } from '../channels/teams/adapter.js';
import type { GoogleChatAdapter } from '../channels/googleChat/adapter.js';
import type { BlueBubblesAdapter } from '../channels/bluebubbles/adapter.js';
import type { LineAdapter } from '../channels/line/adapter.js';
export interface GatewayServerConfig {
port: number;
@@ -120,6 +121,8 @@ export interface GatewayServerConfig {
googleChatHandler?: Pick<GoogleChatAdapter, 'handleRequest'>;
/** Optional BlueBubbles adapter for inbound iMessage event webhooks. */
blueBubblesHandler?: Pick<BlueBubblesAdapter, 'handleRequest'>;
/** Optional LINE adapter for inbound webhook events. */
lineHandler?: Pick<LineAdapter, 'handleRequest'>;
}
export class GatewayServer {
@@ -724,6 +727,12 @@ export class GatewayServer {
return;
}
// LINE events route — bypass gateway auth (LINE webhook posts directly)
if (this.config.lineHandler && req.method === 'POST' && req.url?.startsWith('/line/events')) {
await this.config.lineHandler.handleRequest(req, res);
return;
}
// Apply auth to HTTP requests when configured
const authConfig = this.config.auth ?? {};
if (this.config.authHttp !== false && authConfig.token) {
@@ -842,6 +851,11 @@ export class GatewayServer {
this.config.blueBubblesHandler = handler;
}
/** Set the LINE handler for inbound webhook HTTP routes (late binding). */
setLineHandler(handler: Pick<LineAdapter, 'handleRequest'>): void {
this.config.lineHandler = handler;
}
private async startDiscovery(host: string, port: number): Promise<void> {
const discovery = this.config.discovery;
if (!discovery?.enabled) {