feat(matrix): add Matrix channel adapter

This commit is contained in:
William Valentin
2026-02-15 18:02:14 -08:00
parent 5fdb9e5a83
commit bc8326cf4a
5 changed files with 766 additions and 1 deletions
+1
View File
@@ -15,4 +15,5 @@ export { WebChatAdapter, type WebChatAdapterConfig } from './webchat/index.js';
export { DiscordAdapter, type DiscordAdapterConfig } from './discord/index.js';
export { SlackAdapter, type SlackAdapterConfig } from './slack/index.js';
export { WhatsAppAdapter, type WhatsAppAdapterConfig } from './whatsapp/index.js';
export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js';
export { PairingManager, type PairingConfig, type PairingStore, type ApprovedSender } from './pairing.js';
+268
View File
@@ -0,0 +1,268 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
import type { InboundMessage } from '../types.js';
const mockFetch = vi.fn();
function jsonResponse(body: unknown, status = 200): Response {
return {
ok: status >= 200 && status < 300,
status,
json: async () => body,
text: async () => JSON.stringify(body),
} as Response;
}
describe('MatrixAdapter', () => {
const baseConfig: MatrixAdapterConfig = {
homeserverUrl: 'https://matrix.example.org',
accessToken: 'syt_test_token',
allowedRoomIds: ['!room1:example.org'],
requireMention: true,
syncTimeoutMs: 30_000,
displayName: 'Flynn',
};
let adapter: MatrixAdapter;
beforeEach(() => {
vi.stubGlobal('fetch', mockFetch);
vi.clearAllMocks();
adapter = new MatrixAdapter(baseConfig);
});
afterEach(async () => {
await adapter.disconnect();
vi.restoreAllMocks();
});
it('has name "matrix"', () => {
expect(adapter.name).toBe('matrix');
});
it('starts as disconnected', () => {
expect(adapter.status).toBe('disconnected');
});
it('connect resolves whoami and sets connected', async () => {
mockFetch.mockImplementation(async (url: string) => {
if (url.endsWith('/_matrix/client/v3/account/whoami')) {
return jsonResponse({ user_id: '@flynn:example.org' });
}
if (url.includes('/account_data/m.direct')) {
return jsonResponse({});
}
// /sync long-poll hangs
if (url.includes('/_matrix/client/v3/sync')) {
return new Promise<Response>(() => {});
}
throw new Error(`Unexpected fetch URL: ${url}`);
});
await adapter.connect();
expect(adapter.status).toBe('connected');
expect(mockFetch).toHaveBeenCalledWith(
'https://matrix.example.org/_matrix/client/v3/account/whoami',
expect.objectContaining({
method: 'GET',
headers: expect.objectContaining({
Authorization: 'Bearer syt_test_token',
}),
}),
);
});
it('disconnect is safe when not connected', async () => {
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
it('send throws when not connected', async () => {
await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow(
'Matrix adapter not connected',
);
});
it('send delivers a message via PUT', async () => {
let syncStarted = false;
mockFetch.mockImplementation(async (url: string, init?: any) => {
if (url.endsWith('/_matrix/client/v3/account/whoami')) {
return jsonResponse({ user_id: '@flynn:example.org' });
}
if (url.includes('/account_data/m.direct')) {
return jsonResponse({});
}
if (url.includes('/_matrix/client/v3/sync')) {
syncStarted = true;
return new Promise<Response>(() => {});
}
if (init?.method === 'PUT' && url.includes('/send/m.room.message/')) {
const body = JSON.parse(init.body);
expect(body.msgtype).toBe('m.text');
expect(body.body).toBe('Hello there');
return jsonResponse({ event_id: '$sent1' });
}
throw new Error(`Unexpected fetch URL: ${url}`);
});
await adapter.connect();
expect(syncStarted).toBe(true);
await adapter.send('!room1:example.org', { text: 'Hello there' });
});
it('inbound message requires mention in non-DM rooms', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
let didSync = false;
mockFetch.mockImplementation(async (url: string) => {
if (url.endsWith('/_matrix/client/v3/account/whoami')) {
return jsonResponse({ user_id: '@flynn:example.org' });
}
if (url.includes('/account_data/m.direct')) {
return jsonResponse({});
}
if (url.includes('/_matrix/client/v3/sync')) {
if (didSync) {
return new Promise<Response>(() => {});
}
didSync = true;
return jsonResponse({
next_batch: 's1',
rooms: {
join: {
'!room1:example.org': {
timeline: {
events: [
{
type: 'm.room.message',
event_id: '$e1',
sender: '@alice:example.org',
origin_server_ts: 1700000000000,
content: { msgtype: 'm.text', body: 'hello without mention' },
},
],
},
},
},
},
});
}
throw new Error(`Unexpected fetch URL: ${url}`);
});
await adapter.connect();
await new Promise((r) => setTimeout(r, 0));
expect(handler).not.toHaveBeenCalled();
});
it('DM rooms bypass mention requirement (m.direct from sync)', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
let didSync = false;
mockFetch.mockImplementation(async (url: string) => {
if (url.endsWith('/_matrix/client/v3/account/whoami')) {
return jsonResponse({ user_id: '@flynn:example.org' });
}
if (url.includes('/account_data/m.direct')) {
return jsonResponse({});
}
if (url.includes('/_matrix/client/v3/sync')) {
if (didSync) {
return new Promise<Response>(() => {});
}
didSync = true;
return jsonResponse({
next_batch: 's1',
account_data: {
events: [
{ type: 'm.direct', content: { '@alice:example.org': ['!room1:example.org'] } },
],
},
rooms: {
join: {
'!room1:example.org': {
timeline: {
events: [
{
type: 'm.room.message',
event_id: '$e1',
sender: '@alice:example.org',
origin_server_ts: 1700000000000,
content: { msgtype: 'm.text', body: 'hello dm no mention' },
},
],
},
},
},
},
});
}
throw new Error(`Unexpected fetch URL: ${url}`);
});
await adapter.connect();
await new Promise((r) => setTimeout(r, 0));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.channel).toBe('matrix');
expect(msg.senderId).toBe('!room1:example.org');
expect(msg.senderName).toBe('alice');
expect(msg.text).toBe('hello dm no mention');
});
it('inbound message with mention is accepted and mention is stripped', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
let didSync = false;
mockFetch.mockImplementation(async (url: string) => {
if (url.endsWith('/_matrix/client/v3/account/whoami')) {
return jsonResponse({ user_id: '@flynn:example.org' });
}
if (url.includes('/account_data/m.direct')) {
return jsonResponse({});
}
if (url.includes('/_matrix/client/v3/sync')) {
if (didSync) {
return new Promise<Response>(() => {});
}
didSync = true;
return jsonResponse({
next_batch: 's1',
rooms: {
join: {
'!room1:example.org': {
timeline: {
events: [
{
type: 'm.room.message',
event_id: '$e1',
sender: '@alice:example.org',
origin_server_ts: 1700000000000,
content: { msgtype: 'm.text', body: '@Flynn Hello there' },
},
],
},
},
},
},
});
}
throw new Error(`Unexpected fetch URL: ${url}`);
});
await adapter.connect();
await new Promise((r) => setTimeout(r, 0));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('Hello there');
});
});
+481
View File
@@ -0,0 +1,481 @@
/**
* Matrix channel adapter.
*
* Implements the ChannelAdapter interface using raw fetch against the
* Matrix Client-Server API v3. Uses /sync long-polling for inbound messages
* and PUT /send for outbound messages.
*/
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { splitMessage } from '../utils.js';
import type { PairingManager } from '../pairing.js';
export interface MatrixAdapterConfig {
homeserverUrl: string;
accessToken: string;
/** Room IDs to respond in. Empty/undefined = all rooms. */
allowedRoomIds?: string[];
/** Require mention in non-DM rooms (default: true). */
requireMention?: boolean;
/** /sync long-poll timeout in ms (default: 30000). */
syncTimeoutMs?: number;
/** Optional bot display name (used for mention detection). */
displayName?: string;
/** Optional pairing manager for DM pairing codes. */
pairingManager?: PairingManager;
}
interface MatrixWhoamiResponse {
user_id: string;
}
interface MatrixSyncResponse {
next_batch?: string;
rooms?: {
join?: Record<string, MatrixJoinedRoom>;
invite?: Record<string, unknown>;
};
account_data?: {
events?: Array<{ type: string; content?: unknown }>;
};
}
interface MatrixJoinedRoom {
timeline?: {
events?: MatrixEvent[];
};
}
interface MatrixEvent {
type?: string;
event_id?: string;
sender?: string;
origin_server_ts?: number;
content?: {
msgtype?: string;
body?: string;
[key: string]: unknown;
};
}
const MAX_MESSAGE_LENGTH = 65536;
const DEFAULT_SYNC_TIMEOUT_MS = 30_000;
const SYNC_ERROR_BACKOFF_MS = 5_000;
export class MatrixAdapter implements ChannelAdapter {
readonly name = 'matrix';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private config: MatrixAdapterConfig;
private userId: string | null = null;
private localpart: string | null = null;
private botDisplayName: string | null = null;
private dmRoomIds: Set<string> = new Set();
private since: string | null = null;
private syncAbort: AbortController | null = null;
private txnCounter = 0;
get status(): ChannelStatus {
return this._status;
}
constructor(config: MatrixAdapterConfig) {
this.config = config;
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
async connect(): Promise<void> {
this._status = 'connecting';
try {
const whoami = await this.matrixGet<MatrixWhoamiResponse>('/_matrix/client/v3/account/whoami');
if (!whoami.user_id) {
throw new Error('Matrix whoami response missing user_id');
}
this.userId = whoami.user_id;
this.localpart = this.extractLocalpart(this.userId);
this.botDisplayName = this.config.displayName ?? this.localpart ?? this.userId;
await this.loadDirectRooms();
this.syncAbort = new AbortController();
void this.runSyncLoop(this.syncAbort.signal);
this._status = 'connected';
console.log(`Matrix adapter connected as ${this.userId}`);
} catch (error) {
this._status = 'error';
throw error;
}
}
async disconnect(): Promise<void> {
if (this.syncAbort) {
this.syncAbort.abort();
this.syncAbort = null;
}
this.userId = null;
this.localpart = null;
this.botDisplayName = null;
this.dmRoomIds.clear();
this.since = null;
this._status = 'disconnected';
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (this._status !== 'connected') {
throw new Error('Matrix adapter not connected');
}
const text = (message.text ?? '').trim();
if (!text) {
return;
}
const chunks = text.length > MAX_MESSAGE_LENGTH
? splitMessage(text, MAX_MESSAGE_LENGTH)
: [text];
for (const chunk of chunks) {
if (!chunk) {continue;}
await this.sendRoomMessage(peerId, chunk, message.replyTo);
}
if (message.attachments && message.attachments.length > 0) {
for (const a of message.attachments) {
if (a.url) {
const line = a.filename ? `${a.filename}: ${a.url}` : a.url;
await this.sendRoomMessage(peerId, line);
} else if (a.data) {
// MVP: don't attempt media upload yet.
console.warn(`Matrix: skipping attachment data (${a.mimeType}) — upload not implemented`);
}
}
}
}
private async runSyncLoop(signal: AbortSignal): Promise<void> {
while (!signal.aborted) {
try {
const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl);
url.searchParams.set('timeout', String(this.config.syncTimeoutMs ?? DEFAULT_SYNC_TIMEOUT_MS));
if (this.since) {
url.searchParams.set('since', this.since);
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: { Authorization: `Bearer ${this.config.accessToken}` },
signal,
});
if (!response.ok) {
const body = await response.text().catch(() => '');
console.error(`Matrix sync error (${response.status}): ${body}`);
await this.sleep(SYNC_ERROR_BACKOFF_MS, signal);
continue;
}
const sync = await response.json() as MatrixSyncResponse;
if (sync.next_batch) {
this.since = sync.next_batch;
}
this.processSync(sync);
} catch (error) {
if (signal.aborted) {
return;
}
const err = error as any;
if (err && typeof err === 'object' && err.name === 'AbortError') {
return;
}
console.error(
'Matrix sync loop error:',
error instanceof Error ? error.message : 'Unknown error',
);
await this.sleep(SYNC_ERROR_BACKOFF_MS, signal);
}
}
}
private processSync(sync: MatrixSyncResponse): void {
// Update DM room set if m.direct is included.
const accountEvents = sync.account_data?.events ?? [];
for (const e of accountEvents) {
if (e.type === 'm.direct') {
this.dmRoomIds = this.flattenDirectRooms(e.content);
}
}
if (!this.messageHandler) {
return;
}
const joined = sync.rooms?.join;
if (!joined) {
return;
}
for (const [roomId, data] of Object.entries(joined)) {
const events = data.timeline?.events ?? [];
for (const event of events) {
this.handleTimelineEvent(roomId, event);
}
}
}
private handleTimelineEvent(roomId: string, event: MatrixEvent): void {
if (!this.messageHandler) {
return;
}
if (event.type !== 'm.room.message') {
return;
}
const sender = event.sender;
if (!sender) {
return;
}
if (this.userId && sender === this.userId) {
return;
}
const msgtype = event.content?.msgtype;
if (msgtype !== 'm.text' && msgtype !== 'm.notice') {
return;
}
const body = event.content?.body;
if (typeof body !== 'string') {
return;
}
// Room allowlist.
const allowed = this.config.allowedRoomIds ?? [];
if (allowed.length > 0 && !allowed.includes(roomId)) {
// Pairing fallback: approve senders (MXID), not rooms.
const pm = this.config.pairingManager;
if (pm?.enabled) {
if (pm.isApproved('matrix', sender)) {
// approved sender bypasses allowlist
} else {
const text = body.trim();
if (text && pm.validateCode('matrix', sender, text)) {
this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {});
}
return;
}
} else {
return;
}
}
// Mention gating (skip for DM rooms).
const requireMention = this.config.requireMention ?? true;
const isDm = this.dmRoomIds.has(roomId);
if (requireMention && !isDm && !this.isBotMentioned(body)) {
return;
}
const cleaned = this.stripMentions(body);
const text = cleaned.trim();
const senderName = this.extractLocalpart(sender) ?? sender;
if (text === '!reset' || text === 'reset') {
this.messageHandler({
id: event.event_id ?? '',
channel: 'matrix',
senderId: roomId,
senderName,
text: '!reset',
timestamp: event.origin_server_ts ?? Date.now(),
metadata: { isCommand: true, command: 'reset', senderUserId: sender, roomId },
});
return;
}
this.messageHandler({
id: event.event_id ?? '',
channel: 'matrix',
senderId: roomId,
senderName,
text,
timestamp: event.origin_server_ts ?? Date.now(),
metadata: { senderUserId: sender, roomId },
});
}
private isBotMentioned(text: string): boolean {
const lower = text.toLowerCase();
if (this.userId && lower.includes(this.userId.toLowerCase())) {
return true;
}
if (this.localpart) {
const token = `@${this.localpart}`.toLowerCase();
if (lower.includes(token)) {
return true;
}
}
if (this.botDisplayName) {
const dn = this.botDisplayName.toLowerCase();
if (lower.includes(`@${dn}`)) {
return true;
}
// Some clients omit '@' in the body for display-name mentions.
if (lower.includes(dn)) {
return true;
}
}
return false;
}
private stripMentions(text: string): string {
let out = text;
if (this.userId) {
out = out.replaceAll(this.userId, '').trim();
}
const parts: string[] = [];
if (this.localpart) {
parts.push(`@${this.localpart}`);
}
if (this.botDisplayName) {
parts.push(`@${this.botDisplayName}`);
parts.push(this.botDisplayName);
}
for (const p of parts) {
if (!p) {continue;}
out = out.replace(new RegExp(`(^|\\s)${this.escapeRegex(p)}(\\b|\\s|$)`, 'gi'), ' ').trim();
}
return out.replace(/\s+/g, ' ').trim();
}
private escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
private extractLocalpart(userId: string): string | null {
const m = userId.match(/^@([^:]+):/);
return m ? m[1] : null;
}
private async loadDirectRooms(): Promise<void> {
if (!this.userId) {
this.dmRoomIds.clear();
return;
}
try {
const data = await this.matrixGet<unknown>(
`/_matrix/client/v3/user/${encodeURIComponent(this.userId)}/account_data/m.direct`,
);
this.dmRoomIds = this.flattenDirectRooms(data);
} catch {
this.dmRoomIds.clear();
}
}
private flattenDirectRooms(content: unknown): Set<string> {
const roomIds = new Set<string>();
if (!content || typeof content !== 'object') {
return roomIds;
}
for (const value of Object.values(content as Record<string, unknown>)) {
if (!Array.isArray(value)) {
continue;
}
for (const roomId of value) {
if (typeof roomId === 'string') {
roomIds.add(roomId);
}
}
}
return roomIds;
}
private async sendRoomMessage(roomId: string, text: string, replyTo?: string): Promise<void> {
const txnId = `m${Date.now()}_${this.txnCounter++}`;
const relatesTo = replyTo
? { 'm.in_reply_to': { event_id: replyTo } }
: undefined;
await this.matrixPut(
`/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${encodeURIComponent(txnId)}`,
{
msgtype: 'm.text',
body: text,
...(relatesTo ? { 'm.relates_to': relatesTo } : {}),
},
);
}
private async matrixGet<T>(path: string): Promise<T> {
const url = new URL(path, this.config.homeserverUrl);
const response = await fetch(url.toString(), {
method: 'GET',
headers: { Authorization: `Bearer ${this.config.accessToken}` },
});
if (!response.ok) {
const body = await response.text().catch(() => '');
throw new Error(`Matrix API error (${response.status}): ${body}`);
}
return response.json() as Promise<T>;
}
private async matrixPut(path: string, body: unknown): Promise<void> {
const url = new URL(path, this.config.homeserverUrl);
const response = await fetch(url.toString(), {
method: 'PUT',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!response.ok) {
const text = await response.text().catch(() => '');
throw new Error(`Matrix API error (${response.status}): ${text}`);
}
}
private sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms);
signal.addEventListener('abort', () => {
clearTimeout(timer);
resolve();
}, { once: true });
});
}
}
+1
View File
@@ -0,0 +1 @@
export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
+15 -1
View File
@@ -1,6 +1,6 @@
import type { Config } from '../config/index.js';
import type { HookEngine } from '../hooks/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, PairingManager } from '../channels/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js';
import { CronScheduler, WebhookHandler, GmailWatcher } from '../automation/index.js';
import type { GatewayServer } from '../gateway/index.js';
@@ -70,6 +70,20 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
channelRegistry.register(whatsappAdapter);
}
// Register Matrix adapter (if configured)
if (config.matrix) {
const matrixAdapter = new MatrixAdapter({
homeserverUrl: config.matrix.homeserver_url,
accessToken: config.matrix.access_token,
allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined,
requireMention: config.matrix.require_mention,
syncTimeoutMs: config.matrix.sync_timeout_ms,
displayName: config.matrix.display_name,
pairingManager,
});
channelRegistry.register(matrixAdapter);
}
// Register WebChat adapter (wraps the gateway)
const webChatAdapter = new WebChatAdapter({ gateway });
channelRegistry.register(webChatAdapter);