# Matrix Channel Adapter Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Add a Matrix channel adapter using raw fetch against the Matrix Client-Server API v3, with zero external dependencies (no matrix-js-sdk). **Architecture:** Polling-based adapter using `/_matrix/client/v3/sync` with incremental `since` tokens. Auth via pre-provisioned access token in config. Room allowlist + mention gating for multi-room safety. DM detection via `m.direct` account data. Outbound via `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}`. **Tech Stack:** Node.js fetch (built-in), Vitest for tests, Zod for config schema validation. --- ## Context ### Existing Patterns All Flynn channel adapters follow a consistent pattern: 1. **Config interface** — e.g. `TelegramAdapterConfig`, `DiscordAdapterConfig` 2. **Adapter class** implementing `ChannelAdapter` from `src/channels/types.ts` 3. **Barrel export** from `src/channels//index.ts` 4. **Re-export** in `src/channels/index.ts` 5. **Zod schema** in `src/config/schema.ts` (optional field on root config) 6. **Registration** in `src/daemon/channels.ts` (conditional on config presence) 7. **Tests** mocking the platform SDK; here we mock `global.fetch` ### Key ChannelAdapter interface ```typescript interface ChannelAdapter { readonly name: string; readonly status: ChannelStatus; connect(): Promise; disconnect(): Promise; send(peerId: string, message: OutboundMessage): Promise; onMessage(handler: (msg: InboundMessage) => void): void; } ``` ### senderId convention - Telegram: `String(chatId)` (numeric chat ID) - Discord: `channelId` (snowflake string) - Slack: `channelId:threadTs` - WhatsApp: `number@c.us` For Matrix: `roomId` (e.g. `!abc123:matrix.org`). This gives one session per room, matching the Discord pattern. The Matrix userId (e.g. `@flynn:matrix.org`) goes into `senderName`. ### peerId convention `peerId` passed to `send()` is the `roomId`. This is the same as `senderId` since replies go back to the same room. --- ## Config Shape ```yaml matrix: homeserver_url: "https://matrix.example.org" access_token: "${MATRIX_ACCESS_TOKEN}" allowed_room_ids: - "!abc123:matrix.org" require_mention: true # default: true sync_timeout_ms: 30000 # default: 30000 (long-poll timeout) display_name: "Flynn" # optional: set bot display name on connect ``` Zod schema: ```typescript const matrixSchema = z.object({ homeserver_url: z.string().url('Homeserver URL is required'), access_token: z.string().min(1, 'Access token is required'), allowed_room_ids: z.array(z.string()).default([]), require_mention: z.boolean().default(true), sync_timeout_ms: z.number().min(1000).max(120000).default(30000), display_name: z.string().optional(), }).optional(); ``` --- ## Matrix Client-Server API Endpoints Used | Endpoint | Method | Purpose | |----------|--------|---------| | `/_matrix/client/v3/sync` | GET | Long-poll for events (with `?since=&timeout=&filter=`) | | `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}` | PUT | Send a message | | `/_matrix/client/v3/user/{userId}/account_data/m.direct` | GET | Detect DM rooms | | `/_matrix/client/v3/account/whoami` | GET | Resolve own userId on connect | ### Sync filter (minimize bandwidth) ```json { "room": { "timeline": { "limit": 50 }, "state": { "lazy_load_members": true } }, "presence": { "not_types": ["*"] }, "account_data": { "types": ["m.direct"] } } ``` --- ## Design Decisions ### 1. No matrix-js-sdk dependency - **Choice**: Raw fetch calls against `/_matrix/client/v3` endpoints - **Rationale**: User explicitly requested minimal dependencies. The Matrix CS API is well-documented REST/JSON. We only need sync, send, whoami, and account_data — 4 endpoints total. - **Alternatives**: matrix-js-sdk (heavy, ~2MB, pulls in many transitive deps), matrix-bot-sdk (lighter but still a dep). ### 2. Polling via /sync long-poll (not WebSocket or SSE) - **Choice**: `/sync` with `timeout` param for long-polling, looped in an async function with `AbortController` for clean shutdown. - **Rationale**: This is the standard Matrix sync mechanism. Simple, robust, no extra deps. The `timeout` param makes it behave like long-polling (server holds connection open until events arrive or timeout elapses). - **Alternatives**: Sliding Sync (MSC3575, not universally supported), WebSocket transport (experimental extension). ### 3. senderId = roomId - **Choice**: Use the Matrix room ID (`!...`) as `senderId` in `InboundMessage`, matching the Discord adapter pattern where `senderId = channelId`. - **Rationale**: Flynn creates one session per `channel:senderId` pair. Using `roomId` means one session per room, which is the natural boundary. The human user's Matrix ID goes into `senderName`. ### 4. DM detection via m.direct account data - **Choice**: On connect, fetch the bot's `m.direct` account data to build a `Set` of known DM room IDs. DM rooms bypass `require_mention`. - **Rationale**: Matrix DMs are just rooms, but the `m.direct` account data flags which rooms are direct messages. This is the standard approach and doesn't require room member counting heuristics. ### 5. Mention detection via body text - **Choice**: Check for `@displayName` or bot's Matrix user ID in the message body text. - **Rationale**: Matrix `m.room.message` events include both `body` (plain text) and optional `formatted_body` (HTML with `` pill links). Checking the plain text `body` for the display name or MXID is simpler and sufficient. The HTML formatted_body could also be checked for matrix.to links, but body text matching covers all clients. ### 6. Transaction ID for idempotent sends - **Choice**: Generate txnId as `m${Date.now()}.${counter++}` per adapter instance. - **Rationale**: Matrix PUT send requires a txnId for idempotency. A timestamp + counter ensures uniqueness within a process lifetime. ### 7. Message size limit - **Choice**: Use `splitMessage()` utility with a 65536-char limit (Matrix spec allows up to 65536 bytes for event content). - **Rationale**: Matrix's limit is much higher than Telegram (4096) or Discord (2000), but chunking at 65536 chars prevents oversized events. --- ## Implementation Steps ### Task 1: Add Matrix Zod Schema to Config **Files:** - Modify: `src/config/schema.ts` **Step 1: Add the matrixSchema definition** Add after the `whatsappSchema` definition (around line 350): ```typescript const matrixSchema = z.object({ homeserver_url: z.string().url('Homeserver URL is required'), access_token: z.string().min(1, 'Access token is required'), allowed_room_ids: z.array(z.string()).default([]), require_mention: z.boolean().default(true), sync_timeout_ms: z.number().min(1000).max(120000).default(30000), display_name: z.string().optional(), }).optional(); ``` **Step 2: Add `matrix` to configSchema** In `configSchema` (around line 510), add after `whatsapp`: ```typescript matrix: matrixSchema, ``` **Step 3: Add exported type** After the existing type exports (around line 559), add: ```typescript export type MatrixConfig = z.infer; ``` **Step 4: Run typecheck** Run: `pnpm typecheck` Expected: PASS **Step 5: Commit** ```bash git add src/config/schema.ts git commit -m "feat(matrix): add Matrix channel config schema" ``` --- ### Task 2: Create Matrix Adapter — Core Structure and Connect **Files:** - Create: `src/channels/matrix/adapter.ts` - Create: `src/channels/matrix/adapter.test.ts` - Create: `src/channels/matrix/index.ts` **Step 1: Write the adapter.test.ts skeleton with connect/disconnect tests** Create `src/channels/matrix/adapter.test.ts`: ```typescript import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; import type { InboundMessage } from '../types.js'; // ── Mock global fetch ───────────────────────────────────────────── const mockFetch = vi.fn(); beforeEach(() => { vi.stubGlobal('fetch', mockFetch); }); afterEach(() => { vi.restoreAllMocks(); }); const baseConfig: MatrixAdapterConfig = { homeserverUrl: 'https://matrix.example.org', accessToken: 'syt_test_token', allowedRoomIds: ['!room1:example.org'], requireMention: true, syncTimeoutMs: 30000, }; /** Helper: create a mock fetch Response. */ function mockResponse(body: unknown, status = 200): Response { return { ok: status >= 200 && status < 300, status, json: async () => body, text: async () => JSON.stringify(body), } as Response; } /** Helper: make fetch return specific responses in sequence. */ function mockFetchSequence(...responses: Response[]) { for (const response of responses) { mockFetch.mockResolvedValueOnce(response); } } describe('MatrixAdapter', () => { let adapter: MatrixAdapter; beforeEach(() => { vi.clearAllMocks(); adapter = new MatrixAdapter(baseConfig); }); afterEach(async () => { // Ensure sync loop is stopped if (adapter.status === 'connected' || adapter.status === 'connecting') { await adapter.disconnect(); } }); // ── Basic properties ──────────────────────────────────────────── it('has name "matrix"', () => { expect(adapter.name).toBe('matrix'); }); it('starts as disconnected', () => { expect(adapter.status).toBe('disconnected'); }); // ── connect / disconnect ──────────────────────────────────────── it('connect resolves userId via whoami and sets connected', async () => { // whoami mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); // m.direct account data mockFetch.mockResolvedValueOnce(mockResponse({})); // First sync (will be aborted on disconnect) mockFetch.mockResolvedValueOnce( new Promise(() => {}), // never resolves — simulates long-poll ); await adapter.connect(); expect(adapter.status).toBe('connected'); // Verify whoami was called expect(mockFetch).toHaveBeenCalledWith( 'https://matrix.example.org/_matrix/client/v3/account/whoami', expect.objectContaining({ headers: expect.objectContaining({ Authorization: 'Bearer syt_test_token', }), }), ); }); it('connect throws on whoami failure', async () => { mockFetch.mockResolvedValueOnce(mockResponse({ errcode: 'M_UNKNOWN_TOKEN' }, 401)); await expect(adapter.connect()).rejects.toThrow(); }); it('disconnect sets status to disconnected', async () => { // whoami mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); // m.direct mockFetch.mockResolvedValueOnce(mockResponse({})); // sync long-poll (never resolves) mockFetch.mockResolvedValueOnce(new Promise(() => {})); await adapter.connect(); expect(adapter.status).toBe('connected'); await adapter.disconnect(); expect(adapter.status).toBe('disconnected'); }); it('disconnect is safe when not connected', async () => { await adapter.disconnect(); expect(adapter.status).toBe('disconnected'); }); }); ``` **Step 2: Run test to verify it fails (adapter doesn't exist yet)** Run: `pnpm test:run src/channels/matrix/adapter.test.ts` Expected: FAIL — cannot resolve `./adapter.js` **Step 3: Write adapter.ts with connect/disconnect** Create `src/channels/matrix/adapter.ts`: ```typescript /** * Matrix channel adapter. * * Implements the ChannelAdapter interface using raw fetch against the * Matrix Client-Server API v3. No external Matrix SDK dependency. * Uses /sync long-polling for inbound messages and PUT /send for outbound. */ import type { InboundMessage, OutboundMessage, ChannelAdapter, ChannelStatus, } from '../types.js'; import { splitMessage } from '../utils.js'; import type { PairingManager } from '../pairing.js'; /** Configuration for the Matrix channel adapter. */ export interface MatrixAdapterConfig { homeserverUrl: string; accessToken: string; /** Room IDs to respond in. Empty = all joined rooms. */ allowedRoomIds?: string[]; /** Require @mention to respond in rooms (default: true). DMs always respond. */ requireMention?: boolean; /** Long-poll timeout for /sync in milliseconds (default: 30000). */ syncTimeoutMs?: number; /** Optional display name to set on connect. */ displayName?: string; /** Optional pairing manager for DM pairing codes. */ pairingManager?: PairingManager; } /** Maximum message length before chunking (Matrix allows up to 65536 bytes). */ const MAX_MESSAGE_LENGTH = 65536; /** * Matrix channel adapter backed by raw Client-Server API v3 fetch calls. * * Handles room allowlist filtering, optional mention requirement, * DM detection via m.direct, and message chunking. */ export class MatrixAdapter implements ChannelAdapter { readonly name = 'matrix'; private _status: ChannelStatus = 'disconnected'; private messageHandler?: (msg: InboundMessage) => void; private config: MatrixAdapterConfig; /** Resolved bot user ID (e.g. @flynn:example.org). */ private userId: string | null = null; /** Resolved bot display name (for mention detection). */ private botDisplayName: string | null = null; /** Set of room IDs that are direct messages. */ private dmRoomIds: Set = new Set(); /** Incremental sync token. */ private syncToken: string | null = null; /** AbortController for the sync loop. */ private syncAbort: AbortController | null = null; /** Transaction ID counter for idempotent sends. */ private txnCounter = 0; get status(): ChannelStatus { return this._status; } constructor(config: MatrixAdapterConfig) { this.config = config; } /** Register the inbound message handler. Called by the registry before connect(). */ onMessage(handler: (msg: InboundMessage) => void): void { this.messageHandler = handler; } /** * Connect to the Matrix homeserver: * 1. Resolve bot userId via /account/whoami * 2. Fetch m.direct account data for DM detection * 3. Start the /sync long-poll loop */ async connect(): Promise { this._status = 'connecting'; try { // Resolve our own user ID const whoami = await this.matrixGet<{ user_id: string }>( '/_matrix/client/v3/account/whoami', ); this.userId = whoami.user_id; this.botDisplayName = this.config.displayName ?? this.userId.split(':')[0].slice(1); // Fetch m.direct account data for DM detection await this.loadDirectRooms(); // Start sync loop (fire and forget — runs until disconnect) this.syncAbort = new AbortController(); this.runSyncLoop(this.syncAbort.signal); this._status = 'connected'; console.log(`Matrix adapter connected as ${this.userId}`); } catch (error) { this._status = 'error'; throw error; } } /** Stop the sync loop and clean up. */ async disconnect(): Promise { if (this.syncAbort) { this.syncAbort.abort(); this.syncAbort = null; } this.userId = null; this.syncToken = null; this.dmRoomIds.clear(); this._status = 'disconnected'; } /** Send an outbound message to a Matrix room. */ async send(peerId: string, message: OutboundMessage): Promise { if (this._status !== 'connected') { throw new Error('Matrix adapter not connected'); } const text = message.text ?? ''; if (!text.trim()) { return; } if (text.length <= MAX_MESSAGE_LENGTH) { await this.sendRoomMessage(peerId, text); } else { const chunks = splitMessage(text, MAX_MESSAGE_LENGTH); for (const chunk of chunks) { await this.sendRoomMessage(peerId, chunk); } } } // ── Private: Matrix API helpers ───────────────────────────────── /** Make an authenticated GET request to the homeserver. */ private async matrixGet(path: string, query?: Record): Promise { const url = new URL(path, this.config.homeserverUrl); if (query) { for (const [k, v] of Object.entries(query)) { url.searchParams.set(k, v); } } const response = await fetch(url.toString(), { method: 'GET', headers: { Authorization: `Bearer ${this.config.accessToken}`, }, }); if (!response.ok) { const body = await response.text(); throw new Error(`Matrix API error (${response.status}): ${body}`); } return response.json() as Promise; } /** Make an authenticated PUT request to the homeserver. */ private async matrixPut(path: string, body: unknown): Promise { const url = new URL(path, this.config.homeserverUrl); const response = await fetch(url.toString(), { method: 'PUT', headers: { Authorization: `Bearer ${this.config.accessToken}`, 'Content-Type': 'application/json', }, body: JSON.stringify(body), }); if (!response.ok) { const text = await response.text(); throw new Error(`Matrix API error (${response.status}): ${text}`); } return response.json() as Promise; } /** Send a text message to a room with idempotent txnId. */ private async sendRoomMessage(roomId: string, text: string): Promise { const txnId = `m${Date.now()}.${this.txnCounter++}`; await this.matrixPut( `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`, { msgtype: 'm.text', body: text, }, ); } /** Load the m.direct account data to populate dmRoomIds. */ private async loadDirectRooms(): Promise { try { const data = await this.matrixGet>( `/_matrix/client/v3/user/${encodeURIComponent(this.userId!)}/account_data/m.direct`, ); this.dmRoomIds.clear(); // m.direct maps userId → [roomId, ...]. Flatten all room IDs. for (const roomIds of Object.values(data)) { if (Array.isArray(roomIds)) { for (const roomId of roomIds) { this.dmRoomIds.add(roomId); } } } } catch { // m.direct may not exist if the bot has no DMs yet — that's fine. this.dmRoomIds.clear(); } } /** Check if a room is a DM. */ private isDM(roomId: string): boolean { return this.dmRoomIds.has(roomId); } // ── Private: Sync loop ────────────────────────────────────────── /** Run the /sync long-poll loop until aborted. */ private async runSyncLoop(signal: AbortSignal): Promise { // Build the sync filter to minimize bandwidth const filter = JSON.stringify({ room: { timeline: { limit: 50 }, state: { lazy_load_members: true }, }, presence: { not_types: ['*'] }, account_data: { types: ['m.direct'] }, }); while (!signal.aborted) { try { const query: Record = { timeout: String(this.config.syncTimeoutMs ?? 30000), filter, }; if (this.syncToken) { query.since = this.syncToken; } const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl); for (const [k, v] of Object.entries(query)) { url.searchParams.set(k, v); } const response = await fetch(url.toString(), { method: 'GET', headers: { Authorization: `Bearer ${this.config.accessToken}`, }, signal, }); if (!response.ok) { const body = await response.text(); console.error(`Matrix sync error (${response.status}): ${body}`); // Back off on error await this.sleep(5000, signal); continue; } const syncResponse = await response.json() as MatrixSyncResponse; this.syncToken = syncResponse.next_batch; // Process sync response this.processSyncResponse(syncResponse); } catch (error) { if (signal.aborted) { return; } console.error('Matrix sync loop error:', error instanceof Error ? error.message : 'Unknown error'); // Back off on error await this.sleep(5000, signal); } } } /** Process a /sync response — extract timeline events from joined rooms. */ private processSyncResponse(sync: MatrixSyncResponse): void { if (!this.messageHandler) { return; } const joinedRooms = sync.rooms?.join; if (!joinedRooms) { return; } // Update m.direct from account_data if present const accountData = sync.account_data?.events; if (accountData) { for (const event of accountData) { if (event.type === 'm.direct' && event.content) { this.dmRoomIds.clear(); for (const roomIds of Object.values(event.content as Record)) { if (Array.isArray(roomIds)) { for (const roomId of roomIds) { this.dmRoomIds.add(roomId); } } } } } } for (const [roomId, roomData] of Object.entries(joinedRooms)) { const events = roomData.timeline?.events; if (!events) { continue; } for (const event of events) { this.handleTimelineEvent(roomId, event); } } } /** Handle a single timeline event from a joined room. */ private handleTimelineEvent(roomId: string, event: MatrixEvent): void { if (!this.messageHandler) { return; } // Only process m.room.message events if (event.type !== 'm.room.message') { return; } // Ignore our own messages if (event.sender === this.userId) { return; } // Room allowlist check const allowedRoomIds = this.config.allowedRoomIds ?? []; if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) { // Pairing fallback const pm = this.config.pairingManager; if (pm?.enabled) { if (pm.isApproved('matrix', roomId)) { // Approved — fall through } else { const text = (event.content?.body ?? '').trim(); if (text && pm.validateCode('matrix', roomId, text)) { this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {}); } return; } } else { return; } } const body = event.content?.body ?? ''; const isDm = this.isDM(roomId); // Mention gating: require mention in non-DM rooms const requireMention = this.config.requireMention ?? true; if (!isDm && requireMention) { const isMentioned = this.isBotMentioned(body); if (!isMentioned) { return; } } // Strip bot mention from text let text = body; if (this.botDisplayName) { text = text.replace(new RegExp(`@?${this.escapeRegex(this.botDisplayName)}\\b`, 'gi'), '').trim(); } if (this.userId) { text = text.replace(new RegExp(this.escapeRegex(this.userId), 'g'), '').trim(); } // Detect reset command if (text === '!reset' || text === 'reset') { this.messageHandler({ id: event.event_id, channel: 'matrix', senderId: roomId, senderName: this.extractDisplayName(event.sender), text: '!reset', timestamp: event.origin_server_ts, metadata: { isCommand: true, command: 'reset' }, }); return; } // Regular message this.messageHandler({ id: event.event_id, channel: 'matrix', senderId: roomId, senderName: this.extractDisplayName(event.sender), text, timestamp: event.origin_server_ts, }); } /** Check if the bot is mentioned in the message text. */ private isBotMentioned(text: string): boolean { if (this.botDisplayName && text.toLowerCase().includes(this.botDisplayName.toLowerCase())) { return true; } if (this.userId && text.includes(this.userId)) { return true; } return false; } /** Extract a display name from a Matrix user ID. @user:server → user */ private extractDisplayName(userId: string): string { const match = userId.match(/^@([^:]+)/); return match ? match[1] : userId; } /** Escape a string for use in a RegExp. */ private escapeRegex(str: string): string { return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); } /** Sleep with abort support. */ private sleep(ms: number, signal: AbortSignal): Promise { return new Promise((resolve) => { const timer = setTimeout(resolve, ms); signal.addEventListener('abort', () => { clearTimeout(timer); resolve(); }, { once: true }); }); } } // ── Matrix API response types ────────────────────────────────────── interface MatrixSyncResponse { next_batch: string; rooms?: { join?: Record; }; account_data?: { events?: Array<{ type: string; content?: unknown }>; }; } interface MatrixJoinedRoom { timeline?: { events?: MatrixEvent[]; }; } interface MatrixEvent { type: string; event_id: string; sender: string; origin_server_ts: number; content?: { body?: string; msgtype?: string; [key: string]: unknown; }; } ``` **Step 4: Run test to verify connect/disconnect pass** Run: `pnpm test:run src/channels/matrix/adapter.test.ts` Expected: PASS (4 tests) **Step 5: Create barrel export** Create `src/channels/matrix/index.ts`: ```typescript export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; ``` **Step 6: Commit** ```bash git add src/channels/matrix/ git commit -m "feat(matrix): add Matrix adapter core with connect/disconnect" ``` --- ### Task 3: Add Send Tests and Verify Send Works **Files:** - Modify: `src/channels/matrix/adapter.test.ts` **Step 1: Add send tests to the test file** Append these tests inside the `describe('MatrixAdapter')` block: ```typescript // ── send ────────────────────────────────────────────────────── it('send throws when not connected', async () => { await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow( 'Matrix adapter not connected', ); }); it('send delivers a message via PUT with txnId', async () => { // Connect first mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); mockFetch.mockResolvedValueOnce(mockResponse({})); mockFetch.mockResolvedValueOnce(new Promise(() => {})); // sync hangs await adapter.connect(); // Send message mockFetch.mockResolvedValueOnce(mockResponse({ event_id: '$sent1' })); await adapter.send('!room1:example.org', { text: 'Hello there' }); // Find the PUT call (skip whoami, m.direct, sync) const putCall = mockFetch.mock.calls.find( (call) => call[1]?.method === 'PUT', ); expect(putCall).toBeDefined(); expect(putCall![0]).toContain('/_matrix/client/v3/rooms/'); expect(putCall![0]).toContain('/send/m.room.message/'); const body = JSON.parse(putCall![1].body); expect(body.msgtype).toBe('m.text'); expect(body.body).toBe('Hello there'); }); it('send skips empty text', async () => { mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); mockFetch.mockResolvedValueOnce(mockResponse({})); mockFetch.mockResolvedValueOnce(new Promise(() => {})); await adapter.connect(); await adapter.send('!room1:example.org', { text: '' }); // No PUT call should have been made const putCall = mockFetch.mock.calls.find( (call) => call[1]?.method === 'PUT', ); expect(putCall).toBeUndefined(); }); ``` **Step 2: Run tests** Run: `pnpm test:run src/channels/matrix/adapter.test.ts` Expected: PASS (7 tests) **Step 3: Commit** ```bash git add src/channels/matrix/adapter.test.ts git commit -m "test(matrix): add send tests" ``` --- ### Task 4: Add Inbound Message Handling Tests **Files:** - Modify: `src/channels/matrix/adapter.test.ts` **Step 1: Add a helper to connect the adapter and simulate sync** Add these helpers near the top of the test file (after `mockFetchSequence`): ```typescript /** Helper: connect the adapter with standard mocks. Returns a function to simulate sync. */ async function connectAdapter(adapter: MatrixAdapter, directRooms: Record = {}) { // whoami mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); // m.direct mockFetch.mockResolvedValueOnce(mockResponse(directRooms)); // sync long-poll — we'll control this manually let syncResolve: ((value: Response) => void) | null = null; mockFetch.mockResolvedValueOnce( new Promise((resolve) => { syncResolve = resolve; }), ); await adapter.connect(); return { /** Simulate a sync response arriving. */ async emitSync(syncData: unknown) { if (syncResolve) { syncResolve(mockResponse(syncData)); syncResolve = null; } // Allow the sync loop to process await new Promise((r) => setTimeout(r, 10)); // Set up next sync to hang mockFetch.mockResolvedValueOnce(new Promise(() => {})); }, }; } /** Helper: create a Matrix timeline event. */ function createMatrixEvent(overrides: Record = {}) { return { type: 'm.room.message', event_id: '$evt1', sender: '@alice:example.org', origin_server_ts: 1700000000000, content: { msgtype: 'm.text', body: 'Hello Flynn', }, ...overrides, }; } /** Helper: wrap events into a sync response. */ function createSyncResponse(roomId: string, events: unknown[], extras: Record = {}) { return { next_batch: 's_next', rooms: { join: { [roomId]: { timeline: { events }, }, }, }, ...extras, }; } ``` **Step 2: Add inbound message tests** Append inside the `describe('MatrixAdapter')` block: ```typescript // ── Inbound message handling ──────────────────────────────────── it('inbound message from allowed room with mention triggers handler', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '@Flynn Hello there' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.channel).toBe('matrix'); expect(msg.senderId).toBe('!room1:example.org'); expect(msg.senderName).toBe('alice'); expect(msg.text).toBe('Hello there'); expect(msg.id).toBe('$evt1'); }); it('inbound message without mention is ignored when requireMention is true', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: 'Hello everyone' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).not.toHaveBeenCalled(); }); it('inbound message without mention is accepted when requireMention is false', async () => { const noMentionAdapter = new MatrixAdapter({ ...baseConfig, requireMention: false, }); const handler = vi.fn(); noMentionAdapter.onMessage(handler); const { emitSync } = await connectAdapter(noMentionAdapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: 'Hello everyone' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); await noMentionAdapter.disconnect(); }); it('ignores own messages (sender === userId)', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ sender: '@flynn:example.org', content: { msgtype: 'm.text', body: 'My own message' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).not.toHaveBeenCalled(); }); it('ignores messages in disallowed rooms', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '@Flynn hello' }, }); await emitSync(createSyncResponse('!notallowed:example.org', [event])); expect(handler).not.toHaveBeenCalled(); }); it('DM room bypasses mention requirement', async () => { const handler = vi.fn(); adapter.onMessage(handler); // Mark !dm1 as a DM room via m.direct const directRooms = { '@alice:example.org': ['!dm1:example.org'] }; const dmAdapter = new MatrixAdapter({ ...baseConfig, allowedRoomIds: ['!dm1:example.org'], }); dmAdapter.onMessage(handler); const { emitSync } = await connectAdapter(dmAdapter, directRooms); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: 'Hello without mention' }, }); await emitSync(createSyncResponse('!dm1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('Hello without mention'); await dmAdapter.disconnect(); }); it('!reset command delivers reset metadata', async () => { const noMentionAdapter = new MatrixAdapter({ ...baseConfig, requireMention: false, }); const handler = vi.fn(); noMentionAdapter.onMessage(handler); const { emitSync } = await connectAdapter(noMentionAdapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '!reset' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('!reset'); expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' }); await noMentionAdapter.disconnect(); }); it('strips bot mention from message text', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '@Flynn What is the weather?' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('What is the weather?'); }); it('ignores non-message events (e.g. m.room.member)', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ type: 'm.room.member', content: { membership: 'join' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).not.toHaveBeenCalled(); }); it('accepts all rooms when allowedRoomIds is empty', async () => { const openAdapter = new MatrixAdapter({ ...baseConfig, allowedRoomIds: [], requireMention: false, }); const handler = vi.fn(); openAdapter.onMessage(handler); const { emitSync } = await connectAdapter(openAdapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: 'Hello from any room' }, }); await emitSync(createSyncResponse('!anyrandom:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('Hello from any room'); await openAdapter.disconnect(); }); it('does nothing when no message handler is registered', async () => { // Don't call onMessage const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '@Flynn hello' }, }); // Should not throw await emitSync(createSyncResponse('!room1:example.org', [event])); }); it('mention detection works with Matrix user ID', async () => { const handler = vi.fn(); adapter.onMessage(handler); const { emitSync } = await connectAdapter(adapter); const event = createMatrixEvent({ content: { msgtype: 'm.text', body: '@flynn:example.org can you help?' }, }); await emitSync(createSyncResponse('!room1:example.org', [event])); expect(handler).toHaveBeenCalledTimes(1); const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('can you help?'); }); ``` **Step 2: Run tests** Run: `pnpm test:run src/channels/matrix/adapter.test.ts` Expected: PASS (all ~19 tests) **Step 3: Commit** ```bash git add src/channels/matrix/adapter.test.ts git commit -m "test(matrix): add inbound message handling and DM detection tests" ``` --- ### Task 5: Wire Matrix Adapter into the System **Files:** - Modify: `src/channels/index.ts` - Modify: `src/daemon/channels.ts` **Step 1: Add Matrix exports to src/channels/index.ts** Add after the WhatsApp export line: ```typescript export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js'; ``` **Step 2: Register Matrix adapter in src/daemon/channels.ts** Add import at top: ```typescript import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js'; ``` Add registration block after the WhatsApp block (around line 71): ```typescript // Register Matrix adapter (if configured) if (config.matrix) { const matrixAdapter = new MatrixAdapter({ homeserverUrl: config.matrix.homeserver_url, accessToken: config.matrix.access_token, allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined, requireMention: config.matrix.require_mention, syncTimeoutMs: config.matrix.sync_timeout_ms, displayName: config.matrix.display_name, pairingManager, }); channelRegistry.register(matrixAdapter); } ``` **Step 3: Run typecheck** Run: `pnpm typecheck` Expected: PASS **Step 4: Run all channel tests to ensure no regressions** Run: `pnpm test:run src/channels/` Expected: All existing tests PASS **Step 5: Commit** ```bash git add src/channels/index.ts src/daemon/channels.ts git commit -m "feat(matrix): wire Matrix adapter into channel registry and daemon" ``` --- ### Task 6: Add Config Schema Test **Files:** - Modify: `src/config/schema.test.ts` (if exists, otherwise verify with existing tests) **Step 1: Check for existing config schema tests** Read `src/config/schema.test.ts` and add a Matrix config validation test. ```typescript it('validates matrix config', () => { const config = configSchema.parse({ models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, matrix: { homeserver_url: 'https://matrix.example.org', access_token: 'syt_test_token', allowed_room_ids: ['!room1:example.org'], require_mention: true, }, }); expect(config.matrix).toBeDefined(); expect(config.matrix!.homeserver_url).toBe('https://matrix.example.org'); expect(config.matrix!.require_mention).toBe(true); expect(config.matrix!.sync_timeout_ms).toBe(30000); // default }); it('matrix config is optional', () => { const config = configSchema.parse({ models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, }); expect(config.matrix).toBeUndefined(); }); it('matrix config rejects invalid homeserver URL', () => { expect(() => configSchema.parse({ models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, matrix: { homeserver_url: 'not-a-url', access_token: 'token', }, })).toThrow(); }); ``` **Step 2: Run config schema tests** Run: `pnpm test:run src/config/schema.test.ts` Expected: PASS **Step 3: Commit** ```bash git add src/config/schema.test.ts git commit -m "test(matrix): add config schema validation tests" ``` --- ### Task 7: Add Credential Redaction for Matrix Token **Files:** - Modify: `src/gateway/handlers/config.ts` **Step 1: Add matrix access_token to redaction** In the `redactConfig()` function, add redaction for the Matrix access token alongside the existing channel token redactions: ```typescript // After the existing channel token redactions (e.g., discord.bot_token) if (redacted.matrix) { redacted.matrix = { ...redacted.matrix, access_token: REDACTED }; } ``` **Step 2: Run tests** Run: `pnpm test:run src/gateway/handlers/handlers.test.ts` Expected: PASS **Step 3: Commit** ```bash git add src/gateway/handlers/config.ts git commit -m "feat(matrix): add access_token to config redaction" ``` --- ### Task 8: Final Verification **Step 1: Run full typecheck** Run: `pnpm typecheck` Expected: PASS **Step 2: Run all tests** Run: `pnpm test:run` Expected: All tests PASS **Step 3: Commit any remaining changes** ```bash git add -A git commit -m "feat(matrix): complete Matrix channel adapter implementation" ``` --- ## Testing Summary | Test Category | Count | Description | |---------------|-------|-------------| | Basic properties | 2 | name = "matrix", starts disconnected | | connect/disconnect | 4 | whoami resolution, error handling, clean disconnect, safe disconnect when not connected | | send | 3 | throws when disconnected, PUT with txnId, skips empty text | | Inbound messages | 12 | mention gating, DM bypass, own message filtering, room allowlist, reset command, mention stripping, MXID mention, non-message events, open rooms, no handler | | Config schema | 3 | valid config, optional, invalid URL rejection | | **Total** | **~24** | | ## Error Handling Strategy | Error | Recovery | |-------|----------| | `/account/whoami` 401 | Throw on connect — adapter stays in error state | | `/sync` HTTP error | Log, back off 5s, retry | | `/sync` network error | Log, back off 5s, retry | | `/send` failure | Throw — caller (registry) logs the error | | `m.direct` 404 | Silently ignore — no DMs known yet | | AbortError during sync | Clean exit from loop | ## Sync Loop Lifecycle ``` connect() ├── GET /account/whoami → resolve userId ├── GET /user/{userId}/account_data/m.direct → populate dmRoomIds └── runSyncLoop() (fire-and-forget async) ├── GET /sync?timeout=30000 (first call, no since token) │ └── next_batch → save as syncToken │ └── rooms.join → process timeline events │ └── account_data → update dmRoomIds ├── GET /sync?timeout=30000&since=... (subsequent calls) │ └── (repeat) └── (on error) → sleep 5s → retry disconnect() └── AbortController.abort() → sync loop exits ``` ## Risks and Mitigations | Risk | Mitigation | |------|------------| | Initial sync floods events from history | First sync may replay old events. The adapter could skip the first sync's events (only use it to get `next_batch`). However, this is how other adapters work (Telegram's grammy does NOT replay), so we process all events from the first sync. If this is a problem, a `skip_initial_sync` config flag can be added later. | | Sync loop leak on unhandled error | `try/catch` wraps the entire loop body with exponential backoff. AbortController ensures clean shutdown. | | Bot invited to many rooms, only some allowed | Room allowlist (`allowed_room_ids`) prevents processing. If empty, all joined rooms are processed. | | Matrix homeserver rate limiting | The sync loop naturally rate-limits (one request at a time with 30s timeout). Send calls are user-triggered and unlikely to hit limits. | | Access token expiry | Matrix access tokens are typically long-lived. If expired, whoami will fail on connect. Short-lived tokens (refresh tokens) are a future enhancement. | ## Open Questions 1. **Skip initial sync?** — Should the adapter skip processing events from the very first `/sync` response (which may contain old history)? Current design processes them all. Can add a `skip_initial_sync: true` config flag if needed. 2. **Typing indicators?** — Matrix supports `m.typing` events. Should the adapter send a typing indicator while processing? Deferred for initial implementation (other adapters do this inconsistently — Telegram yes, Discord yes, Slack no). 3. **Image/media attachments?** — Matrix supports `m.image`, `m.file`, `m.audio` msgtypes. The initial implementation only handles `m.text`. Media support can be added in a follow-up (download via `/_matrix/media/v3/download/{serverName}/{mediaId}`). 4. **Formatted messages?** — Matrix supports `format: "org.matrix.custom.html"` with `formatted_body`. Should outbound messages include HTML? Deferred — plain text is sufficient for initial implementation. --- ## Files Summary | Action | File | |--------|------| | Modify | `src/config/schema.ts` — add `matrixSchema` + type export | | Create | `src/channels/matrix/adapter.ts` — full adapter implementation | | Create | `src/channels/matrix/adapter.test.ts` — 24 tests | | Create | `src/channels/matrix/index.ts` — barrel export | | Modify | `src/channels/index.ts` — re-export Matrix adapter | | Modify | `src/daemon/channels.ts` — register Matrix adapter | | Modify | `src/gateway/handlers/config.ts` — redact access_token | | Modify | `src/config/schema.test.ts` — config validation tests |