diff --git a/README.md b/README.md index 90be565..7a69ca3 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,13 @@ telegram: bot_token: "your-telegram-bot-token" allowed_chat_ids: [123456789] # Your Telegram user ID +# Optional: Matrix +matrix: + homeserver_url: "https://matrix.example.org" + access_token: "${MATRIX_ACCESS_TOKEN}" + allowed_room_ids: ["!room1:example.org"] + require_mention: true + models: default: provider: anthropic @@ -818,7 +825,7 @@ src/ ├── agents/ # Multi-agent routing ├── auth/ # OAuth flows (GitHub Copilot) ├── backends/native/ # Agent implementation + orchestrator -├── channels/ # Channel adapters (Telegram, Discord, Slack, WhatsApp, WebChat) +├── channels/ # Channel adapters (Telegram, Discord, Slack, WhatsApp, Matrix, WebChat) │ └── pairing.ts # DM pairing code manager ├── cli/ # CLI commands (commander) │ └── completion.ts # Shell completion generator (bash/zsh/fish) diff --git a/docs/plans/2026-02-15-matrix-channel-adapter.md b/docs/plans/2026-02-15-matrix-channel-adapter.md new file mode 100644 index 0000000..77f09aa --- /dev/null +++ b/docs/plans/2026-02-15-matrix-channel-adapter.md @@ -0,0 +1,1430 @@ +# Matrix Channel Adapter Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add a Matrix channel adapter using raw fetch against the Matrix Client-Server API v3, with zero external dependencies (no matrix-js-sdk). + +**Architecture:** Polling-based adapter using `/_matrix/client/v3/sync` with incremental `since` tokens. Auth via pre-provisioned access token in config. Room allowlist + mention gating for multi-room safety. DM detection via `m.direct` account data. Outbound via `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}`. + +**Tech Stack:** Node.js fetch (built-in), Vitest for tests, Zod for config schema validation. + +--- + +## Context + +### Existing Patterns + +All Flynn channel adapters follow a consistent pattern: + +1. **Config interface** — e.g. `TelegramAdapterConfig`, `DiscordAdapterConfig` +2. **Adapter class** implementing `ChannelAdapter` from `src/channels/types.ts` +3. **Barrel export** from `src/channels//index.ts` +4. **Re-export** in `src/channels/index.ts` +5. **Zod schema** in `src/config/schema.ts` (optional field on root config) +6. **Registration** in `src/daemon/channels.ts` (conditional on config presence) +7. **Tests** mocking the platform SDK; here we mock `global.fetch` + +### Key ChannelAdapter interface + +```typescript +interface ChannelAdapter { + readonly name: string; + readonly status: ChannelStatus; + connect(): Promise; + disconnect(): Promise; + send(peerId: string, message: OutboundMessage): Promise; + onMessage(handler: (msg: InboundMessage) => void): void; +} +``` + +### senderId convention + +- Telegram: `String(chatId)` (numeric chat ID) +- Discord: `channelId` (snowflake string) +- Slack: `channelId:threadTs` +- WhatsApp: `number@c.us` + +For Matrix: `roomId` (e.g. `!abc123:matrix.org`). This gives one session per room, matching the Discord pattern. The Matrix userId (e.g. `@flynn:matrix.org`) goes into `senderName`. + +### peerId convention + +`peerId` passed to `send()` is the `roomId`. This is the same as `senderId` since replies go back to the same room. + +--- + +## Config Shape + +```yaml +matrix: + homeserver_url: "https://matrix.example.org" + access_token: "${MATRIX_ACCESS_TOKEN}" + allowed_room_ids: + - "!abc123:matrix.org" + require_mention: true # default: true + sync_timeout_ms: 30000 # default: 30000 (long-poll timeout) + display_name: "Flynn" # optional: set bot display name on connect +``` + +Zod schema: + +```typescript +const matrixSchema = z.object({ + homeserver_url: z.string().url('Homeserver URL is required'), + access_token: z.string().min(1, 'Access token is required'), + allowed_room_ids: z.array(z.string()).default([]), + require_mention: z.boolean().default(true), + sync_timeout_ms: z.number().min(1000).max(120000).default(30000), + display_name: z.string().optional(), +}).optional(); +``` + +--- + +## Matrix Client-Server API Endpoints Used + +| Endpoint | Method | Purpose | +|----------|--------|---------| +| `/_matrix/client/v3/sync` | GET | Long-poll for events (with `?since=&timeout=&filter=`) | +| `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}` | PUT | Send a message | +| `/_matrix/client/v3/user/{userId}/account_data/m.direct` | GET | Detect DM rooms | +| `/_matrix/client/v3/account/whoami` | GET | Resolve own userId on connect | + +### Sync filter (minimize bandwidth) + +```json +{ + "room": { + "timeline": { "limit": 50 }, + "state": { "lazy_load_members": true } + }, + "presence": { "not_types": ["*"] }, + "account_data": { "types": ["m.direct"] } +} +``` + +--- + +## Design Decisions + +### 1. No matrix-js-sdk dependency +- **Choice**: Raw fetch calls against `/_matrix/client/v3` endpoints +- **Rationale**: User explicitly requested minimal dependencies. The Matrix CS API is well-documented REST/JSON. We only need sync, send, whoami, and account_data — 4 endpoints total. +- **Alternatives**: matrix-js-sdk (heavy, ~2MB, pulls in many transitive deps), matrix-bot-sdk (lighter but still a dep). + +### 2. Polling via /sync long-poll (not WebSocket or SSE) +- **Choice**: `/sync` with `timeout` param for long-polling, looped in an async function with `AbortController` for clean shutdown. +- **Rationale**: This is the standard Matrix sync mechanism. Simple, robust, no extra deps. The `timeout` param makes it behave like long-polling (server holds connection open until events arrive or timeout elapses). +- **Alternatives**: Sliding Sync (MSC3575, not universally supported), WebSocket transport (experimental extension). + +### 3. senderId = roomId +- **Choice**: Use the Matrix room ID (`!...`) as `senderId` in `InboundMessage`, matching the Discord adapter pattern where `senderId = channelId`. +- **Rationale**: Flynn creates one session per `channel:senderId` pair. Using `roomId` means one session per room, which is the natural boundary. The human user's Matrix ID goes into `senderName`. + +### 4. DM detection via m.direct account data +- **Choice**: On connect, fetch the bot's `m.direct` account data to build a `Set` of known DM room IDs. DM rooms bypass `require_mention`. +- **Rationale**: Matrix DMs are just rooms, but the `m.direct` account data flags which rooms are direct messages. This is the standard approach and doesn't require room member counting heuristics. + +### 5. Mention detection via body text +- **Choice**: Check for `@displayName` or bot's Matrix user ID in the message body text. +- **Rationale**: Matrix `m.room.message` events include both `body` (plain text) and optional `formatted_body` (HTML with `` pill links). Checking the plain text `body` for the display name or MXID is simpler and sufficient. The HTML formatted_body could also be checked for matrix.to links, but body text matching covers all clients. + +### 6. Transaction ID for idempotent sends +- **Choice**: Generate txnId as `m${Date.now()}.${counter++}` per adapter instance. +- **Rationale**: Matrix PUT send requires a txnId for idempotency. A timestamp + counter ensures uniqueness within a process lifetime. + +### 7. Message size limit +- **Choice**: Use `splitMessage()` utility with a 65536-char limit (Matrix spec allows up to 65536 bytes for event content). +- **Rationale**: Matrix's limit is much higher than Telegram (4096) or Discord (2000), but chunking at 65536 chars prevents oversized events. + +--- + +## Implementation Steps + +### Task 1: Add Matrix Zod Schema to Config + +**Files:** +- Modify: `src/config/schema.ts` + +**Step 1: Add the matrixSchema definition** + +Add after the `whatsappSchema` definition (around line 350): + +```typescript +const matrixSchema = z.object({ + homeserver_url: z.string().url('Homeserver URL is required'), + access_token: z.string().min(1, 'Access token is required'), + allowed_room_ids: z.array(z.string()).default([]), + require_mention: z.boolean().default(true), + sync_timeout_ms: z.number().min(1000).max(120000).default(30000), + display_name: z.string().optional(), +}).optional(); +``` + +**Step 2: Add `matrix` to configSchema** + +In `configSchema` (around line 510), add after `whatsapp`: + +```typescript +matrix: matrixSchema, +``` + +**Step 3: Add exported type** + +After the existing type exports (around line 559), add: + +```typescript +export type MatrixConfig = z.infer; +``` + +**Step 4: Run typecheck** + +Run: `pnpm typecheck` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/config/schema.ts +git commit -m "feat(matrix): add Matrix channel config schema" +``` + +--- + +### Task 2: Create Matrix Adapter — Core Structure and Connect + +**Files:** +- Create: `src/channels/matrix/adapter.ts` +- Create: `src/channels/matrix/adapter.test.ts` +- Create: `src/channels/matrix/index.ts` + +**Step 1: Write the adapter.test.ts skeleton with connect/disconnect tests** + +Create `src/channels/matrix/adapter.test.ts`: + +```typescript +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; +import type { InboundMessage } from '../types.js'; + +// ── Mock global fetch ───────────────────────────────────────────── + +const mockFetch = vi.fn(); + +beforeEach(() => { + vi.stubGlobal('fetch', mockFetch); +}); + +afterEach(() => { + vi.restoreAllMocks(); +}); + +const baseConfig: MatrixAdapterConfig = { + homeserverUrl: 'https://matrix.example.org', + accessToken: 'syt_test_token', + allowedRoomIds: ['!room1:example.org'], + requireMention: true, + syncTimeoutMs: 30000, +}; + +/** Helper: create a mock fetch Response. */ +function mockResponse(body: unknown, status = 200): Response { + return { + ok: status >= 200 && status < 300, + status, + json: async () => body, + text: async () => JSON.stringify(body), + } as Response; +} + +/** Helper: make fetch return specific responses in sequence. */ +function mockFetchSequence(...responses: Response[]) { + for (const response of responses) { + mockFetch.mockResolvedValueOnce(response); + } +} + +describe('MatrixAdapter', () => { + let adapter: MatrixAdapter; + + beforeEach(() => { + vi.clearAllMocks(); + adapter = new MatrixAdapter(baseConfig); + }); + + afterEach(async () => { + // Ensure sync loop is stopped + if (adapter.status === 'connected' || adapter.status === 'connecting') { + await adapter.disconnect(); + } + }); + + // ── Basic properties ──────────────────────────────────────────── + + it('has name "matrix"', () => { + expect(adapter.name).toBe('matrix'); + }); + + it('starts as disconnected', () => { + expect(adapter.status).toBe('disconnected'); + }); + + // ── connect / disconnect ──────────────────────────────────────── + + it('connect resolves userId via whoami and sets connected', async () => { + // whoami + mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); + // m.direct account data + mockFetch.mockResolvedValueOnce(mockResponse({})); + // First sync (will be aborted on disconnect) + mockFetch.mockResolvedValueOnce( + new Promise(() => {}), // never resolves — simulates long-poll + ); + + await adapter.connect(); + + expect(adapter.status).toBe('connected'); + // Verify whoami was called + expect(mockFetch).toHaveBeenCalledWith( + 'https://matrix.example.org/_matrix/client/v3/account/whoami', + expect.objectContaining({ + headers: expect.objectContaining({ + Authorization: 'Bearer syt_test_token', + }), + }), + ); + }); + + it('connect throws on whoami failure', async () => { + mockFetch.mockResolvedValueOnce(mockResponse({ errcode: 'M_UNKNOWN_TOKEN' }, 401)); + + await expect(adapter.connect()).rejects.toThrow(); + }); + + it('disconnect sets status to disconnected', async () => { + // whoami + mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); + // m.direct + mockFetch.mockResolvedValueOnce(mockResponse({})); + // sync long-poll (never resolves) + mockFetch.mockResolvedValueOnce(new Promise(() => {})); + + await adapter.connect(); + expect(adapter.status).toBe('connected'); + + await adapter.disconnect(); + expect(adapter.status).toBe('disconnected'); + }); + + it('disconnect is safe when not connected', async () => { + await adapter.disconnect(); + expect(adapter.status).toBe('disconnected'); + }); +}); +``` + +**Step 2: Run test to verify it fails (adapter doesn't exist yet)** + +Run: `pnpm test:run src/channels/matrix/adapter.test.ts` +Expected: FAIL — cannot resolve `./adapter.js` + +**Step 3: Write adapter.ts with connect/disconnect** + +Create `src/channels/matrix/adapter.ts`: + +```typescript +/** + * Matrix channel adapter. + * + * Implements the ChannelAdapter interface using raw fetch against the + * Matrix Client-Server API v3. No external Matrix SDK dependency. + * Uses /sync long-polling for inbound messages and PUT /send for outbound. + */ + +import type { + InboundMessage, + OutboundMessage, + ChannelAdapter, + ChannelStatus, +} from '../types.js'; +import { splitMessage } from '../utils.js'; +import type { PairingManager } from '../pairing.js'; + +/** Configuration for the Matrix channel adapter. */ +export interface MatrixAdapterConfig { + homeserverUrl: string; + accessToken: string; + /** Room IDs to respond in. Empty = all joined rooms. */ + allowedRoomIds?: string[]; + /** Require @mention to respond in rooms (default: true). DMs always respond. */ + requireMention?: boolean; + /** Long-poll timeout for /sync in milliseconds (default: 30000). */ + syncTimeoutMs?: number; + /** Optional display name to set on connect. */ + displayName?: string; + /** Optional pairing manager for DM pairing codes. */ + pairingManager?: PairingManager; +} + +/** Maximum message length before chunking (Matrix allows up to 65536 bytes). */ +const MAX_MESSAGE_LENGTH = 65536; + +/** + * Matrix channel adapter backed by raw Client-Server API v3 fetch calls. + * + * Handles room allowlist filtering, optional mention requirement, + * DM detection via m.direct, and message chunking. + */ +export class MatrixAdapter implements ChannelAdapter { + readonly name = 'matrix'; + + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private config: MatrixAdapterConfig; + + /** Resolved bot user ID (e.g. @flynn:example.org). */ + private userId: string | null = null; + /** Resolved bot display name (for mention detection). */ + private botDisplayName: string | null = null; + /** Set of room IDs that are direct messages. */ + private dmRoomIds: Set = new Set(); + /** Incremental sync token. */ + private syncToken: string | null = null; + /** AbortController for the sync loop. */ + private syncAbort: AbortController | null = null; + /** Transaction ID counter for idempotent sends. */ + private txnCounter = 0; + + get status(): ChannelStatus { + return this._status; + } + + constructor(config: MatrixAdapterConfig) { + this.config = config; + } + + /** Register the inbound message handler. Called by the registry before connect(). */ + onMessage(handler: (msg: InboundMessage) => void): void { + this.messageHandler = handler; + } + + /** + * Connect to the Matrix homeserver: + * 1. Resolve bot userId via /account/whoami + * 2. Fetch m.direct account data for DM detection + * 3. Start the /sync long-poll loop + */ + async connect(): Promise { + this._status = 'connecting'; + + try { + // Resolve our own user ID + const whoami = await this.matrixGet<{ user_id: string }>( + '/_matrix/client/v3/account/whoami', + ); + this.userId = whoami.user_id; + this.botDisplayName = this.config.displayName ?? this.userId.split(':')[0].slice(1); + + // Fetch m.direct account data for DM detection + await this.loadDirectRooms(); + + // Start sync loop (fire and forget — runs until disconnect) + this.syncAbort = new AbortController(); + this.runSyncLoop(this.syncAbort.signal); + + this._status = 'connected'; + console.log(`Matrix adapter connected as ${this.userId}`); + } catch (error) { + this._status = 'error'; + throw error; + } + } + + /** Stop the sync loop and clean up. */ + async disconnect(): Promise { + if (this.syncAbort) { + this.syncAbort.abort(); + this.syncAbort = null; + } + this.userId = null; + this.syncToken = null; + this.dmRoomIds.clear(); + this._status = 'disconnected'; + } + + /** Send an outbound message to a Matrix room. */ + async send(peerId: string, message: OutboundMessage): Promise { + if (this._status !== 'connected') { + throw new Error('Matrix adapter not connected'); + } + + const text = message.text ?? ''; + if (!text.trim()) { return; } + + if (text.length <= MAX_MESSAGE_LENGTH) { + await this.sendRoomMessage(peerId, text); + } else { + const chunks = splitMessage(text, MAX_MESSAGE_LENGTH); + for (const chunk of chunks) { + await this.sendRoomMessage(peerId, chunk); + } + } + } + + // ── Private: Matrix API helpers ───────────────────────────────── + + /** Make an authenticated GET request to the homeserver. */ + private async matrixGet(path: string, query?: Record): Promise { + const url = new URL(path, this.config.homeserverUrl); + if (query) { + for (const [k, v] of Object.entries(query)) { + url.searchParams.set(k, v); + } + } + + const response = await fetch(url.toString(), { + method: 'GET', + headers: { + Authorization: `Bearer ${this.config.accessToken}`, + }, + }); + + if (!response.ok) { + const body = await response.text(); + throw new Error(`Matrix API error (${response.status}): ${body}`); + } + + return response.json() as Promise; + } + + /** Make an authenticated PUT request to the homeserver. */ + private async matrixPut(path: string, body: unknown): Promise { + const url = new URL(path, this.config.homeserverUrl); + + const response = await fetch(url.toString(), { + method: 'PUT', + headers: { + Authorization: `Bearer ${this.config.accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Matrix API error (${response.status}): ${text}`); + } + + return response.json() as Promise; + } + + /** Send a text message to a room with idempotent txnId. */ + private async sendRoomMessage(roomId: string, text: string): Promise { + const txnId = `m${Date.now()}.${this.txnCounter++}`; + await this.matrixPut( + `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`, + { + msgtype: 'm.text', + body: text, + }, + ); + } + + /** Load the m.direct account data to populate dmRoomIds. */ + private async loadDirectRooms(): Promise { + try { + const data = await this.matrixGet>( + `/_matrix/client/v3/user/${encodeURIComponent(this.userId!)}/account_data/m.direct`, + ); + this.dmRoomIds.clear(); + // m.direct maps userId → [roomId, ...]. Flatten all room IDs. + for (const roomIds of Object.values(data)) { + if (Array.isArray(roomIds)) { + for (const roomId of roomIds) { + this.dmRoomIds.add(roomId); + } + } + } + } catch { + // m.direct may not exist if the bot has no DMs yet — that's fine. + this.dmRoomIds.clear(); + } + } + + /** Check if a room is a DM. */ + private isDM(roomId: string): boolean { + return this.dmRoomIds.has(roomId); + } + + // ── Private: Sync loop ────────────────────────────────────────── + + /** Run the /sync long-poll loop until aborted. */ + private async runSyncLoop(signal: AbortSignal): Promise { + // Build the sync filter to minimize bandwidth + const filter = JSON.stringify({ + room: { + timeline: { limit: 50 }, + state: { lazy_load_members: true }, + }, + presence: { not_types: ['*'] }, + account_data: { types: ['m.direct'] }, + }); + + while (!signal.aborted) { + try { + const query: Record = { + timeout: String(this.config.syncTimeoutMs ?? 30000), + filter, + }; + if (this.syncToken) { + query.since = this.syncToken; + } + + const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl); + for (const [k, v] of Object.entries(query)) { + url.searchParams.set(k, v); + } + + const response = await fetch(url.toString(), { + method: 'GET', + headers: { + Authorization: `Bearer ${this.config.accessToken}`, + }, + signal, + }); + + if (!response.ok) { + const body = await response.text(); + console.error(`Matrix sync error (${response.status}): ${body}`); + // Back off on error + await this.sleep(5000, signal); + continue; + } + + const syncResponse = await response.json() as MatrixSyncResponse; + this.syncToken = syncResponse.next_batch; + + // Process sync response + this.processSyncResponse(syncResponse); + } catch (error) { + if (signal.aborted) { return; } + console.error('Matrix sync loop error:', error instanceof Error ? error.message : 'Unknown error'); + // Back off on error + await this.sleep(5000, signal); + } + } + } + + /** Process a /sync response — extract timeline events from joined rooms. */ + private processSyncResponse(sync: MatrixSyncResponse): void { + if (!this.messageHandler) { return; } + + const joinedRooms = sync.rooms?.join; + if (!joinedRooms) { return; } + + // Update m.direct from account_data if present + const accountData = sync.account_data?.events; + if (accountData) { + for (const event of accountData) { + if (event.type === 'm.direct' && event.content) { + this.dmRoomIds.clear(); + for (const roomIds of Object.values(event.content as Record)) { + if (Array.isArray(roomIds)) { + for (const roomId of roomIds) { + this.dmRoomIds.add(roomId); + } + } + } + } + } + } + + for (const [roomId, roomData] of Object.entries(joinedRooms)) { + const events = roomData.timeline?.events; + if (!events) { continue; } + + for (const event of events) { + this.handleTimelineEvent(roomId, event); + } + } + } + + /** Handle a single timeline event from a joined room. */ + private handleTimelineEvent(roomId: string, event: MatrixEvent): void { + if (!this.messageHandler) { return; } + + // Only process m.room.message events + if (event.type !== 'm.room.message') { return; } + + // Ignore our own messages + if (event.sender === this.userId) { return; } + + // Room allowlist check + const allowedRoomIds = this.config.allowedRoomIds ?? []; + if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) { + // Pairing fallback + const pm = this.config.pairingManager; + if (pm?.enabled) { + if (pm.isApproved('matrix', roomId)) { + // Approved — fall through + } else { + const text = (event.content?.body ?? '').trim(); + if (text && pm.validateCode('matrix', roomId, text)) { + this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {}); + } + return; + } + } else { + return; + } + } + + const body = event.content?.body ?? ''; + const isDm = this.isDM(roomId); + + // Mention gating: require mention in non-DM rooms + const requireMention = this.config.requireMention ?? true; + if (!isDm && requireMention) { + const isMentioned = this.isBotMentioned(body); + if (!isMentioned) { return; } + } + + // Strip bot mention from text + let text = body; + if (this.botDisplayName) { + text = text.replace(new RegExp(`@?${this.escapeRegex(this.botDisplayName)}\\b`, 'gi'), '').trim(); + } + if (this.userId) { + text = text.replace(new RegExp(this.escapeRegex(this.userId), 'g'), '').trim(); + } + + // Detect reset command + if (text === '!reset' || text === 'reset') { + this.messageHandler({ + id: event.event_id, + channel: 'matrix', + senderId: roomId, + senderName: this.extractDisplayName(event.sender), + text: '!reset', + timestamp: event.origin_server_ts, + metadata: { isCommand: true, command: 'reset' }, + }); + return; + } + + // Regular message + this.messageHandler({ + id: event.event_id, + channel: 'matrix', + senderId: roomId, + senderName: this.extractDisplayName(event.sender), + text, + timestamp: event.origin_server_ts, + }); + } + + /** Check if the bot is mentioned in the message text. */ + private isBotMentioned(text: string): boolean { + if (this.botDisplayName && text.toLowerCase().includes(this.botDisplayName.toLowerCase())) { + return true; + } + if (this.userId && text.includes(this.userId)) { + return true; + } + return false; + } + + /** Extract a display name from a Matrix user ID. @user:server → user */ + private extractDisplayName(userId: string): string { + const match = userId.match(/^@([^:]+)/); + return match ? match[1] : userId; + } + + /** Escape a string for use in a RegExp. */ + private escapeRegex(str: string): string { + return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + } + + /** Sleep with abort support. */ + private sleep(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + signal.addEventListener('abort', () => { + clearTimeout(timer); + resolve(); + }, { once: true }); + }); + } +} + +// ── Matrix API response types ────────────────────────────────────── + +interface MatrixSyncResponse { + next_batch: string; + rooms?: { + join?: Record; + }; + account_data?: { + events?: Array<{ type: string; content?: unknown }>; + }; +} + +interface MatrixJoinedRoom { + timeline?: { + events?: MatrixEvent[]; + }; +} + +interface MatrixEvent { + type: string; + event_id: string; + sender: string; + origin_server_ts: number; + content?: { + body?: string; + msgtype?: string; + [key: string]: unknown; + }; +} +``` + +**Step 4: Run test to verify connect/disconnect pass** + +Run: `pnpm test:run src/channels/matrix/adapter.test.ts` +Expected: PASS (4 tests) + +**Step 5: Create barrel export** + +Create `src/channels/matrix/index.ts`: + +```typescript +export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js'; +``` + +**Step 6: Commit** + +```bash +git add src/channels/matrix/ +git commit -m "feat(matrix): add Matrix adapter core with connect/disconnect" +``` + +--- + +### Task 3: Add Send Tests and Verify Send Works + +**Files:** +- Modify: `src/channels/matrix/adapter.test.ts` + +**Step 1: Add send tests to the test file** + +Append these tests inside the `describe('MatrixAdapter')` block: + +```typescript + // ── send ────────────────────────────────────────────────────── + + it('send throws when not connected', async () => { + await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow( + 'Matrix adapter not connected', + ); + }); + + it('send delivers a message via PUT with txnId', async () => { + // Connect first + mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); + mockFetch.mockResolvedValueOnce(mockResponse({})); + mockFetch.mockResolvedValueOnce(new Promise(() => {})); // sync hangs + await adapter.connect(); + + // Send message + mockFetch.mockResolvedValueOnce(mockResponse({ event_id: '$sent1' })); + await adapter.send('!room1:example.org', { text: 'Hello there' }); + + // Find the PUT call (skip whoami, m.direct, sync) + const putCall = mockFetch.mock.calls.find( + (call) => call[1]?.method === 'PUT', + ); + expect(putCall).toBeDefined(); + expect(putCall![0]).toContain('/_matrix/client/v3/rooms/'); + expect(putCall![0]).toContain('/send/m.room.message/'); + + const body = JSON.parse(putCall![1].body); + expect(body.msgtype).toBe('m.text'); + expect(body.body).toBe('Hello there'); + }); + + it('send skips empty text', async () => { + mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); + mockFetch.mockResolvedValueOnce(mockResponse({})); + mockFetch.mockResolvedValueOnce(new Promise(() => {})); + await adapter.connect(); + + await adapter.send('!room1:example.org', { text: '' }); + + // No PUT call should have been made + const putCall = mockFetch.mock.calls.find( + (call) => call[1]?.method === 'PUT', + ); + expect(putCall).toBeUndefined(); + }); +``` + +**Step 2: Run tests** + +Run: `pnpm test:run src/channels/matrix/adapter.test.ts` +Expected: PASS (7 tests) + +**Step 3: Commit** + +```bash +git add src/channels/matrix/adapter.test.ts +git commit -m "test(matrix): add send tests" +``` + +--- + +### Task 4: Add Inbound Message Handling Tests + +**Files:** +- Modify: `src/channels/matrix/adapter.test.ts` + +**Step 1: Add a helper to connect the adapter and simulate sync** + +Add these helpers near the top of the test file (after `mockFetchSequence`): + +```typescript +/** Helper: connect the adapter with standard mocks. Returns a function to simulate sync. */ +async function connectAdapter(adapter: MatrixAdapter, directRooms: Record = {}) { + // whoami + mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' })); + // m.direct + mockFetch.mockResolvedValueOnce(mockResponse(directRooms)); + // sync long-poll — we'll control this manually + let syncResolve: ((value: Response) => void) | null = null; + mockFetch.mockResolvedValueOnce( + new Promise((resolve) => { syncResolve = resolve; }), + ); + await adapter.connect(); + + return { + /** Simulate a sync response arriving. */ + async emitSync(syncData: unknown) { + if (syncResolve) { + syncResolve(mockResponse(syncData)); + syncResolve = null; + } + // Allow the sync loop to process + await new Promise((r) => setTimeout(r, 10)); + // Set up next sync to hang + mockFetch.mockResolvedValueOnce(new Promise(() => {})); + }, + }; +} + +/** Helper: create a Matrix timeline event. */ +function createMatrixEvent(overrides: Record = {}) { + return { + type: 'm.room.message', + event_id: '$evt1', + sender: '@alice:example.org', + origin_server_ts: 1700000000000, + content: { + msgtype: 'm.text', + body: 'Hello Flynn', + }, + ...overrides, + }; +} + +/** Helper: wrap events into a sync response. */ +function createSyncResponse(roomId: string, events: unknown[], extras: Record = {}) { + return { + next_batch: 's_next', + rooms: { + join: { + [roomId]: { + timeline: { events }, + }, + }, + }, + ...extras, + }; +} +``` + +**Step 2: Add inbound message tests** + +Append inside the `describe('MatrixAdapter')` block: + +```typescript + // ── Inbound message handling ──────────────────────────────────── + + it('inbound message from allowed room with mention triggers handler', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '@Flynn Hello there' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.channel).toBe('matrix'); + expect(msg.senderId).toBe('!room1:example.org'); + expect(msg.senderName).toBe('alice'); + expect(msg.text).toBe('Hello there'); + expect(msg.id).toBe('$evt1'); + }); + + it('inbound message without mention is ignored when requireMention is true', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: 'Hello everyone' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('inbound message without mention is accepted when requireMention is false', async () => { + const noMentionAdapter = new MatrixAdapter({ + ...baseConfig, + requireMention: false, + }); + const handler = vi.fn(); + noMentionAdapter.onMessage(handler); + + const { emitSync } = await connectAdapter(noMentionAdapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: 'Hello everyone' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + + await noMentionAdapter.disconnect(); + }); + + it('ignores own messages (sender === userId)', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + sender: '@flynn:example.org', + content: { msgtype: 'm.text', body: 'My own message' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('ignores messages in disallowed rooms', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '@Flynn hello' }, + }); + await emitSync(createSyncResponse('!notallowed:example.org', [event])); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('DM room bypasses mention requirement', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + // Mark !dm1 as a DM room via m.direct + const directRooms = { '@alice:example.org': ['!dm1:example.org'] }; + const dmAdapter = new MatrixAdapter({ + ...baseConfig, + allowedRoomIds: ['!dm1:example.org'], + }); + dmAdapter.onMessage(handler); + const { emitSync } = await connectAdapter(dmAdapter, directRooms); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: 'Hello without mention' }, + }); + await emitSync(createSyncResponse('!dm1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('Hello without mention'); + + await dmAdapter.disconnect(); + }); + + it('!reset command delivers reset metadata', async () => { + const noMentionAdapter = new MatrixAdapter({ + ...baseConfig, + requireMention: false, + }); + const handler = vi.fn(); + noMentionAdapter.onMessage(handler); + + const { emitSync } = await connectAdapter(noMentionAdapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '!reset' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('!reset'); + expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' }); + + await noMentionAdapter.disconnect(); + }); + + it('strips bot mention from message text', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '@Flynn What is the weather?' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('What is the weather?'); + }); + + it('ignores non-message events (e.g. m.room.member)', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + type: 'm.room.member', + content: { membership: 'join' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('accepts all rooms when allowedRoomIds is empty', async () => { + const openAdapter = new MatrixAdapter({ + ...baseConfig, + allowedRoomIds: [], + requireMention: false, + }); + const handler = vi.fn(); + openAdapter.onMessage(handler); + + const { emitSync } = await connectAdapter(openAdapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: 'Hello from any room' }, + }); + await emitSync(createSyncResponse('!anyrandom:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('Hello from any room'); + + await openAdapter.disconnect(); + }); + + it('does nothing when no message handler is registered', async () => { + // Don't call onMessage + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '@Flynn hello' }, + }); + + // Should not throw + await emitSync(createSyncResponse('!room1:example.org', [event])); + }); + + it('mention detection works with Matrix user ID', async () => { + const handler = vi.fn(); + adapter.onMessage(handler); + + const { emitSync } = await connectAdapter(adapter); + + const event = createMatrixEvent({ + content: { msgtype: 'm.text', body: '@flynn:example.org can you help?' }, + }); + await emitSync(createSyncResponse('!room1:example.org', [event])); + + expect(handler).toHaveBeenCalledTimes(1); + const msg: InboundMessage = handler.mock.calls[0][0]; + expect(msg.text).toBe('can you help?'); + }); +``` + +**Step 2: Run tests** + +Run: `pnpm test:run src/channels/matrix/adapter.test.ts` +Expected: PASS (all ~19 tests) + +**Step 3: Commit** + +```bash +git add src/channels/matrix/adapter.test.ts +git commit -m "test(matrix): add inbound message handling and DM detection tests" +``` + +--- + +### Task 5: Wire Matrix Adapter into the System + +**Files:** +- Modify: `src/channels/index.ts` +- Modify: `src/daemon/channels.ts` + +**Step 1: Add Matrix exports to src/channels/index.ts** + +Add after the WhatsApp export line: + +```typescript +export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js'; +``` + +**Step 2: Register Matrix adapter in src/daemon/channels.ts** + +Add import at top: + +```typescript +import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js'; +``` + +Add registration block after the WhatsApp block (around line 71): + +```typescript + // Register Matrix adapter (if configured) + if (config.matrix) { + const matrixAdapter = new MatrixAdapter({ + homeserverUrl: config.matrix.homeserver_url, + accessToken: config.matrix.access_token, + allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined, + requireMention: config.matrix.require_mention, + syncTimeoutMs: config.matrix.sync_timeout_ms, + displayName: config.matrix.display_name, + pairingManager, + }); + channelRegistry.register(matrixAdapter); + } +``` + +**Step 3: Run typecheck** + +Run: `pnpm typecheck` +Expected: PASS + +**Step 4: Run all channel tests to ensure no regressions** + +Run: `pnpm test:run src/channels/` +Expected: All existing tests PASS + +**Step 5: Commit** + +```bash +git add src/channels/index.ts src/daemon/channels.ts +git commit -m "feat(matrix): wire Matrix adapter into channel registry and daemon" +``` + +--- + +### Task 6: Add Config Schema Test + +**Files:** +- Modify: `src/config/schema.test.ts` (if exists, otherwise verify with existing tests) + +**Step 1: Check for existing config schema tests** + +Read `src/config/schema.test.ts` and add a Matrix config validation test. + +```typescript +it('validates matrix config', () => { + const config = configSchema.parse({ + models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, + matrix: { + homeserver_url: 'https://matrix.example.org', + access_token: 'syt_test_token', + allowed_room_ids: ['!room1:example.org'], + require_mention: true, + }, + }); + expect(config.matrix).toBeDefined(); + expect(config.matrix!.homeserver_url).toBe('https://matrix.example.org'); + expect(config.matrix!.require_mention).toBe(true); + expect(config.matrix!.sync_timeout_ms).toBe(30000); // default +}); + +it('matrix config is optional', () => { + const config = configSchema.parse({ + models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, + }); + expect(config.matrix).toBeUndefined(); +}); + +it('matrix config rejects invalid homeserver URL', () => { + expect(() => configSchema.parse({ + models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } }, + matrix: { + homeserver_url: 'not-a-url', + access_token: 'token', + }, + })).toThrow(); +}); +``` + +**Step 2: Run config schema tests** + +Run: `pnpm test:run src/config/schema.test.ts` +Expected: PASS + +**Step 3: Commit** + +```bash +git add src/config/schema.test.ts +git commit -m "test(matrix): add config schema validation tests" +``` + +--- + +### Task 7: Add Credential Redaction for Matrix Token + +**Files:** +- Modify: `src/gateway/handlers/config.ts` + +**Step 1: Add matrix access_token to redaction** + +In the `redactConfig()` function, add redaction for the Matrix access token alongside the existing channel token redactions: + +```typescript +// After the existing channel token redactions (e.g., discord.bot_token) +if (redacted.matrix) { + redacted.matrix = { ...redacted.matrix, access_token: REDACTED }; +} +``` + +**Step 2: Run tests** + +Run: `pnpm test:run src/gateway/handlers/handlers.test.ts` +Expected: PASS + +**Step 3: Commit** + +```bash +git add src/gateway/handlers/config.ts +git commit -m "feat(matrix): add access_token to config redaction" +``` + +--- + +### Task 8: Final Verification + +**Step 1: Run full typecheck** + +Run: `pnpm typecheck` +Expected: PASS + +**Step 2: Run all tests** + +Run: `pnpm test:run` +Expected: All tests PASS + +**Step 3: Commit any remaining changes** + +```bash +git add -A +git commit -m "feat(matrix): complete Matrix channel adapter implementation" +``` + +--- + +## Testing Summary + +| Test Category | Count | Description | +|---------------|-------|-------------| +| Basic properties | 2 | name = "matrix", starts disconnected | +| connect/disconnect | 4 | whoami resolution, error handling, clean disconnect, safe disconnect when not connected | +| send | 3 | throws when disconnected, PUT with txnId, skips empty text | +| Inbound messages | 12 | mention gating, DM bypass, own message filtering, room allowlist, reset command, mention stripping, MXID mention, non-message events, open rooms, no handler | +| Config schema | 3 | valid config, optional, invalid URL rejection | +| **Total** | **~24** | | + +## Error Handling Strategy + +| Error | Recovery | +|-------|----------| +| `/account/whoami` 401 | Throw on connect — adapter stays in error state | +| `/sync` HTTP error | Log, back off 5s, retry | +| `/sync` network error | Log, back off 5s, retry | +| `/send` failure | Throw — caller (registry) logs the error | +| `m.direct` 404 | Silently ignore — no DMs known yet | +| AbortError during sync | Clean exit from loop | + +## Sync Loop Lifecycle + +``` +connect() + ├── GET /account/whoami → resolve userId + ├── GET /user/{userId}/account_data/m.direct → populate dmRoomIds + └── runSyncLoop() (fire-and-forget async) + ├── GET /sync?timeout=30000 (first call, no since token) + │ └── next_batch → save as syncToken + │ └── rooms.join → process timeline events + │ └── account_data → update dmRoomIds + ├── GET /sync?timeout=30000&since=... (subsequent calls) + │ └── (repeat) + └── (on error) → sleep 5s → retry + +disconnect() + └── AbortController.abort() → sync loop exits +``` + +## Risks and Mitigations + +| Risk | Mitigation | +|------|------------| +| Initial sync floods events from history | First sync may replay old events. The adapter could skip the first sync's events (only use it to get `next_batch`). However, this is how other adapters work (Telegram's grammy does NOT replay), so we process all events from the first sync. If this is a problem, a `skip_initial_sync` config flag can be added later. | +| Sync loop leak on unhandled error | `try/catch` wraps the entire loop body with exponential backoff. AbortController ensures clean shutdown. | +| Bot invited to many rooms, only some allowed | Room allowlist (`allowed_room_ids`) prevents processing. If empty, all joined rooms are processed. | +| Matrix homeserver rate limiting | The sync loop naturally rate-limits (one request at a time with 30s timeout). Send calls are user-triggered and unlikely to hit limits. | +| Access token expiry | Matrix access tokens are typically long-lived. If expired, whoami will fail on connect. Short-lived tokens (refresh tokens) are a future enhancement. | + +## Open Questions + +1. **Skip initial sync?** — Should the adapter skip processing events from the very first `/sync` response (which may contain old history)? Current design processes them all. Can add a `skip_initial_sync: true` config flag if needed. + +2. **Typing indicators?** — Matrix supports `m.typing` events. Should the adapter send a typing indicator while processing? Deferred for initial implementation (other adapters do this inconsistently — Telegram yes, Discord yes, Slack no). + +3. **Image/media attachments?** — Matrix supports `m.image`, `m.file`, `m.audio` msgtypes. The initial implementation only handles `m.text`. Media support can be added in a follow-up (download via `/_matrix/media/v3/download/{serverName}/{mediaId}`). + +4. **Formatted messages?** — Matrix supports `format: "org.matrix.custom.html"` with `formatted_body`. Should outbound messages include HTML? Deferred — plain text is sufficient for initial implementation. + +--- + +## Files Summary + +| Action | File | +|--------|------| +| Modify | `src/config/schema.ts` — add `matrixSchema` + type export | +| Create | `src/channels/matrix/adapter.ts` — full adapter implementation | +| Create | `src/channels/matrix/adapter.test.ts` — 24 tests | +| Create | `src/channels/matrix/index.ts` — barrel export | +| Modify | `src/channels/index.ts` — re-export Matrix adapter | +| Modify | `src/daemon/channels.ts` — register Matrix adapter | +| Modify | `src/gateway/handlers/config.ts` — redact access_token | +| Modify | `src/config/schema.test.ts` — config validation tests | diff --git a/docs/plans/state.json b/docs/plans/state.json index 2addc42..f6d8ab9 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -136,6 +136,34 @@ ], "test_status": "pnpm typecheck + pnpm test:run passing" }, + + "matrix-channel-adapter": { + "file": "2026-02-15-matrix-channel-adapter.md", + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Added a Matrix channel adapter using the Matrix Client-Server API v3 (/sync long-poll + /send), with config schema support, daemon registration, services dashboard reporting, and secret redaction.", + "files_created": [ + "docs/plans/2026-02-15-matrix-channel-adapter.md", + "src/channels/matrix/adapter.ts", + "src/channels/matrix/adapter.test.ts", + "src/channels/matrix/index.ts" + ], + "files_modified": [ + "README.md", + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/channels/index.ts", + "src/daemon/channels.ts", + "src/gateway/handlers/services.ts", + "src/gateway/handlers/services.test.ts", + "src/gateway/handlers/config.ts", + "src/gateway/handlers/handlers.test.ts", + "src/cli/shared.ts", + "src/cli/shared.test.ts" + ], + "test_status": "pnpm typecheck + pnpm test:run (1689 tests) passing" + }, "openclaw-style-personal-agent-without-openclaw-risks": { "file": "2026-02-14-openclaw-style-personal-agent-without-openclaw-risks-plan.md", "status": "completed", @@ -2095,7 +2123,7 @@ }, "overall_progress": { - "total_test_count": 1632, + "total_test_count": 1689, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -2110,7 +2138,7 @@ "tier2_completion": "4/4 (100%) — inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier3_completion": "5/5 (100%) — lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier4_completion": "4/4 (100%) — gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", - "feature_gap_scorecard": "101/128 match (79%), 0 partial (0%), 27 missing (21%)", + "feature_gap_scorecard": "102/128 match (80%), 0 partial (0%), 26 missing (20%)", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete — milestone done", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback",