From bc8326cf4a97a0c9c80152bf1ccf918352d64708 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Sun, 15 Feb 2026 18:02:14 -0800 Subject: [PATCH] feat(matrix): add Matrix channel adapter --- src/channels/index.ts | 1 + src/channels/matrix/adapter.test.ts | 268 ++++++++++++++++ src/channels/matrix/adapter.ts | 481 ++++++++++++++++++++++++++++ src/channels/matrix/index.ts | 1 + src/daemon/channels.ts | 16 +- 5 files changed, 766 insertions(+), 1 deletion(-) create mode 100644 src/channels/matrix/adapter.test.ts create mode 100644 src/channels/matrix/adapter.ts create mode 100644 src/channels/matrix/index.ts diff --git a/src/channels/index.ts b/src/channels/index.ts index cce14d5..198d4f0 100644 --- a/src/channels/index.ts +++ b/src/channels/index.ts @@ -15,4 +15,5 @@ export { WebChatAdapter, type WebChatAdapterConfig } from './webchat/index.js'; export { DiscordAdapter, type DiscordAdapterConfig } from './discord/index.js'; export { SlackAdapter, type SlackAdapterConfig } from './slack/index.js'; export { WhatsAppAdapter, type WhatsAppAdapterConfig } from './whatsapp/index.js'; +export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js'; export { PairingManager, type PairingConfig, type PairingStore, type ApprovedSender } from './pairing.js'; diff --git a/src/channels/matrix/adapter.test.ts b/src/channels/matrix/adapter.test.ts new file mode 100644 index 0000000..e05a0c8 --- /dev/null +++ b/src/channels/matrix/adapter.test.ts @@ -0,0 +1,268 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; +import type { InboundMessage } from '../types.js'; + +const mockFetch = vi.fn(); + +function jsonResponse(body: unknown, status = 200): Response { + return { + ok: status >= 200 && status < 300, + status, + json: async () => body, + text: async () => JSON.stringify(body), + } as Response; +} + +describe('MatrixAdapter', () => { + const baseConfig: MatrixAdapterConfig = { + homeserverUrl: 'https://matrix.example.org', + accessToken: 'syt_test_token', + allowedRoomIds: ['!room1:example.org'], + requireMention: true, + syncTimeoutMs: 30_000, + displayName: 'Flynn', + }; + + let adapter: MatrixAdapter; + + beforeEach(() => { + vi.stubGlobal('fetch', mockFetch); + vi.clearAllMocks(); + adapter = new MatrixAdapter(baseConfig); + }); + + afterEach(async () => { + await adapter.disconnect(); + vi.restoreAllMocks(); + }); + + it('has name "matrix"', () => { + expect(adapter.name).toBe('matrix'); + }); + + it('starts as disconnected', () => { + expect(adapter.status).toBe('disconnected'); + }); + + it('connect resolves whoami and sets connected', async () => { + mockFetch.mockImplementation(async (url: string) => { + if (url.endsWith('/_matrix/client/v3/account/whoami')) { + return jsonResponse({ user_id: '@flynn:example.org' }); + } + if (url.includes('/account_data/m.direct')) { + return jsonResponse({}); + } + // /sync long-poll hangs + if (url.includes('/_matrix/client/v3/sync')) { + return new Promise(() => {}); + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + await adapter.connect(); + expect(adapter.status).toBe('connected'); + + expect(mockFetch).toHaveBeenCalledWith( + 'https://matrix.example.org/_matrix/client/v3/account/whoami', + expect.objectContaining({ + method: 'GET', + headers: expect.objectContaining({ + Authorization: 'Bearer syt_test_token', + }), + }), + ); + }); + + it('disconnect is safe when not connected', async () => { + await adapter.disconnect(); + expect(adapter.status).toBe('disconnected'); + }); + + it('send throws when not connected', async () => { + await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow( + 'Matrix adapter not connected', + ); + }); + + it('send delivers a message via PUT', async () => { + let syncStarted = false; + mockFetch.mockImplementation(async (url: string, init?: any) => { + if (url.endsWith('/_matrix/client/v3/account/whoami')) { + return jsonResponse({ user_id: '@flynn:example.org' }); + } + if (url.includes('/account_data/m.direct')) { + return jsonResponse({}); + } + if (url.includes('/_matrix/client/v3/sync')) { + syncStarted = true; + return new Promise(() => {}); + } + if (init?.method === 'PUT' && url.includes('/send/m.room.message/')) { + const body = JSON.parse(init.body); + expect(body.msgtype).toBe('m.text'); + expect(body.body).toBe('Hello there'); + return jsonResponse({ event_id: '$sent1' }); + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + await adapter.connect(); + expect(syncStarted).toBe(true); + + await adapter.send('!room1:example.org', { text: 'Hello there' }); + }); + + it('inbound message requires mention in non-DM rooms', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + let didSync = false; + mockFetch.mockImplementation(async (url: string) => { + if (url.endsWith('/_matrix/client/v3/account/whoami')) { + return jsonResponse({ user_id: '@flynn:example.org' }); + } + if (url.includes('/account_data/m.direct')) { + return jsonResponse({}); + } + if (url.includes('/_matrix/client/v3/sync')) { + if (didSync) { + return new Promise(() => {}); + } + didSync = true; + return jsonResponse({ + next_batch: 's1', + rooms: { + join: { + '!room1:example.org': { + timeline: { + events: [ + { + type: 'm.room.message', + event_id: '$e1', + sender: '@alice:example.org', + origin_server_ts: 1700000000000, + content: { msgtype: 'm.text', body: 'hello without mention' }, + }, + ], + }, + }, + }, + }, + }); + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + await adapter.connect(); + await new Promise((r) => setTimeout(r, 0)); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('DM rooms bypass mention requirement (m.direct from sync)', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + let didSync = false; + mockFetch.mockImplementation(async (url: string) => { + if (url.endsWith('/_matrix/client/v3/account/whoami')) { + return jsonResponse({ user_id: '@flynn:example.org' }); + } + if (url.includes('/account_data/m.direct')) { + return jsonResponse({}); + } + if (url.includes('/_matrix/client/v3/sync')) { + if (didSync) { + return new Promise(() => {}); + } + didSync = true; + return jsonResponse({ + next_batch: 's1', + account_data: { + events: [ + { type: 'm.direct', content: { '@alice:example.org': ['!room1:example.org'] } }, + ], + }, + rooms: { + join: { + '!room1:example.org': { + timeline: { + events: [ + { + type: 'm.room.message', + event_id: '$e1', + sender: '@alice:example.org', + origin_server_ts: 1700000000000, + content: { msgtype: 'm.text', body: 'hello dm no mention' }, + }, + ], + }, + }, + }, + }, + }); + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + await adapter.connect(); + await new Promise((r) => setTimeout(r, 0)); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.channel).toBe('matrix'); + expect(msg.senderId).toBe('!room1:example.org'); + expect(msg.senderName).toBe('alice'); + expect(msg.text).toBe('hello dm no mention'); + }); + + it('inbound message with mention is accepted and mention is stripped', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + let didSync = false; + mockFetch.mockImplementation(async (url: string) => { + if (url.endsWith('/_matrix/client/v3/account/whoami')) { + return jsonResponse({ user_id: '@flynn:example.org' }); + } + if (url.includes('/account_data/m.direct')) { + return jsonResponse({}); + } + if (url.includes('/_matrix/client/v3/sync')) { + if (didSync) { + return new Promise(() => {}); + } + didSync = true; + return jsonResponse({ + next_batch: 's1', + rooms: { + join: { + '!room1:example.org': { + timeline: { + events: [ + { + type: 'm.room.message', + event_id: '$e1', + sender: '@alice:example.org', + origin_server_ts: 1700000000000, + content: { msgtype: 'm.text', body: '@Flynn Hello there' }, + }, + ], + }, + }, + }, + }, + }); + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + await adapter.connect(); + await new Promise((r) => setTimeout(r, 0)); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('Hello there'); + }); +}); diff --git a/src/channels/matrix/adapter.ts b/src/channels/matrix/adapter.ts new file mode 100644 index 0000000..f07daf2 --- /dev/null +++ b/src/channels/matrix/adapter.ts @@ -0,0 +1,481 @@ +/** + * Matrix channel adapter. + * + * Implements the ChannelAdapter interface using raw fetch against the + * Matrix Client-Server API v3. Uses /sync long-polling for inbound messages + * and PUT /send for outbound messages. + */ + +import type { + InboundMessage, + OutboundMessage, + ChannelAdapter, + ChannelStatus, +} from '../types.js'; +import { splitMessage } from '../utils.js'; +import type { PairingManager } from '../pairing.js'; + +export interface MatrixAdapterConfig { + homeserverUrl: string; + accessToken: string; + /** Room IDs to respond in. Empty/undefined = all rooms. */ + allowedRoomIds?: string[]; + /** Require mention in non-DM rooms (default: true). */ + requireMention?: boolean; + /** /sync long-poll timeout in ms (default: 30000). */ + syncTimeoutMs?: number; + /** Optional bot display name (used for mention detection). */ + displayName?: string; + /** Optional pairing manager for DM pairing codes. */ + pairingManager?: PairingManager; +} + +interface MatrixWhoamiResponse { + user_id: string; +} + +interface MatrixSyncResponse { + next_batch?: string; + rooms?: { + join?: Record; + invite?: Record; + }; + account_data?: { + events?: Array<{ type: string; content?: unknown }>; + }; +} + +interface MatrixJoinedRoom { + timeline?: { + events?: MatrixEvent[]; + }; +} + +interface MatrixEvent { + type?: string; + event_id?: string; + sender?: string; + origin_server_ts?: number; + content?: { + msgtype?: string; + body?: string; + [key: string]: unknown; + }; +} + +const MAX_MESSAGE_LENGTH = 65536; +const DEFAULT_SYNC_TIMEOUT_MS = 30_000; +const SYNC_ERROR_BACKOFF_MS = 5_000; + +export class MatrixAdapter implements ChannelAdapter { + readonly name = 'matrix'; + + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private config: MatrixAdapterConfig; + + private userId: string | null = null; + private localpart: string | null = null; + private botDisplayName: string | null = null; + + private dmRoomIds: Set = new Set(); + private since: string | null = null; + + private syncAbort: AbortController | null = null; + private txnCounter = 0; + + get status(): ChannelStatus { + return this._status; + } + + constructor(config: MatrixAdapterConfig) { + this.config = config; + } + + onMessage(handler: (msg: InboundMessage) => void): void { + this.messageHandler = handler; + } + + async connect(): Promise { + this._status = 'connecting'; + + try { + const whoami = await this.matrixGet('/_matrix/client/v3/account/whoami'); + if (!whoami.user_id) { + throw new Error('Matrix whoami response missing user_id'); + } + + this.userId = whoami.user_id; + this.localpart = this.extractLocalpart(this.userId); + this.botDisplayName = this.config.displayName ?? this.localpart ?? this.userId; + + await this.loadDirectRooms(); + + this.syncAbort = new AbortController(); + void this.runSyncLoop(this.syncAbort.signal); + + this._status = 'connected'; + console.log(`Matrix adapter connected as ${this.userId}`); + } catch (error) { + this._status = 'error'; + throw error; + } + } + + async disconnect(): Promise { + if (this.syncAbort) { + this.syncAbort.abort(); + this.syncAbort = null; + } + + this.userId = null; + this.localpart = null; + this.botDisplayName = null; + this.dmRoomIds.clear(); + this.since = null; + this._status = 'disconnected'; + } + + async send(peerId: string, message: OutboundMessage): Promise { + if (this._status !== 'connected') { + throw new Error('Matrix adapter not connected'); + } + + const text = (message.text ?? '').trim(); + if (!text) { + return; + } + + const chunks = text.length > MAX_MESSAGE_LENGTH + ? splitMessage(text, MAX_MESSAGE_LENGTH) + : [text]; + + for (const chunk of chunks) { + if (!chunk) {continue;} + await this.sendRoomMessage(peerId, chunk, message.replyTo); + } + + if (message.attachments && message.attachments.length > 0) { + for (const a of message.attachments) { + if (a.url) { + const line = a.filename ? `${a.filename}: ${a.url}` : a.url; + await this.sendRoomMessage(peerId, line); + } else if (a.data) { + // MVP: don't attempt media upload yet. + console.warn(`Matrix: skipping attachment data (${a.mimeType}) — upload not implemented`); + } + } + } + } + + private async runSyncLoop(signal: AbortSignal): Promise { + while (!signal.aborted) { + try { + const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl); + url.searchParams.set('timeout', String(this.config.syncTimeoutMs ?? DEFAULT_SYNC_TIMEOUT_MS)); + if (this.since) { + url.searchParams.set('since', this.since); + } + + const response = await fetch(url.toString(), { + method: 'GET', + headers: { Authorization: `Bearer ${this.config.accessToken}` }, + signal, + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + console.error(`Matrix sync error (${response.status}): ${body}`); + await this.sleep(SYNC_ERROR_BACKOFF_MS, signal); + continue; + } + + const sync = await response.json() as MatrixSyncResponse; + if (sync.next_batch) { + this.since = sync.next_batch; + } + + this.processSync(sync); + } catch (error) { + if (signal.aborted) { + return; + } + + const err = error as any; + if (err && typeof err === 'object' && err.name === 'AbortError') { + return; + } + + console.error( + 'Matrix sync loop error:', + error instanceof Error ? error.message : 'Unknown error', + ); + await this.sleep(SYNC_ERROR_BACKOFF_MS, signal); + } + } + } + + private processSync(sync: MatrixSyncResponse): void { + // Update DM room set if m.direct is included. + const accountEvents = sync.account_data?.events ?? []; + for (const e of accountEvents) { + if (e.type === 'm.direct') { + this.dmRoomIds = this.flattenDirectRooms(e.content); + } + } + + if (!this.messageHandler) { + return; + } + + const joined = sync.rooms?.join; + if (!joined) { + return; + } + + for (const [roomId, data] of Object.entries(joined)) { + const events = data.timeline?.events ?? []; + for (const event of events) { + this.handleTimelineEvent(roomId, event); + } + } + } + + private handleTimelineEvent(roomId: string, event: MatrixEvent): void { + if (!this.messageHandler) { + return; + } + + if (event.type !== 'm.room.message') { + return; + } + + const sender = event.sender; + if (!sender) { + return; + } + + if (this.userId && sender === this.userId) { + return; + } + + const msgtype = event.content?.msgtype; + if (msgtype !== 'm.text' && msgtype !== 'm.notice') { + return; + } + + const body = event.content?.body; + if (typeof body !== 'string') { + return; + } + + // Room allowlist. + const allowed = this.config.allowedRoomIds ?? []; + if (allowed.length > 0 && !allowed.includes(roomId)) { + // Pairing fallback: approve senders (MXID), not rooms. + const pm = this.config.pairingManager; + if (pm?.enabled) { + if (pm.isApproved('matrix', sender)) { + // approved sender bypasses allowlist + } else { + const text = body.trim(); + if (text && pm.validateCode('matrix', sender, text)) { + this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {}); + } + return; + } + } else { + return; + } + } + + // Mention gating (skip for DM rooms). + const requireMention = this.config.requireMention ?? true; + const isDm = this.dmRoomIds.has(roomId); + if (requireMention && !isDm && !this.isBotMentioned(body)) { + return; + } + + const cleaned = this.stripMentions(body); + + const text = cleaned.trim(); + const senderName = this.extractLocalpart(sender) ?? sender; + + if (text === '!reset' || text === 'reset') { + this.messageHandler({ + id: event.event_id ?? '', + channel: 'matrix', + senderId: roomId, + senderName, + text: '!reset', + timestamp: event.origin_server_ts ?? Date.now(), + metadata: { isCommand: true, command: 'reset', senderUserId: sender, roomId }, + }); + return; + } + + this.messageHandler({ + id: event.event_id ?? '', + channel: 'matrix', + senderId: roomId, + senderName, + text, + timestamp: event.origin_server_ts ?? Date.now(), + metadata: { senderUserId: sender, roomId }, + }); + } + + private isBotMentioned(text: string): boolean { + const lower = text.toLowerCase(); + + if (this.userId && lower.includes(this.userId.toLowerCase())) { + return true; + } + + if (this.localpart) { + const token = `@${this.localpart}`.toLowerCase(); + if (lower.includes(token)) { + return true; + } + } + + if (this.botDisplayName) { + const dn = this.botDisplayName.toLowerCase(); + if (lower.includes(`@${dn}`)) { + return true; + } + // Some clients omit '@' in the body for display-name mentions. + if (lower.includes(dn)) { + return true; + } + } + + return false; + } + + private stripMentions(text: string): string { + let out = text; + + if (this.userId) { + out = out.replaceAll(this.userId, '').trim(); + } + + const parts: string[] = []; + if (this.localpart) { + parts.push(`@${this.localpart}`); + } + if (this.botDisplayName) { + parts.push(`@${this.botDisplayName}`); + parts.push(this.botDisplayName); + } + + for (const p of parts) { + if (!p) {continue;} + out = out.replace(new RegExp(`(^|\\s)${this.escapeRegex(p)}(\\b|\\s|$)`, 'gi'), ' ').trim(); + } + + return out.replace(/\s+/g, ' ').trim(); + } + + private escapeRegex(str: string): string { + return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + } + + private extractLocalpart(userId: string): string | null { + const m = userId.match(/^@([^:]+):/); + return m ? m[1] : null; + } + + private async loadDirectRooms(): Promise { + if (!this.userId) { + this.dmRoomIds.clear(); + return; + } + + try { + const data = await this.matrixGet( + `/_matrix/client/v3/user/${encodeURIComponent(this.userId)}/account_data/m.direct`, + ); + this.dmRoomIds = this.flattenDirectRooms(data); + } catch { + this.dmRoomIds.clear(); + } + } + + private flattenDirectRooms(content: unknown): Set { + const roomIds = new Set(); + if (!content || typeof content !== 'object') { + return roomIds; + } + + for (const value of Object.values(content as Record)) { + if (!Array.isArray(value)) { + continue; + } + for (const roomId of value) { + if (typeof roomId === 'string') { + roomIds.add(roomId); + } + } + } + return roomIds; + } + + private async sendRoomMessage(roomId: string, text: string, replyTo?: string): Promise { + const txnId = `m${Date.now()}_${this.txnCounter++}`; + const relatesTo = replyTo + ? { 'm.in_reply_to': { event_id: replyTo } } + : undefined; + + await this.matrixPut( + `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${encodeURIComponent(txnId)}`, + { + msgtype: 'm.text', + body: text, + ...(relatesTo ? { 'm.relates_to': relatesTo } : {}), + }, + ); + } + + private async matrixGet(path: string): Promise { + const url = new URL(path, this.config.homeserverUrl); + const response = await fetch(url.toString(), { + method: 'GET', + headers: { Authorization: `Bearer ${this.config.accessToken}` }, + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + throw new Error(`Matrix API error (${response.status}): ${body}`); + } + + return response.json() as Promise; + } + + private async matrixPut(path: string, body: unknown): Promise { + const url = new URL(path, this.config.homeserverUrl); + const response = await fetch(url.toString(), { + method: 'PUT', + headers: { + Authorization: `Bearer ${this.config.accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const text = await response.text().catch(() => ''); + throw new Error(`Matrix API error (${response.status}): ${text}`); + } + } + + private sleep(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + signal.addEventListener('abort', () => { + clearTimeout(timer); + resolve(); + }, { once: true }); + }); + } +} diff --git a/src/channels/matrix/index.ts b/src/channels/matrix/index.ts new file mode 100644 index 0000000..779f82b --- /dev/null +++ b/src/channels/matrix/index.ts @@ -0,0 +1 @@ +export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; diff --git a/src/daemon/channels.ts b/src/daemon/channels.ts index 15280c3..e49b69e 100644 --- a/src/daemon/channels.ts +++ b/src/daemon/channels.ts @@ -1,6 +1,6 @@ import type { Config } from '../config/index.js'; import type { HookEngine } from '../hooks/index.js'; -import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, PairingManager } from '../channels/index.js'; +import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js'; import { CronScheduler, WebhookHandler, GmailWatcher } from '../automation/index.js'; import type { GatewayServer } from '../gateway/index.js'; @@ -70,6 +70,20 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult { channelRegistry.register(whatsappAdapter); } + // Register Matrix adapter (if configured) + if (config.matrix) { + const matrixAdapter = new MatrixAdapter({ + homeserverUrl: config.matrix.homeserver_url, + accessToken: config.matrix.access_token, + allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined, + requireMention: config.matrix.require_mention, + syncTimeoutMs: config.matrix.sync_timeout_ms, + displayName: config.matrix.display_name, + pairingManager, + }); + channelRegistry.register(matrixAdapter); + } + // Register WebChat adapter (wraps the gateway) const webChatAdapter = new WebChatAdapter({ gateway }); channelRegistry.register(webChatAdapter);