46 KiB
Matrix Channel Adapter Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add a Matrix channel adapter using raw fetch against the Matrix Client-Server API v3, with zero external dependencies (no matrix-js-sdk).
Architecture: Polling-based adapter using /_matrix/client/v3/sync with incremental since tokens. Auth via pre-provisioned access token in config. Room allowlist + mention gating for multi-room safety. DM detection via m.direct account data. Outbound via /_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}.
Tech Stack: Node.js fetch (built-in), Vitest for tests, Zod for config schema validation.
Context
Existing Patterns
All Flynn channel adapters follow a consistent pattern:
- Config interface — e.g.
TelegramAdapterConfig,DiscordAdapterConfig - Adapter class implementing
ChannelAdapterfromsrc/channels/types.ts - Barrel export from
src/channels/<name>/index.ts - Re-export in
src/channels/index.ts - Zod schema in
src/config/schema.ts(optional field on root config) - Registration in
src/daemon/channels.ts(conditional on config presence) - Tests mocking the platform SDK; here we mock
global.fetch
Key ChannelAdapter interface
interface ChannelAdapter {
readonly name: string;
readonly status: ChannelStatus;
connect(): Promise<void>;
disconnect(): Promise<void>;
send(peerId: string, message: OutboundMessage): Promise<void>;
onMessage(handler: (msg: InboundMessage) => void): void;
}
senderId convention
- Telegram:
String(chatId)(numeric chat ID) - Discord:
channelId(snowflake string) - Slack:
channelId:threadTs - WhatsApp:
number@c.us
For Matrix: roomId (e.g. !abc123:matrix.org). This gives one session per room, matching the Discord pattern. The Matrix userId (e.g. @flynn:matrix.org) goes into senderName.
peerId convention
peerId passed to send() is the roomId. This is the same as senderId since replies go back to the same room.
Config Shape
matrix:
homeserver_url: "https://matrix.example.org"
access_token: "${MATRIX_ACCESS_TOKEN}"
allowed_room_ids:
- "!abc123:matrix.org"
require_mention: true # default: true
sync_timeout_ms: 30000 # default: 30000 (long-poll timeout)
display_name: "Flynn" # optional: set bot display name on connect
Zod schema:
const matrixSchema = z.object({
homeserver_url: z.string().url('Homeserver URL is required'),
access_token: z.string().min(1, 'Access token is required'),
allowed_room_ids: z.array(z.string()).default([]),
require_mention: z.boolean().default(true),
sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
display_name: z.string().optional(),
}).optional();
Matrix Client-Server API Endpoints Used
| Endpoint | Method | Purpose |
|---|---|---|
/_matrix/client/v3/sync |
GET | Long-poll for events (with ?since=&timeout=&filter=) |
/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId} |
PUT | Send a message |
/_matrix/client/v3/user/{userId}/account_data/m.direct |
GET | Detect DM rooms |
/_matrix/client/v3/account/whoami |
GET | Resolve own userId on connect |
Sync filter (minimize bandwidth)
{
"room": {
"timeline": { "limit": 50 },
"state": { "lazy_load_members": true }
},
"presence": { "not_types": ["*"] },
"account_data": { "types": ["m.direct"] }
}
Design Decisions
1. No matrix-js-sdk dependency
- Choice: Raw fetch calls against
/_matrix/client/v3endpoints - Rationale: User explicitly requested minimal dependencies. The Matrix CS API is well-documented REST/JSON. We only need sync, send, whoami, and account_data — 4 endpoints total.
- Alternatives: matrix-js-sdk (heavy, ~2MB, pulls in many transitive deps), matrix-bot-sdk (lighter but still a dep).
2. Polling via /sync long-poll (not WebSocket or SSE)
- Choice:
/syncwithtimeoutparam for long-polling, looped in an async function withAbortControllerfor clean shutdown. - Rationale: This is the standard Matrix sync mechanism. Simple, robust, no extra deps. The
timeoutparam makes it behave like long-polling (server holds connection open until events arrive or timeout elapses). - Alternatives: Sliding Sync (MSC3575, not universally supported), WebSocket transport (experimental extension).
3. senderId = roomId
- Choice: Use the Matrix room ID (
!...) assenderIdinInboundMessage, matching the Discord adapter pattern wheresenderId = channelId. - Rationale: Flynn creates one session per
channel:senderIdpair. UsingroomIdmeans one session per room, which is the natural boundary. The human user's Matrix ID goes intosenderName.
4. DM detection via m.direct account data
- Choice: On connect, fetch the bot's
m.directaccount data to build aSet<string>of known DM room IDs. DM rooms bypassrequire_mention. - Rationale: Matrix DMs are just rooms, but the
m.directaccount data flags which rooms are direct messages. This is the standard approach and doesn't require room member counting heuristics.
5. Mention detection via body text
- Choice: Check for
@displayNameor bot's Matrix user ID in the message body text. - Rationale: Matrix
m.room.messageevents include bothbody(plain text) and optionalformatted_body(HTML with<a href="https://matrix.to/#/@user:server">pill links). Checking the plain textbodyfor the display name or MXID is simpler and sufficient. The HTML formatted_body could also be checked for matrix.to links, but body text matching covers all clients.
6. Transaction ID for idempotent sends
- Choice: Generate txnId as
m${Date.now()}.${counter++}per adapter instance. - Rationale: Matrix PUT send requires a txnId for idempotency. A timestamp + counter ensures uniqueness within a process lifetime.
7. Message size limit
- Choice: Use
splitMessage()utility with a 65536-char limit (Matrix spec allows up to 65536 bytes for event content). - Rationale: Matrix's limit is much higher than Telegram (4096) or Discord (2000), but chunking at 65536 chars prevents oversized events.
Implementation Steps
Task 1: Add Matrix Zod Schema to Config
Files:
- Modify:
src/config/schema.ts
Step 1: Add the matrixSchema definition
Add after the whatsappSchema definition (around line 350):
const matrixSchema = z.object({
homeserver_url: z.string().url('Homeserver URL is required'),
access_token: z.string().min(1, 'Access token is required'),
allowed_room_ids: z.array(z.string()).default([]),
require_mention: z.boolean().default(true),
sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
display_name: z.string().optional(),
}).optional();
Step 2: Add matrix to configSchema
In configSchema (around line 510), add after whatsapp:
matrix: matrixSchema,
Step 3: Add exported type
After the existing type exports (around line 559), add:
export type MatrixConfig = z.infer<typeof matrixSchema>;
Step 4: Run typecheck
Run: pnpm typecheck
Expected: PASS
Step 5: Commit
git add src/config/schema.ts
git commit -m "feat(matrix): add Matrix channel config schema"
Task 2: Create Matrix Adapter — Core Structure and Connect
Files:
- Create:
src/channels/matrix/adapter.ts - Create:
src/channels/matrix/adapter.test.ts - Create:
src/channels/matrix/index.ts
Step 1: Write the adapter.test.ts skeleton with connect/disconnect tests
Create src/channels/matrix/adapter.test.ts:
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
import type { InboundMessage } from '../types.js';
// ── Mock global fetch ─────────────────────────────────────────────
const mockFetch = vi.fn();
beforeEach(() => {
vi.stubGlobal('fetch', mockFetch);
});
afterEach(() => {
vi.restoreAllMocks();
});
const baseConfig: MatrixAdapterConfig = {
homeserverUrl: 'https://matrix.example.org',
accessToken: 'syt_test_token',
allowedRoomIds: ['!room1:example.org'],
requireMention: true,
syncTimeoutMs: 30000,
};
/** Helper: create a mock fetch Response. */
function mockResponse(body: unknown, status = 200): Response {
return {
ok: status >= 200 && status < 300,
status,
json: async () => body,
text: async () => JSON.stringify(body),
} as Response;
}
/** Helper: make fetch return specific responses in sequence. */
function mockFetchSequence(...responses: Response[]) {
for (const response of responses) {
mockFetch.mockResolvedValueOnce(response);
}
}
describe('MatrixAdapter', () => {
let adapter: MatrixAdapter;
beforeEach(() => {
vi.clearAllMocks();
adapter = new MatrixAdapter(baseConfig);
});
afterEach(async () => {
// Ensure sync loop is stopped
if (adapter.status === 'connected' || adapter.status === 'connecting') {
await adapter.disconnect();
}
});
// ── Basic properties ────────────────────────────────────────────
it('has name "matrix"', () => {
expect(adapter.name).toBe('matrix');
});
it('starts as disconnected', () => {
expect(adapter.status).toBe('disconnected');
});
// ── connect / disconnect ────────────────────────────────────────
it('connect resolves userId via whoami and sets connected', async () => {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct account data
mockFetch.mockResolvedValueOnce(mockResponse({}));
// First sync (will be aborted on disconnect)
mockFetch.mockResolvedValueOnce(
new Promise(() => {}), // never resolves — simulates long-poll
);
await adapter.connect();
expect(adapter.status).toBe('connected');
// Verify whoami was called
expect(mockFetch).toHaveBeenCalledWith(
'https://matrix.example.org/_matrix/client/v3/account/whoami',
expect.objectContaining({
headers: expect.objectContaining({
Authorization: 'Bearer syt_test_token',
}),
}),
);
});
it('connect throws on whoami failure', async () => {
mockFetch.mockResolvedValueOnce(mockResponse({ errcode: 'M_UNKNOWN_TOKEN' }, 401));
await expect(adapter.connect()).rejects.toThrow();
});
it('disconnect sets status to disconnected', async () => {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct
mockFetch.mockResolvedValueOnce(mockResponse({}));
// sync long-poll (never resolves)
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
await adapter.connect();
expect(adapter.status).toBe('connected');
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
it('disconnect is safe when not connected', async () => {
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
});
Step 2: Run test to verify it fails (adapter doesn't exist yet)
Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: FAIL — cannot resolve ./adapter.js
Step 3: Write adapter.ts with connect/disconnect
Create src/channels/matrix/adapter.ts:
/**
* Matrix channel adapter.
*
* Implements the ChannelAdapter interface using raw fetch against the
* Matrix Client-Server API v3. No external Matrix SDK dependency.
* Uses /sync long-polling for inbound messages and PUT /send for outbound.
*/
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { splitMessage } from '../utils.js';
import type { PairingManager } from '../pairing.js';
/** Configuration for the Matrix channel adapter. */
export interface MatrixAdapterConfig {
homeserverUrl: string;
accessToken: string;
/** Room IDs to respond in. Empty = all joined rooms. */
allowedRoomIds?: string[];
/** Require @mention to respond in rooms (default: true). DMs always respond. */
requireMention?: boolean;
/** Long-poll timeout for /sync in milliseconds (default: 30000). */
syncTimeoutMs?: number;
/** Optional display name to set on connect. */
displayName?: string;
/** Optional pairing manager for DM pairing codes. */
pairingManager?: PairingManager;
}
/** Maximum message length before chunking (Matrix allows up to 65536 bytes). */
const MAX_MESSAGE_LENGTH = 65536;
/**
* Matrix channel adapter backed by raw Client-Server API v3 fetch calls.
*
* Handles room allowlist filtering, optional mention requirement,
* DM detection via m.direct, and message chunking.
*/
export class MatrixAdapter implements ChannelAdapter {
readonly name = 'matrix';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private config: MatrixAdapterConfig;
/** Resolved bot user ID (e.g. @flynn:example.org). */
private userId: string | null = null;
/** Resolved bot display name (for mention detection). */
private botDisplayName: string | null = null;
/** Set of room IDs that are direct messages. */
private dmRoomIds: Set<string> = new Set();
/** Incremental sync token. */
private syncToken: string | null = null;
/** AbortController for the sync loop. */
private syncAbort: AbortController | null = null;
/** Transaction ID counter for idempotent sends. */
private txnCounter = 0;
get status(): ChannelStatus {
return this._status;
}
constructor(config: MatrixAdapterConfig) {
this.config = config;
}
/** Register the inbound message handler. Called by the registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/**
* Connect to the Matrix homeserver:
* 1. Resolve bot userId via /account/whoami
* 2. Fetch m.direct account data for DM detection
* 3. Start the /sync long-poll loop
*/
async connect(): Promise<void> {
this._status = 'connecting';
try {
// Resolve our own user ID
const whoami = await this.matrixGet<{ user_id: string }>(
'/_matrix/client/v3/account/whoami',
);
this.userId = whoami.user_id;
this.botDisplayName = this.config.displayName ?? this.userId.split(':')[0].slice(1);
// Fetch m.direct account data for DM detection
await this.loadDirectRooms();
// Start sync loop (fire and forget — runs until disconnect)
this.syncAbort = new AbortController();
this.runSyncLoop(this.syncAbort.signal);
this._status = 'connected';
console.log(`Matrix adapter connected as ${this.userId}`);
} catch (error) {
this._status = 'error';
throw error;
}
}
/** Stop the sync loop and clean up. */
async disconnect(): Promise<void> {
if (this.syncAbort) {
this.syncAbort.abort();
this.syncAbort = null;
}
this.userId = null;
this.syncToken = null;
this.dmRoomIds.clear();
this._status = 'disconnected';
}
/** Send an outbound message to a Matrix room. */
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (this._status !== 'connected') {
throw new Error('Matrix adapter not connected');
}
const text = message.text ?? '';
if (!text.trim()) { return; }
if (text.length <= MAX_MESSAGE_LENGTH) {
await this.sendRoomMessage(peerId, text);
} else {
const chunks = splitMessage(text, MAX_MESSAGE_LENGTH);
for (const chunk of chunks) {
await this.sendRoomMessage(peerId, chunk);
}
}
}
// ── Private: Matrix API helpers ─────────────────────────────────
/** Make an authenticated GET request to the homeserver. */
private async matrixGet<T>(path: string, query?: Record<string, string>): Promise<T> {
const url = new URL(path, this.config.homeserverUrl);
if (query) {
for (const [k, v] of Object.entries(query)) {
url.searchParams.set(k, v);
}
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
},
});
if (!response.ok) {
const body = await response.text();
throw new Error(`Matrix API error (${response.status}): ${body}`);
}
return response.json() as Promise<T>;
}
/** Make an authenticated PUT request to the homeserver. */
private async matrixPut<T>(path: string, body: unknown): Promise<T> {
const url = new URL(path, this.config.homeserverUrl);
const response = await fetch(url.toString(), {
method: 'PUT',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!response.ok) {
const text = await response.text();
throw new Error(`Matrix API error (${response.status}): ${text}`);
}
return response.json() as Promise<T>;
}
/** Send a text message to a room with idempotent txnId. */
private async sendRoomMessage(roomId: string, text: string): Promise<void> {
const txnId = `m${Date.now()}.${this.txnCounter++}`;
await this.matrixPut(
`/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`,
{
msgtype: 'm.text',
body: text,
},
);
}
/** Load the m.direct account data to populate dmRoomIds. */
private async loadDirectRooms(): Promise<void> {
try {
const data = await this.matrixGet<Record<string, string[]>>(
`/_matrix/client/v3/user/${encodeURIComponent(this.userId!)}/account_data/m.direct`,
);
this.dmRoomIds.clear();
// m.direct maps userId → [roomId, ...]. Flatten all room IDs.
for (const roomIds of Object.values(data)) {
if (Array.isArray(roomIds)) {
for (const roomId of roomIds) {
this.dmRoomIds.add(roomId);
}
}
}
} catch {
// m.direct may not exist if the bot has no DMs yet — that's fine.
this.dmRoomIds.clear();
}
}
/** Check if a room is a DM. */
private isDM(roomId: string): boolean {
return this.dmRoomIds.has(roomId);
}
// ── Private: Sync loop ──────────────────────────────────────────
/** Run the /sync long-poll loop until aborted. */
private async runSyncLoop(signal: AbortSignal): Promise<void> {
// Build the sync filter to minimize bandwidth
const filter = JSON.stringify({
room: {
timeline: { limit: 50 },
state: { lazy_load_members: true },
},
presence: { not_types: ['*'] },
account_data: { types: ['m.direct'] },
});
while (!signal.aborted) {
try {
const query: Record<string, string> = {
timeout: String(this.config.syncTimeoutMs ?? 30000),
filter,
};
if (this.syncToken) {
query.since = this.syncToken;
}
const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl);
for (const [k, v] of Object.entries(query)) {
url.searchParams.set(k, v);
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
},
signal,
});
if (!response.ok) {
const body = await response.text();
console.error(`Matrix sync error (${response.status}): ${body}`);
// Back off on error
await this.sleep(5000, signal);
continue;
}
const syncResponse = await response.json() as MatrixSyncResponse;
this.syncToken = syncResponse.next_batch;
// Process sync response
this.processSyncResponse(syncResponse);
} catch (error) {
if (signal.aborted) { return; }
console.error('Matrix sync loop error:', error instanceof Error ? error.message : 'Unknown error');
// Back off on error
await this.sleep(5000, signal);
}
}
}
/** Process a /sync response — extract timeline events from joined rooms. */
private processSyncResponse(sync: MatrixSyncResponse): void {
if (!this.messageHandler) { return; }
const joinedRooms = sync.rooms?.join;
if (!joinedRooms) { return; }
// Update m.direct from account_data if present
const accountData = sync.account_data?.events;
if (accountData) {
for (const event of accountData) {
if (event.type === 'm.direct' && event.content) {
this.dmRoomIds.clear();
for (const roomIds of Object.values(event.content as Record<string, string[]>)) {
if (Array.isArray(roomIds)) {
for (const roomId of roomIds) {
this.dmRoomIds.add(roomId);
}
}
}
}
}
}
for (const [roomId, roomData] of Object.entries(joinedRooms)) {
const events = roomData.timeline?.events;
if (!events) { continue; }
for (const event of events) {
this.handleTimelineEvent(roomId, event);
}
}
}
/** Handle a single timeline event from a joined room. */
private handleTimelineEvent(roomId: string, event: MatrixEvent): void {
if (!this.messageHandler) { return; }
// Only process m.room.message events
if (event.type !== 'm.room.message') { return; }
// Ignore our own messages
if (event.sender === this.userId) { return; }
// Room allowlist check
const allowedRoomIds = this.config.allowedRoomIds ?? [];
if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) {
// Pairing fallback
const pm = this.config.pairingManager;
if (pm?.enabled) {
if (pm.isApproved('matrix', roomId)) {
// Approved — fall through
} else {
const text = (event.content?.body ?? '').trim();
if (text && pm.validateCode('matrix', roomId, text)) {
this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {});
}
return;
}
} else {
return;
}
}
const body = event.content?.body ?? '';
const isDm = this.isDM(roomId);
// Mention gating: require mention in non-DM rooms
const requireMention = this.config.requireMention ?? true;
if (!isDm && requireMention) {
const isMentioned = this.isBotMentioned(body);
if (!isMentioned) { return; }
}
// Strip bot mention from text
let text = body;
if (this.botDisplayName) {
text = text.replace(new RegExp(`@?${this.escapeRegex(this.botDisplayName)}\\b`, 'gi'), '').trim();
}
if (this.userId) {
text = text.replace(new RegExp(this.escapeRegex(this.userId), 'g'), '').trim();
}
// Detect reset command
if (text === '!reset' || text === 'reset') {
this.messageHandler({
id: event.event_id,
channel: 'matrix',
senderId: roomId,
senderName: this.extractDisplayName(event.sender),
text: '!reset',
timestamp: event.origin_server_ts,
metadata: { isCommand: true, command: 'reset' },
});
return;
}
// Regular message
this.messageHandler({
id: event.event_id,
channel: 'matrix',
senderId: roomId,
senderName: this.extractDisplayName(event.sender),
text,
timestamp: event.origin_server_ts,
});
}
/** Check if the bot is mentioned in the message text. */
private isBotMentioned(text: string): boolean {
if (this.botDisplayName && text.toLowerCase().includes(this.botDisplayName.toLowerCase())) {
return true;
}
if (this.userId && text.includes(this.userId)) {
return true;
}
return false;
}
/** Extract a display name from a Matrix user ID. @user:server → user */
private extractDisplayName(userId: string): string {
const match = userId.match(/^@([^:]+)/);
return match ? match[1] : userId;
}
/** Escape a string for use in a RegExp. */
private escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
/** Sleep with abort support. */
private sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms);
signal.addEventListener('abort', () => {
clearTimeout(timer);
resolve();
}, { once: true });
});
}
}
// ── Matrix API response types ──────────────────────────────────────
interface MatrixSyncResponse {
next_batch: string;
rooms?: {
join?: Record<string, MatrixJoinedRoom>;
};
account_data?: {
events?: Array<{ type: string; content?: unknown }>;
};
}
interface MatrixJoinedRoom {
timeline?: {
events?: MatrixEvent[];
};
}
interface MatrixEvent {
type: string;
event_id: string;
sender: string;
origin_server_ts: number;
content?: {
body?: string;
msgtype?: string;
[key: string]: unknown;
};
}
Step 4: Run test to verify connect/disconnect pass
Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: PASS (4 tests)
Step 5: Create barrel export
Create src/channels/matrix/index.ts:
export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
Step 6: Commit
git add src/channels/matrix/
git commit -m "feat(matrix): add Matrix adapter core with connect/disconnect"
Task 3: Add Send Tests and Verify Send Works
Files:
- Modify:
src/channels/matrix/adapter.test.ts
Step 1: Add send tests to the test file
Append these tests inside the describe('MatrixAdapter') block:
// ── send ──────────────────────────────────────────────────────
it('send throws when not connected', async () => {
await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow(
'Matrix adapter not connected',
);
});
it('send delivers a message via PUT with txnId', async () => {
// Connect first
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
mockFetch.mockResolvedValueOnce(mockResponse({}));
mockFetch.mockResolvedValueOnce(new Promise(() => {})); // sync hangs
await adapter.connect();
// Send message
mockFetch.mockResolvedValueOnce(mockResponse({ event_id: '$sent1' }));
await adapter.send('!room1:example.org', { text: 'Hello there' });
// Find the PUT call (skip whoami, m.direct, sync)
const putCall = mockFetch.mock.calls.find(
(call) => call[1]?.method === 'PUT',
);
expect(putCall).toBeDefined();
expect(putCall![0]).toContain('/_matrix/client/v3/rooms/');
expect(putCall![0]).toContain('/send/m.room.message/');
const body = JSON.parse(putCall![1].body);
expect(body.msgtype).toBe('m.text');
expect(body.body).toBe('Hello there');
});
it('send skips empty text', async () => {
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
mockFetch.mockResolvedValueOnce(mockResponse({}));
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
await adapter.connect();
await adapter.send('!room1:example.org', { text: '' });
// No PUT call should have been made
const putCall = mockFetch.mock.calls.find(
(call) => call[1]?.method === 'PUT',
);
expect(putCall).toBeUndefined();
});
Step 2: Run tests
Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: PASS (7 tests)
Step 3: Commit
git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add send tests"
Task 4: Add Inbound Message Handling Tests
Files:
- Modify:
src/channels/matrix/adapter.test.ts
Step 1: Add a helper to connect the adapter and simulate sync
Add these helpers near the top of the test file (after mockFetchSequence):
/** Helper: connect the adapter with standard mocks. Returns a function to simulate sync. */
async function connectAdapter(adapter: MatrixAdapter, directRooms: Record<string, string[]> = {}) {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct
mockFetch.mockResolvedValueOnce(mockResponse(directRooms));
// sync long-poll — we'll control this manually
let syncResolve: ((value: Response) => void) | null = null;
mockFetch.mockResolvedValueOnce(
new Promise<Response>((resolve) => { syncResolve = resolve; }),
);
await adapter.connect();
return {
/** Simulate a sync response arriving. */
async emitSync(syncData: unknown) {
if (syncResolve) {
syncResolve(mockResponse(syncData));
syncResolve = null;
}
// Allow the sync loop to process
await new Promise((r) => setTimeout(r, 10));
// Set up next sync to hang
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
},
};
}
/** Helper: create a Matrix timeline event. */
function createMatrixEvent(overrides: Record<string, unknown> = {}) {
return {
type: 'm.room.message',
event_id: '$evt1',
sender: '@alice:example.org',
origin_server_ts: 1700000000000,
content: {
msgtype: 'm.text',
body: 'Hello Flynn',
},
...overrides,
};
}
/** Helper: wrap events into a sync response. */
function createSyncResponse(roomId: string, events: unknown[], extras: Record<string, unknown> = {}) {
return {
next_batch: 's_next',
rooms: {
join: {
[roomId]: {
timeline: { events },
},
},
},
...extras,
};
}
Step 2: Add inbound message tests
Append inside the describe('MatrixAdapter') block:
// ── Inbound message handling ────────────────────────────────────
it('inbound message from allowed room with mention triggers handler', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn Hello there' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.channel).toBe('matrix');
expect(msg.senderId).toBe('!room1:example.org');
expect(msg.senderName).toBe('alice');
expect(msg.text).toBe('Hello there');
expect(msg.id).toBe('$evt1');
});
it('inbound message without mention is ignored when requireMention is true', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello everyone' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('inbound message without mention is accepted when requireMention is false', async () => {
const noMentionAdapter = new MatrixAdapter({
...baseConfig,
requireMention: false,
});
const handler = vi.fn();
noMentionAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(noMentionAdapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello everyone' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
await noMentionAdapter.disconnect();
});
it('ignores own messages (sender === userId)', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
sender: '@flynn:example.org',
content: { msgtype: 'm.text', body: 'My own message' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('ignores messages in disallowed rooms', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn hello' },
});
await emitSync(createSyncResponse('!notallowed:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('DM room bypasses mention requirement', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
// Mark !dm1 as a DM room via m.direct
const directRooms = { '@alice:example.org': ['!dm1:example.org'] };
const dmAdapter = new MatrixAdapter({
...baseConfig,
allowedRoomIds: ['!dm1:example.org'],
});
dmAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(dmAdapter, directRooms);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello without mention' },
});
await emitSync(createSyncResponse('!dm1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('Hello without mention');
await dmAdapter.disconnect();
});
it('!reset command delivers reset metadata', async () => {
const noMentionAdapter = new MatrixAdapter({
...baseConfig,
requireMention: false,
});
const handler = vi.fn();
noMentionAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(noMentionAdapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '!reset' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('!reset');
expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' });
await noMentionAdapter.disconnect();
});
it('strips bot mention from message text', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn What is the weather?' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('What is the weather?');
});
it('ignores non-message events (e.g. m.room.member)', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
type: 'm.room.member',
content: { membership: 'join' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('accepts all rooms when allowedRoomIds is empty', async () => {
const openAdapter = new MatrixAdapter({
...baseConfig,
allowedRoomIds: [],
requireMention: false,
});
const handler = vi.fn();
openAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(openAdapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello from any room' },
});
await emitSync(createSyncResponse('!anyrandom:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('Hello from any room');
await openAdapter.disconnect();
});
it('does nothing when no message handler is registered', async () => {
// Don't call onMessage
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn hello' },
});
// Should not throw
await emitSync(createSyncResponse('!room1:example.org', [event]));
});
it('mention detection works with Matrix user ID', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@flynn:example.org can you help?' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('can you help?');
});
Step 2: Run tests
Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: PASS (all ~19 tests)
Step 3: Commit
git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add inbound message handling and DM detection tests"
Task 5: Wire Matrix Adapter into the System
Files:
- Modify:
src/channels/index.ts - Modify:
src/daemon/channels.ts
Step 1: Add Matrix exports to src/channels/index.ts
Add after the WhatsApp export line:
export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js';
Step 2: Register Matrix adapter in src/daemon/channels.ts
Add import at top:
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js';
Add registration block after the WhatsApp block (around line 71):
// Register Matrix adapter (if configured)
if (config.matrix) {
const matrixAdapter = new MatrixAdapter({
homeserverUrl: config.matrix.homeserver_url,
accessToken: config.matrix.access_token,
allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined,
requireMention: config.matrix.require_mention,
syncTimeoutMs: config.matrix.sync_timeout_ms,
displayName: config.matrix.display_name,
pairingManager,
});
channelRegistry.register(matrixAdapter);
}
Step 3: Run typecheck
Run: pnpm typecheck
Expected: PASS
Step 4: Run all channel tests to ensure no regressions
Run: pnpm test:run src/channels/
Expected: All existing tests PASS
Step 5: Commit
git add src/channels/index.ts src/daemon/channels.ts
git commit -m "feat(matrix): wire Matrix adapter into channel registry and daemon"
Task 6: Add Config Schema Test
Files:
- Modify:
src/config/schema.test.ts(if exists, otherwise verify with existing tests)
Step 1: Check for existing config schema tests
Read src/config/schema.test.ts and add a Matrix config validation test.
it('validates matrix config', () => {
const config = configSchema.parse({
models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
matrix: {
homeserver_url: 'https://matrix.example.org',
access_token: 'syt_test_token',
allowed_room_ids: ['!room1:example.org'],
require_mention: true,
},
});
expect(config.matrix).toBeDefined();
expect(config.matrix!.homeserver_url).toBe('https://matrix.example.org');
expect(config.matrix!.require_mention).toBe(true);
expect(config.matrix!.sync_timeout_ms).toBe(30000); // default
});
it('matrix config is optional', () => {
const config = configSchema.parse({
models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
});
expect(config.matrix).toBeUndefined();
});
it('matrix config rejects invalid homeserver URL', () => {
expect(() => configSchema.parse({
models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
matrix: {
homeserver_url: 'not-a-url',
access_token: 'token',
},
})).toThrow();
});
Step 2: Run config schema tests
Run: pnpm test:run src/config/schema.test.ts
Expected: PASS
Step 3: Commit
git add src/config/schema.test.ts
git commit -m "test(matrix): add config schema validation tests"
Task 7: Add Credential Redaction for Matrix Token
Files:
- Modify:
src/gateway/handlers/config.ts
Step 1: Add matrix access_token to redaction
In the redactConfig() function, add redaction for the Matrix access token alongside the existing channel token redactions:
// After the existing channel token redactions (e.g., discord.bot_token)
if (redacted.matrix) {
redacted.matrix = { ...redacted.matrix, access_token: REDACTED };
}
Step 2: Run tests
Run: pnpm test:run src/gateway/handlers/handlers.test.ts
Expected: PASS
Step 3: Commit
git add src/gateway/handlers/config.ts
git commit -m "feat(matrix): add access_token to config redaction"
Task 8: Final Verification
Step 1: Run full typecheck
Run: pnpm typecheck
Expected: PASS
Step 2: Run all tests
Run: pnpm test:run
Expected: All tests PASS
Step 3: Commit any remaining changes
git add -A
git commit -m "feat(matrix): complete Matrix channel adapter implementation"
Testing Summary
| Test Category | Count | Description |
|---|---|---|
| Basic properties | 2 | name = "matrix", starts disconnected |
| connect/disconnect | 4 | whoami resolution, error handling, clean disconnect, safe disconnect when not connected |
| send | 3 | throws when disconnected, PUT with txnId, skips empty text |
| Inbound messages | 12 | mention gating, DM bypass, own message filtering, room allowlist, reset command, mention stripping, MXID mention, non-message events, open rooms, no handler |
| Config schema | 3 | valid config, optional, invalid URL rejection |
| Total | ~24 |
Error Handling Strategy
| Error | Recovery |
|---|---|
/account/whoami 401 |
Throw on connect — adapter stays in error state |
/sync HTTP error |
Log, back off 5s, retry |
/sync network error |
Log, back off 5s, retry |
/send failure |
Throw — caller (registry) logs the error |
m.direct 404 |
Silently ignore — no DMs known yet |
| AbortError during sync | Clean exit from loop |
Sync Loop Lifecycle
connect()
├── GET /account/whoami → resolve userId
├── GET /user/{userId}/account_data/m.direct → populate dmRoomIds
└── runSyncLoop() (fire-and-forget async)
├── GET /sync?timeout=30000 (first call, no since token)
│ └── next_batch → save as syncToken
│ └── rooms.join → process timeline events
│ └── account_data → update dmRoomIds
├── GET /sync?timeout=30000&since=... (subsequent calls)
│ └── (repeat)
└── (on error) → sleep 5s → retry
disconnect()
└── AbortController.abort() → sync loop exits
Risks and Mitigations
| Risk | Mitigation |
|---|---|
| Initial sync floods events from history | First sync may replay old events. The adapter could skip the first sync's events (only use it to get next_batch). However, this is how other adapters work (Telegram's grammy does NOT replay), so we process all events from the first sync. If this is a problem, a skip_initial_sync config flag can be added later. |
| Sync loop leak on unhandled error | try/catch wraps the entire loop body with exponential backoff. AbortController ensures clean shutdown. |
| Bot invited to many rooms, only some allowed | Room allowlist (allowed_room_ids) prevents processing. If empty, all joined rooms are processed. |
| Matrix homeserver rate limiting | The sync loop naturally rate-limits (one request at a time with 30s timeout). Send calls are user-triggered and unlikely to hit limits. |
| Access token expiry | Matrix access tokens are typically long-lived. If expired, whoami will fail on connect. Short-lived tokens (refresh tokens) are a future enhancement. |
Open Questions
-
Skip initial sync? — Should the adapter skip processing events from the very first
/syncresponse (which may contain old history)? Current design processes them all. Can add askip_initial_sync: trueconfig flag if needed. -
Typing indicators? — Matrix supports
m.typingevents. Should the adapter send a typing indicator while processing? Deferred for initial implementation (other adapters do this inconsistently — Telegram yes, Discord yes, Slack no). -
Image/media attachments? — Matrix supports
m.image,m.file,m.audiomsgtypes. The initial implementation only handlesm.text. Media support can be added in a follow-up (download via/_matrix/media/v3/download/{serverName}/{mediaId}). -
Formatted messages? — Matrix supports
format: "org.matrix.custom.html"withformatted_body. Should outbound messages include HTML? Deferred — plain text is sufficient for initial implementation.
Files Summary
| Action | File |
|---|---|
| Modify | src/config/schema.ts — add matrixSchema + type export |
| Create | src/channels/matrix/adapter.ts — full adapter implementation |
| Create | src/channels/matrix/adapter.test.ts — 24 tests |
| Create | src/channels/matrix/index.ts — barrel export |
| Modify | src/channels/index.ts — re-export Matrix adapter |
| Modify | src/daemon/channels.ts — register Matrix adapter |
| Modify | src/gateway/handlers/config.ts — redact access_token |
| Modify | src/config/schema.test.ts — config validation tests |