flynn/docs/plans/2026-02-15-matrix-channel-adapter.md
2026-02-15 18:02:14 -08:00

# Matrix Channel Adapter Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add a Matrix channel adapter using raw fetch against the Matrix Client-Server API v3, with zero external dependencies (no matrix-js-sdk).
**Architecture:** Polling-based adapter using `/_matrix/client/v3/sync` with incremental `since` tokens. Auth via pre-provisioned access token in config. Room allowlist + mention gating for multi-room safety. DM detection via `m.direct` account data. Outbound via `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}`.
**Tech Stack:** Node.js fetch (built-in), Vitest for tests, Zod for config schema validation.
---
## Context
### Existing Patterns
All Flynn channel adapters follow a consistent pattern:
1. **Config interface** — e.g. `TelegramAdapterConfig`, `DiscordAdapterConfig`
2. **Adapter class** implementing `ChannelAdapter` from `src/channels/types.ts`
3. **Barrel export** from `src/channels/<name>/index.ts`
4. **Re-export** in `src/channels/index.ts`
5. **Zod schema** in `src/config/schema.ts` (optional field on root config)
6. **Registration** in `src/daemon/channels.ts` (conditional on config presence)
7. **Tests** mocking the platform SDK; here we mock `global.fetch`
### Key ChannelAdapter interface
```typescript
interface ChannelAdapter {
  readonly name: string;
  readonly status: ChannelStatus;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  send(peerId: string, message: OutboundMessage): Promise<void>;
  onMessage(handler: (msg: InboundMessage) => void): void;
}
```
### senderId convention
- Telegram: `String(chatId)` (numeric chat ID)
- Discord: `channelId` (snowflake string)
- Slack: `channelId:threadTs`
- WhatsApp: `number@c.us`
For Matrix: `roomId` (e.g. `!abc123:matrix.org`). This gives one session per room, matching the Discord pattern. The sender's Matrix user ID is reduced to its localpart (e.g. `alice` from `@alice:matrix.org`) and goes into `senderName`.
### peerId convention
`peerId` passed to `send()` is the `roomId`. This is the same as `senderId` since replies go back to the same room.
---
## Config Shape
```yaml
matrix:
  homeserver_url: "https://matrix.example.org"
  access_token: "${MATRIX_ACCESS_TOKEN}"
  allowed_room_ids:
    - "!abc123:matrix.org"
  require_mention: true        # default: true
  sync_timeout_ms: 30000       # default: 30000 (long-poll timeout)
  display_name: "Flynn"        # optional: set bot display name on connect
```
Zod schema:
```typescript
const matrixSchema = z.object({
  homeserver_url: z.string().url('Homeserver URL is required'),
  access_token: z.string().min(1, 'Access token is required'),
  allowed_room_ids: z.array(z.string()).default([]),
  require_mention: z.boolean().default(true),
  sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
  display_name: z.string().optional(),
}).optional();
```
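The YAML keys are snake_case, while `MatrixAdapterConfig` in the tasks below uses camelCase, so the registration step needs a small translation layer. A minimal sketch of that mapping, assuming a hypothetical helper `toMatrixAdapterConfig` and local copies of both shapes (none of these names exist in the codebase):

```typescript
// Parsed shape of the `matrix` config block (mirrors the Zod schema above).
interface MatrixConfigInput {
  homeserver_url: string;
  access_token: string;
  allowed_room_ids: string[];
  require_mention: boolean;
  sync_timeout_ms: number;
  display_name?: string;
}

// Shape the adapter constructor expects (illustrative copy of MatrixAdapterConfig).
interface MatrixAdapterConfigSketch {
  homeserverUrl: string;
  accessToken: string;
  allowedRoomIds: string[];
  requireMention: boolean;
  syncTimeoutMs: number;
  displayName?: string;
}

// Hypothetical helper: translate snake_case config keys to adapter options.
function toMatrixAdapterConfig(cfg: MatrixConfigInput): MatrixAdapterConfigSketch {
  return {
    homeserverUrl: cfg.homeserver_url,
    accessToken: cfg.access_token,
    allowedRoomIds: cfg.allowed_room_ids,
    requireMention: cfg.require_mention,
    syncTimeoutMs: cfg.sync_timeout_ms,
    displayName: cfg.display_name,
  };
}
```

Keeping the mapping in one function means the schema defaults (`require_mention: true`, `sync_timeout_ms: 30000`) are applied once by Zod and simply passed through.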
---
## Matrix Client-Server API Endpoints Used
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/_matrix/client/v3/sync` | GET | Long-poll for events (with `?since=&timeout=&filter=`) |
| `/_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}` | PUT | Send a message |
| `/_matrix/client/v3/user/{userId}/account_data/m.direct` | GET | Detect DM rooms |
| `/_matrix/client/v3/account/whoami` | GET | Resolve own userId on connect |
### Sync filter (minimize bandwidth)
```json
{
  "room": {
    "timeline": { "limit": 50 },
    "state": { "lazy_load_members": true }
  },
  "presence": { "not_types": ["*"] },
  "account_data": { "types": ["m.direct"] }
}
```
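The filter travels as a JSON string in the `filter` query parameter of `/sync`. A minimal sketch of how one long-poll URL could be assembled, assuming a hypothetical helper `buildSyncUrl` (the name and signature are illustrative, not part of the plan):

```typescript
// Filter from the section above: small timelines, no presence, only m.direct account data.
const SYNC_FILTER = {
  room: {
    timeline: { limit: 50 },
    state: { lazy_load_members: true },
  },
  presence: { not_types: ['*'] },
  account_data: { types: ['m.direct'] },
};

// Hypothetical helper: build the /sync URL for one long-poll iteration.
function buildSyncUrl(homeserverUrl: string, timeoutMs: number, since?: string): string {
  const url = new URL('/_matrix/client/v3/sync', homeserverUrl);
  url.searchParams.set('timeout', String(timeoutMs));
  // URLSearchParams percent-encodes the JSON string for us.
  url.searchParams.set('filter', JSON.stringify(SYNC_FILTER));
  if (since) {
    // Omitted on the first sync; afterwards this is the previous next_batch token.
    url.searchParams.set('since', since);
  }
  return url.toString();
}
```

Note that a first sync without `since` returns recent timeline history for each joined room, so the caller has to decide whether those events count as new messages.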
---
## Design Decisions
### 1. No matrix-js-sdk dependency
- **Choice**: Raw fetch calls against `/_matrix/client/v3` endpoints
- **Rationale**: User explicitly requested minimal dependencies. The Matrix CS API is well-documented REST/JSON. We only need sync, send, whoami, and account_data — 4 endpoints total.
- **Alternatives**: matrix-js-sdk (heavy, ~2MB, pulls in many transitive deps), matrix-bot-sdk (lighter but still a dep).
### 2. Polling via /sync long-poll (not WebSocket or SSE)
- **Choice**: `/sync` with `timeout` param for long-polling, looped in an async function with `AbortController` for clean shutdown.
- **Rationale**: This is the standard Matrix sync mechanism. Simple, robust, no extra deps. The `timeout` param makes it behave like long-polling (server holds connection open until events arrive or timeout elapses).
- **Alternatives**: Sliding Sync (MSC3575, not universally supported), WebSocket transport (experimental extension).
### 3. senderId = roomId
- **Choice**: Use the Matrix room ID (`!...`) as `senderId` in `InboundMessage`, matching the Discord adapter pattern where `senderId = channelId`.
- **Rationale**: Flynn creates one session per `channel:senderId` pair. Using `roomId` means one session per room, which is the natural boundary. The localpart of the human user's Matrix ID goes into `senderName`.
### 4. DM detection via m.direct account data
- **Choice**: On connect, fetch the bot's `m.direct` account data to build a `Set<string>` of known DM room IDs. DM rooms bypass `require_mention`.
- **Rationale**: Matrix DMs are just rooms, but the `m.direct` account data flags which rooms are direct messages. This is the standard approach and doesn't require room member counting heuristics.
### 5. Mention detection via body text
- **Choice**: Check for `@displayName` or bot's Matrix user ID in the message body text.
- **Rationale**: Matrix `m.room.message` events include both `body` (plain text) and optional `formatted_body` (HTML with `<a href="https://matrix.to/#/@user:server">` pill links). Checking the plain text `body` for the display name or MXID is simpler and sufficient. The HTML formatted_body could also be checked for matrix.to links, but body text matching covers all clients.
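As a rough illustration of the trade-off, the sketch below checks the plain-text `body` first and only then falls back to a `matrix.to` pill link in `formatted_body`. The helper name `mentionsBot` and the `MessageContent` type are assumptions made for this example; the adapter in this plan only implements the body check.

```typescript
interface MessageContent {
  body?: string;
  formatted_body?: string;
}

// Hypothetical helper: true if the message content mentions the bot.
function mentionsBot(content: MessageContent, userId: string, displayName: string): boolean {
  const body = content.body ?? '';
  // Plain-text checks: full MXID, or display name (case-insensitive substring).
  if (body.includes(userId)) return true;
  if (body.toLowerCase().includes(displayName.toLowerCase())) return true;
  // Optional fallback: pill link in the HTML body. Some clients percent-encode the MXID.
  const html = content.formatted_body ?? '';
  const pill = `https://matrix.to/#/${userId}`;
  const encodedPill = `https://matrix.to/#/${encodeURIComponent(userId)}`;
  return html.includes(pill) || html.includes(encodedPill);
}
```

The substring match on the display name is deliberately loose; a short display name can produce false positives in ordinary conversation.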
### 6. Transaction ID for idempotent sends
- **Choice**: Generate txnId as `m${Date.now()}.${counter++}` per adapter instance.
- **Rationale**: Matrix PUT send requires a txnId for idempotency. A timestamp + counter ensures uniqueness within a process lifetime.
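The scheme can be sketched in isolation as a small factory. `createTxnIdGenerator` is a hypothetical name for illustration; the adapter below keeps the counter as an instance field instead.

```typescript
// Hypothetical factory: returns a function that produces unique transaction IDs.
// The clock is injectable so the output is predictable in tests.
function createTxnIdGenerator(now: () => number = Date.now): () => string {
  let counter = 0;
  // The timestamp separates process runs; the counter separates sends
  // that happen within the same millisecond.
  return () => `m${now()}.${counter++}`;
}
```

A retry of a failed PUT should reuse the txnId of the original attempt rather than generate a new one, otherwise the homeserver cannot deduplicate the send.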
### 7. Message size limit
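- **Choice**: Use the `splitMessage()` utility with a 65536-char limit.
- **Rationale**: Matrix's limit is much higher than Telegram (4096) or Discord (2000). The spec caps the complete event at 65536 bytes, counting the envelope fields as well as the content, so a 65536-char limit is an upper bound rather than a guarantee: a chunk near the limit, or one containing multi-byte characters, can still be rejected by the homeserver. A lower limit would leave headroom.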
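Because the Matrix limit is expressed in bytes while JavaScript string lengths count UTF-16 code units, the two can diverge for non-ASCII text. The sketch below shows one way to measure and split by UTF-8 byte length. `byteLength` and `splitByBytes` are hypothetical helpers written for this example; they are not the existing `splitMessage()` utility, whose behavior is not shown in this plan. The sketch assumes `maxBytes` is at least 4, the largest UTF-8 encoding of a single code point.

```typescript
const encoder = new TextEncoder();

// UTF-8 size of a string in bytes.
function byteLength(text: string): number {
  return encoder.encode(text).length;
}

// Hypothetical helper: split text into chunks of at most maxBytes UTF-8 bytes
// without cutting a code point in half.
function splitByBytes(text: string, maxBytes: number): string[] {
  const chunks: string[] = [];
  let current = '';
  let currentBytes = 0;
  // for...of iterates by code point, so surrogate pairs stay intact.
  for (const ch of text) {
    const size = byteLength(ch);
    if (currentBytes + size > maxBytes && current !== '') {
      chunks.push(current);
      current = '';
      currentBytes = 0;
    }
    current += ch;
    currentBytes += size;
  }
  if (current !== '') {
    chunks.push(current);
  }
  return chunks;
}
```

This splits at arbitrary character boundaries; a production splitter would likely prefer paragraph or word boundaries and reserve some bytes for the rest of the event.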
---
## Implementation Steps
### Task 1: Add Matrix Zod Schema to Config
**Files:**
- Modify: `src/config/schema.ts`
**Step 1: Add the matrixSchema definition**
Add after the `whatsappSchema` definition (around line 350):
```typescript
const matrixSchema = z.object({
  homeserver_url: z.string().url('Homeserver URL is required'),
  access_token: z.string().min(1, 'Access token is required'),
  allowed_room_ids: z.array(z.string()).default([]),
  require_mention: z.boolean().default(true),
  sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
  display_name: z.string().optional(),
}).optional();
```
**Step 2: Add `matrix` to configSchema**
In `configSchema` (around line 510), add after `whatsapp`:
```typescript
matrix: matrixSchema,
```
**Step 3: Add exported type**
After the existing type exports (around line 559), add:
```typescript
export type MatrixConfig = z.infer<typeof matrixSchema>;
```
**Step 4: Run typecheck**
Run: `pnpm typecheck`
Expected: PASS
**Step 5: Commit**
```bash
git add src/config/schema.ts
git commit -m "feat(matrix): add Matrix channel config schema"
```
---
### Task 2: Create Matrix Adapter — Core Structure and Connect
**Files:**
- Create: `src/channels/matrix/adapter.ts`
- Create: `src/channels/matrix/adapter.test.ts`
- Create: `src/channels/matrix/index.ts`
**Step 1: Write the adapter.test.ts skeleton with connect/disconnect tests**
Create `src/channels/matrix/adapter.test.ts`:
```typescript
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
import type { InboundMessage } from '../types.js';
// ── Mock global fetch ─────────────────────────────────────────────
const mockFetch = vi.fn();
beforeEach(() => {
vi.stubGlobal('fetch', mockFetch);
});
afterEach(() => {
vi.restoreAllMocks();
});
const baseConfig: MatrixAdapterConfig = {
homeserverUrl: 'https://matrix.example.org',
accessToken: 'syt_test_token',
allowedRoomIds: ['!room1:example.org'],
requireMention: true,
syncTimeoutMs: 30000,
};
/** Helper: create a mock fetch Response. */
function mockResponse(body: unknown, status = 200): Response {
return {
ok: status >= 200 && status < 300,
status,
json: async () => body,
text: async () => JSON.stringify(body),
} as Response;
}
/** Helper: make fetch return specific responses in sequence. */
function mockFetchSequence(...responses: Response[]) {
for (const response of responses) {
mockFetch.mockResolvedValueOnce(response);
}
}
describe('MatrixAdapter', () => {
let adapter: MatrixAdapter;
beforeEach(() => {
vi.clearAllMocks();
adapter = new MatrixAdapter(baseConfig);
});
afterEach(async () => {
// Ensure sync loop is stopped
if (adapter.status === 'connected' || adapter.status === 'connecting') {
await adapter.disconnect();
}
});
// ── Basic properties ────────────────────────────────────────────
it('has name "matrix"', () => {
expect(adapter.name).toBe('matrix');
});
it('starts as disconnected', () => {
expect(adapter.status).toBe('disconnected');
});
// ── connect / disconnect ────────────────────────────────────────
it('connect resolves userId via whoami and sets connected', async () => {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct account data
mockFetch.mockResolvedValueOnce(mockResponse({}));
// First sync (will be aborted on disconnect)
mockFetch.mockResolvedValueOnce(
new Promise(() => {}), // never resolves — simulates long-poll
);
await adapter.connect();
expect(adapter.status).toBe('connected');
// Verify whoami was called
expect(mockFetch).toHaveBeenCalledWith(
'https://matrix.example.org/_matrix/client/v3/account/whoami',
expect.objectContaining({
headers: expect.objectContaining({
Authorization: 'Bearer syt_test_token',
}),
}),
);
});
it('connect throws on whoami failure', async () => {
mockFetch.mockResolvedValueOnce(mockResponse({ errcode: 'M_UNKNOWN_TOKEN' }, 401));
await expect(adapter.connect()).rejects.toThrow();
});
it('disconnect sets status to disconnected', async () => {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct
mockFetch.mockResolvedValueOnce(mockResponse({}));
// sync long-poll (never resolves)
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
await adapter.connect();
expect(adapter.status).toBe('connected');
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
it('disconnect is safe when not connected', async () => {
await adapter.disconnect();
expect(adapter.status).toBe('disconnected');
});
});
```
**Step 2: Run test to verify it fails (adapter doesn't exist yet)**
Run: `pnpm test:run src/channels/matrix/adapter.test.ts`
Expected: FAIL — cannot resolve `./adapter.js`
**Step 3: Write adapter.ts with connect/disconnect**
Create `src/channels/matrix/adapter.ts`:
```typescript
/**
* Matrix channel adapter.
*
* Implements the ChannelAdapter interface using raw fetch against the
* Matrix Client-Server API v3. No external Matrix SDK dependency.
* Uses /sync long-polling for inbound messages and PUT /send for outbound.
*/
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { splitMessage } from '../utils.js';
import type { PairingManager } from '../pairing.js';
/** Configuration for the Matrix channel adapter. */
export interface MatrixAdapterConfig {
homeserverUrl: string;
accessToken: string;
/** Room IDs to respond in. Empty = all joined rooms. */
allowedRoomIds?: string[];
/** Require @mention to respond in rooms (default: true). DMs always respond. */
requireMention?: boolean;
/** Long-poll timeout for /sync in milliseconds (default: 30000). */
syncTimeoutMs?: number;
/** Optional display name to set on connect. */
displayName?: string;
/** Optional pairing manager for DM pairing codes. */
pairingManager?: PairingManager;
}
/** Maximum message length in characters before chunking. The Matrix 65536-byte cap applies to the whole event, so this is an upper bound. */
const MAX_MESSAGE_LENGTH = 65536;
/**
* Matrix channel adapter backed by raw Client-Server API v3 fetch calls.
*
* Handles room allowlist filtering, optional mention requirement,
* DM detection via m.direct, and message chunking.
*/
export class MatrixAdapter implements ChannelAdapter {
readonly name = 'matrix';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private config: MatrixAdapterConfig;
/** Resolved bot user ID (e.g. @flynn:example.org). */
private userId: string | null = null;
/** Resolved bot display name (for mention detection). */
private botDisplayName: string | null = null;
/** Set of room IDs that are direct messages. */
private dmRoomIds: Set<string> = new Set();
/** Incremental sync token. */
private syncToken: string | null = null;
/** AbortController for the sync loop. */
private syncAbort: AbortController | null = null;
/** Transaction ID counter for idempotent sends. */
private txnCounter = 0;
get status(): ChannelStatus {
return this._status;
}
constructor(config: MatrixAdapterConfig) {
this.config = config;
}
/** Register the inbound message handler. Called by the registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/**
* Connect to the Matrix homeserver:
* 1. Resolve bot userId via /account/whoami
* 2. Fetch m.direct account data for DM detection
* 3. Start the /sync long-poll loop
*/
async connect(): Promise<void> {
this._status = 'connecting';
try {
// Resolve our own user ID
const whoami = await this.matrixGet<{ user_id: string }>(
'/_matrix/client/v3/account/whoami',
);
this.userId = whoami.user_id;
this.botDisplayName = this.config.displayName ?? this.userId.split(':')[0].slice(1);
// Fetch m.direct account data for DM detection
await this.loadDirectRooms();
// Start sync loop (fire and forget — runs until disconnect)
this.syncAbort = new AbortController();
void this.runSyncLoop(this.syncAbort.signal);
this._status = 'connected';
console.log(`Matrix adapter connected as ${this.userId}`);
} catch (error) {
this._status = 'error';
throw error;
}
}
/** Stop the sync loop and clean up. */
async disconnect(): Promise<void> {
if (this.syncAbort) {
this.syncAbort.abort();
this.syncAbort = null;
}
this.userId = null;
this.syncToken = null;
this.dmRoomIds.clear();
this._status = 'disconnected';
}
/** Send an outbound message to a Matrix room. */
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (this._status !== 'connected') {
throw new Error('Matrix adapter not connected');
}
const text = message.text ?? '';
if (!text.trim()) { return; }
if (text.length <= MAX_MESSAGE_LENGTH) {
await this.sendRoomMessage(peerId, text);
} else {
const chunks = splitMessage(text, MAX_MESSAGE_LENGTH);
for (const chunk of chunks) {
await this.sendRoomMessage(peerId, chunk);
}
}
}
// ── Private: Matrix API helpers ─────────────────────────────────
/** Make an authenticated GET request to the homeserver. */
private async matrixGet<T>(path: string, query?: Record<string, string>): Promise<T> {
const url = new URL(path, this.config.homeserverUrl);
if (query) {
for (const [k, v] of Object.entries(query)) {
url.searchParams.set(k, v);
}
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
},
});
if (!response.ok) {
const body = await response.text();
throw new Error(`Matrix API error (${response.status}): ${body}`);
}
return response.json() as Promise<T>;
}
/** Make an authenticated PUT request to the homeserver. */
private async matrixPut<T>(path: string, body: unknown): Promise<T> {
const url = new URL(path, this.config.homeserverUrl);
const response = await fetch(url.toString(), {
method: 'PUT',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!response.ok) {
const text = await response.text();
throw new Error(`Matrix API error (${response.status}): ${text}`);
}
return response.json() as Promise<T>;
}
/** Send a text message to a room with idempotent txnId. */
private async sendRoomMessage(roomId: string, text: string): Promise<void> {
const txnId = `m${Date.now()}.${this.txnCounter++}`;
await this.matrixPut(
`/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`,
{
msgtype: 'm.text',
body: text,
},
);
}
/** Load the m.direct account data to populate dmRoomIds. */
private async loadDirectRooms(): Promise<void> {
try {
const data = await this.matrixGet<Record<string, string[]>>(
`/_matrix/client/v3/user/${encodeURIComponent(this.userId!)}/account_data/m.direct`,
);
this.dmRoomIds.clear();
// m.direct maps userId → [roomId, ...]. Flatten all room IDs.
for (const roomIds of Object.values(data)) {
if (Array.isArray(roomIds)) {
for (const roomId of roomIds) {
this.dmRoomIds.add(roomId);
}
}
}
} catch {
// m.direct may not exist if the bot has no DMs yet — that's fine.
this.dmRoomIds.clear();
}
}
/** Check if a room is a DM. */
private isDM(roomId: string): boolean {
return this.dmRoomIds.has(roomId);
}
// ── Private: Sync loop ──────────────────────────────────────────
/** Run the /sync long-poll loop until aborted. */
private async runSyncLoop(signal: AbortSignal): Promise<void> {
// Build the sync filter to minimize bandwidth
const filter = JSON.stringify({
room: {
timeline: { limit: 50 },
state: { lazy_load_members: true },
},
presence: { not_types: ['*'] },
account_data: { types: ['m.direct'] },
});
while (!signal.aborted) {
try {
const query: Record<string, string> = {
timeout: String(this.config.syncTimeoutMs ?? 30000),
filter,
};
if (this.syncToken) {
query.since = this.syncToken;
}
const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl);
for (const [k, v] of Object.entries(query)) {
url.searchParams.set(k, v);
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: {
Authorization: `Bearer ${this.config.accessToken}`,
},
signal,
});
if (!response.ok) {
const body = await response.text();
console.error(`Matrix sync error (${response.status}): ${body}`);
// Back off on error
await this.sleep(5000, signal);
continue;
}
const syncResponse = await response.json() as MatrixSyncResponse;
this.syncToken = syncResponse.next_batch;
// Process sync response
this.processSyncResponse(syncResponse);
} catch (error) {
if (signal.aborted) { return; }
console.error('Matrix sync loop error:', error instanceof Error ? error.message : 'Unknown error');
// Back off on error
await this.sleep(5000, signal);
}
}
}
/** Process a /sync response: refresh DM state, then dispatch timeline events from joined rooms. */
private processSyncResponse(sync: MatrixSyncResponse): void {
// Update m.direct first, so DM state stays current even when the response
// has no joined-room events or no handler is registered.
const accountData = sync.account_data?.events;
if (accountData) {
for (const event of accountData) {
if (event.type === 'm.direct' && event.content) {
this.dmRoomIds.clear();
for (const roomIds of Object.values(event.content as Record<string, string[]>)) {
if (Array.isArray(roomIds)) {
for (const roomId of roomIds) {
this.dmRoomIds.add(roomId);
}
}
}
}
}
}
if (!this.messageHandler) { return; }
const joinedRooms = sync.rooms?.join;
if (!joinedRooms) { return; }
for (const [roomId, roomData] of Object.entries(joinedRooms)) {
const events = roomData.timeline?.events;
if (!events) { continue; }
for (const event of events) {
this.handleTimelineEvent(roomId, event);
}
}
}
/** Handle a single timeline event from a joined room. */
private handleTimelineEvent(roomId: string, event: MatrixEvent): void {
if (!this.messageHandler) { return; }
// Only process m.room.message events
if (event.type !== 'm.room.message') { return; }
// Ignore our own messages
if (event.sender === this.userId) { return; }
// Room allowlist check
const allowedRoomIds = this.config.allowedRoomIds ?? [];
if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) {
// Pairing fallback
const pm = this.config.pairingManager;
if (pm?.enabled) {
if (pm.isApproved('matrix', roomId)) {
// Approved — fall through
} else {
const text = (event.content?.body ?? '').trim();
if (text && pm.validateCode('matrix', roomId, text)) {
this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {});
}
return;
}
} else {
return;
}
}
const body = event.content?.body ?? '';
const isDm = this.isDM(roomId);
// Mention gating: require mention in non-DM rooms
const requireMention = this.config.requireMention ?? true;
if (!isDm && requireMention) {
const isMentioned = this.isBotMentioned(body);
if (!isMentioned) { return; }
}
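// Strip bot mention from text. Remove the full user ID first: the display
// name is often its localpart, and stripping that first would leave a
// ":server" fragment behind.
let text = body;
if (this.userId) {
text = text.replace(new RegExp(this.escapeRegex(this.userId), 'g'), '').trim();
}
if (this.botDisplayName) {
text = text.replace(new RegExp(`@?${this.escapeRegex(this.botDisplayName)}\\b`, 'gi'), '').trim();
}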
// Detect reset command
if (text === '!reset' || text === 'reset') {
this.messageHandler({
id: event.event_id,
channel: 'matrix',
senderId: roomId,
senderName: this.extractDisplayName(event.sender),
text: '!reset',
timestamp: event.origin_server_ts,
metadata: { isCommand: true, command: 'reset' },
});
return;
}
// Regular message
this.messageHandler({
id: event.event_id,
channel: 'matrix',
senderId: roomId,
senderName: this.extractDisplayName(event.sender),
text,
timestamp: event.origin_server_ts,
});
}
/** Check if the bot is mentioned in the message text. */
private isBotMentioned(text: string): boolean {
if (this.botDisplayName && text.toLowerCase().includes(this.botDisplayName.toLowerCase())) {
return true;
}
if (this.userId && text.includes(this.userId)) {
return true;
}
return false;
}
/** Extract a display name from a Matrix user ID. @user:server → user */
private extractDisplayName(userId: string): string {
const match = userId.match(/^@([^:]+)/);
return match ? match[1] : userId;
}
/** Escape a string for use in a RegExp. */
private escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
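/** Sleep with abort support. Resolves immediately if the signal is already aborted. */
private sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve) => {
if (signal.aborted) { resolve(); return; }
const onAbort = () => {
clearTimeout(timer);
resolve();
};
const timer = setTimeout(() => {
// Drop the listener so repeated back-offs do not accumulate handlers.
signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
signal.addEventListener('abort', onAbort, { once: true });
});
}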
}
// ── Matrix API response types ──────────────────────────────────────
interface MatrixSyncResponse {
next_batch: string;
rooms?: {
join?: Record<string, MatrixJoinedRoom>;
};
account_data?: {
events?: Array<{ type: string; content?: unknown }>;
};
}
interface MatrixJoinedRoom {
timeline?: {
events?: MatrixEvent[];
};
}
interface MatrixEvent {
type: string;
event_id: string;
sender: string;
origin_server_ts: number;
content?: {
body?: string;
msgtype?: string;
[key: string]: unknown;
};
}
```
**Step 4: Run test to verify connect/disconnect pass**
Run: `pnpm test:run src/channels/matrix/adapter.test.ts`
Expected: PASS (6 tests)
**Step 5: Create barrel export**
Create `src/channels/matrix/index.ts`:
```typescript
export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
```
**Step 6: Commit**
```bash
git add src/channels/matrix/
git commit -m "feat(matrix): add Matrix adapter core with connect/disconnect"
```
---
### Task 3: Add Send Tests and Verify Send Works
**Files:**
- Modify: `src/channels/matrix/adapter.test.ts`
**Step 1: Add send tests to the test file**
Append these tests inside the `describe('MatrixAdapter')` block:
```typescript
// ── send ──────────────────────────────────────────────────────
it('send throws when not connected', async () => {
await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow(
'Matrix adapter not connected',
);
});
it('send delivers a message via PUT with txnId', async () => {
// Connect first
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
mockFetch.mockResolvedValueOnce(mockResponse({}));
mockFetch.mockResolvedValueOnce(new Promise(() => {})); // sync hangs
await adapter.connect();
// Send message
mockFetch.mockResolvedValueOnce(mockResponse({ event_id: '$sent1' }));
await adapter.send('!room1:example.org', { text: 'Hello there' });
// Find the PUT call (skip whoami, m.direct, sync)
const putCall = mockFetch.mock.calls.find(
(call) => call[1]?.method === 'PUT',
);
expect(putCall).toBeDefined();
expect(putCall![0]).toContain('/_matrix/client/v3/rooms/');
expect(putCall![0]).toContain('/send/m.room.message/');
const body = JSON.parse(putCall![1].body);
expect(body.msgtype).toBe('m.text');
expect(body.body).toBe('Hello there');
});
it('send skips empty text', async () => {
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
mockFetch.mockResolvedValueOnce(mockResponse({}));
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
await adapter.connect();
await adapter.send('!room1:example.org', { text: '' });
// No PUT call should have been made
const putCall = mockFetch.mock.calls.find(
(call) => call[1]?.method === 'PUT',
);
expect(putCall).toBeUndefined();
});
```
**Step 2: Run tests**
Run: `pnpm test:run src/channels/matrix/adapter.test.ts`
Expected: PASS (9 tests)
**Step 3: Commit**
```bash
git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add send tests"
```
---
### Task 4: Add Inbound Message Handling Tests
**Files:**
- Modify: `src/channels/matrix/adapter.test.ts`
**Step 1: Add a helper to connect the adapter and simulate sync**
Add these helpers near the top of the test file (after `mockFetchSequence`):
```typescript
/** Helper: connect the adapter with standard mocks. Returns an object whose `emitSync()` simulates a sync response. */
async function connectAdapter(adapter: MatrixAdapter, directRooms: Record<string, string[]> = {}) {
// whoami
mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
// m.direct
mockFetch.mockResolvedValueOnce(mockResponse(directRooms));
// sync long-poll — we'll control this manually
let syncResolve: ((value: Response) => void) | null = null;
mockFetch.mockResolvedValueOnce(
new Promise<Response>((resolve) => { syncResolve = resolve; }),
);
await adapter.connect();
return {
/** Simulate a sync response arriving. */
async emitSync(syncData: unknown) {
// Queue the next sync as a hanging long-poll before resolving the current
// one: the loop calls fetch again as soon as it has processed this response.
mockFetch.mockResolvedValueOnce(new Promise(() => {}));
if (syncResolve) {
syncResolve(mockResponse(syncData));
syncResolve = null;
}
// Allow the sync loop to process
await new Promise((r) => setTimeout(r, 10));
},
};
}
/** Helper: create a Matrix timeline event. */
function createMatrixEvent(overrides: Record<string, unknown> = {}) {
return {
type: 'm.room.message',
event_id: '$evt1',
sender: '@alice:example.org',
origin_server_ts: 1700000000000,
content: {
msgtype: 'm.text',
body: 'Hello Flynn',
},
...overrides,
};
}
/** Helper: wrap events into a sync response. */
function createSyncResponse(roomId: string, events: unknown[], extras: Record<string, unknown> = {}) {
return {
next_batch: 's_next',
rooms: {
join: {
[roomId]: {
timeline: { events },
},
},
},
...extras,
};
}
```
**Step 2: Add inbound message tests**
Append inside the `describe('MatrixAdapter')` block:
```typescript
// ── Inbound message handling ────────────────────────────────────
it('inbound message from allowed room with mention triggers handler', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn Hello there' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.channel).toBe('matrix');
expect(msg.senderId).toBe('!room1:example.org');
expect(msg.senderName).toBe('alice');
expect(msg.text).toBe('Hello there');
expect(msg.id).toBe('$evt1');
});
it('inbound message without mention is ignored when requireMention is true', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello everyone' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('inbound message without mention is accepted when requireMention is false', async () => {
const noMentionAdapter = new MatrixAdapter({
...baseConfig,
requireMention: false,
});
const handler = vi.fn();
noMentionAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(noMentionAdapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello everyone' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
await noMentionAdapter.disconnect();
});
it('ignores own messages (sender === userId)', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
sender: '@flynn:example.org',
content: { msgtype: 'm.text', body: 'My own message' },
});
await emitSync(createSyncResponse('!room1:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('ignores messages in disallowed rooms', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
const { emitSync } = await connectAdapter(adapter);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: '@Flynn hello' },
});
await emitSync(createSyncResponse('!notallowed:example.org', [event]));
expect(handler).not.toHaveBeenCalled();
});
it('DM room bypasses mention requirement', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
// Mark !dm1 as a DM room via m.direct
const directRooms = { '@alice:example.org': ['!dm1:example.org'] };
const dmAdapter = new MatrixAdapter({
...baseConfig,
allowedRoomIds: ['!dm1:example.org'],
});
dmAdapter.onMessage(handler);
const { emitSync } = await connectAdapter(dmAdapter, directRooms);
const event = createMatrixEvent({
content: { msgtype: 'm.text', body: 'Hello without mention' },
});
await emitSync(createSyncResponse('!dm1:example.org', [event]));
expect(handler).toHaveBeenCalledTimes(1);
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('Hello without mention');
await dmAdapter.disconnect();
});
it('!reset command delivers reset metadata', async () => {
  const noMentionAdapter = new MatrixAdapter({
    ...baseConfig,
    requireMention: false,
  });
  const handler = vi.fn();
  noMentionAdapter.onMessage(handler);
  const { emitSync } = await connectAdapter(noMentionAdapter);
  const event = createMatrixEvent({
    content: { msgtype: 'm.text', body: '!reset' },
  });
  await emitSync(createSyncResponse('!room1:example.org', [event]));
  expect(handler).toHaveBeenCalledTimes(1);
  const msg: InboundMessage = handler.mock.calls[0][0];
  expect(msg.text).toBe('!reset');
  expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' });
  await noMentionAdapter.disconnect();
});

it('strips bot mention from message text', async () => {
  const handler = vi.fn();
  adapter.onMessage(handler);
  const { emitSync } = await connectAdapter(adapter);
  const event = createMatrixEvent({
    content: { msgtype: 'm.text', body: '@Flynn What is the weather?' },
  });
  await emitSync(createSyncResponse('!room1:example.org', [event]));
  expect(handler).toHaveBeenCalledTimes(1);
  const msg: InboundMessage = handler.mock.calls[0][0];
  expect(msg.text).toBe('What is the weather?');
});

it('ignores non-message events (e.g. m.room.member)', async () => {
  const handler = vi.fn();
  adapter.onMessage(handler);
  const { emitSync } = await connectAdapter(adapter);
  const event = createMatrixEvent({
    type: 'm.room.member',
    content: { membership: 'join' },
  });
  await emitSync(createSyncResponse('!room1:example.org', [event]));
  expect(handler).not.toHaveBeenCalled();
});

it('accepts all rooms when allowedRoomIds is empty', async () => {
  const openAdapter = new MatrixAdapter({
    ...baseConfig,
    allowedRoomIds: [],
    requireMention: false,
  });
  const handler = vi.fn();
  openAdapter.onMessage(handler);
  const { emitSync } = await connectAdapter(openAdapter);
  const event = createMatrixEvent({
    content: { msgtype: 'm.text', body: 'Hello from any room' },
  });
  await emitSync(createSyncResponse('!anyrandom:example.org', [event]));
  expect(handler).toHaveBeenCalledTimes(1);
  const msg: InboundMessage = handler.mock.calls[0][0];
  expect(msg.text).toBe('Hello from any room');
  await openAdapter.disconnect();
});

it('does nothing when no message handler is registered', async () => {
  // Don't call onMessage
  const { emitSync } = await connectAdapter(adapter);
  const event = createMatrixEvent({
    content: { msgtype: 'm.text', body: '@Flynn hello' },
  });
  // Should not throw
  await emitSync(createSyncResponse('!room1:example.org', [event]));
});

it('mention detection works with Matrix user ID', async () => {
  const handler = vi.fn();
  adapter.onMessage(handler);
  const { emitSync } = await connectAdapter(adapter);
  const event = createMatrixEvent({
    content: { msgtype: 'm.text', body: '@flynn:example.org can you help?' },
  });
  await emitSync(createSyncResponse('!room1:example.org', [event]));
  expect(handler).toHaveBeenCalledTimes(1);
  const msg: InboundMessage = handler.mock.calls[0][0];
  expect(msg.text).toBe('can you help?');
});
```
**Step 2: Run tests**
Run: `pnpm test:run src/channels/matrix/adapter.test.ts`
Expected: PASS (all ~21 adapter tests)
**Step 3: Commit**
```bash
git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add inbound message handling and DM detection tests"
```
---
### Task 5: Wire Matrix Adapter into the System
**Files:**
- Modify: `src/channels/index.ts`
- Modify: `src/daemon/channels.ts`
**Step 1: Add Matrix exports to src/channels/index.ts**
Add after the WhatsApp export line:
```typescript
export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js';
```
**Step 2: Register Matrix adapter in src/daemon/channels.ts**
Add import at top:
```typescript
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js';
```
Add registration block after the WhatsApp block (around line 71):
```typescript
// Register Matrix adapter (if configured)
if (config.matrix) {
  const matrixAdapter = new MatrixAdapter({
    homeserverUrl: config.matrix.homeserver_url,
    accessToken: config.matrix.access_token,
    allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined,
    requireMention: config.matrix.require_mention,
    syncTimeoutMs: config.matrix.sync_timeout_ms,
    displayName: config.matrix.display_name,
    pairingManager,
  });
  channelRegistry.register(matrixAdapter);
}
```
**Step 3: Run typecheck**
Run: `pnpm typecheck`
Expected: PASS
**Step 4: Run all channel tests to ensure no regressions**
Run: `pnpm test:run src/channels/`
Expected: All existing tests PASS
**Step 5: Commit**
```bash
git add src/channels/index.ts src/daemon/channels.ts
git commit -m "feat(matrix): wire Matrix adapter into channel registry and daemon"
```
---
### Task 6: Add Config Schema Test
**Files:**
- Modify: `src/config/schema.test.ts` (if it exists; otherwise verify with existing tests)
**Step 1: Check for existing config schema tests**
Read `src/config/schema.test.ts` and add the Matrix config validation tests below.
```typescript
it('validates matrix config', () => {
  const config = configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
    matrix: {
      homeserver_url: 'https://matrix.example.org',
      access_token: 'syt_test_token',
      allowed_room_ids: ['!room1:example.org'],
      require_mention: true,
    },
  });
  expect(config.matrix).toBeDefined();
  expect(config.matrix!.homeserver_url).toBe('https://matrix.example.org');
  expect(config.matrix!.require_mention).toBe(true);
  expect(config.matrix!.sync_timeout_ms).toBe(30000); // default
});

it('matrix config is optional', () => {
  const config = configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
  });
  expect(config.matrix).toBeUndefined();
});

it('matrix config rejects invalid homeserver URL', () => {
  expect(() => configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
    matrix: {
      homeserver_url: 'not-a-url',
      access_token: 'token',
    },
  })).toThrow();
});
```
**Step 2: Run config schema tests**
Run: `pnpm test:run src/config/schema.test.ts`
Expected: PASS
**Step 3: Commit**
```bash
git add src/config/schema.test.ts
git commit -m "test(matrix): add config schema validation tests"
```
---
### Task 7: Add Credential Redaction for Matrix Token
**Files:**
- Modify: `src/gateway/handlers/config.ts`
**Step 1: Add matrix access_token to redaction**
In the `redactConfig()` function, add redaction for the Matrix access token alongside the existing channel token redactions:
```typescript
// After the existing channel token redactions (e.g., discord.bot_token)
if (redacted.matrix) {
  redacted.matrix = { ...redacted.matrix, access_token: REDACTED };
}
```
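The redaction rule can also be exercised in isolation. The following is a minimal sketch, assuming a `'[REDACTED]'` placeholder and a simplified config shape; `redactMatrixToken`, `ConfigSketch`, and `MatrixConfigSketch` are hypothetical names for illustration and are not the actual `redactConfig()` in `src/gateway/handlers/config.ts`.

```typescript
// Assumption: the real REDACTED constant may use a different placeholder string.
const REDACTED = '[REDACTED]';

// Simplified, hypothetical config shape for illustration only.
interface MatrixConfigSketch {
  homeserver_url: string;
  access_token: string;
}
interface ConfigSketch {
  matrix?: MatrixConfigSketch;
}

// Returns a copy with the Matrix access token masked; the input is not mutated.
function redactMatrixToken(config: ConfigSketch): ConfigSketch {
  if (!config.matrix) return config;
  return { ...config, matrix: { ...config.matrix, access_token: REDACTED } };
}

const original: ConfigSketch = {
  matrix: { homeserver_url: 'https://matrix.example.org', access_token: 'syt_test_token' },
};
const redacted = redactMatrixToken(original);
console.log(redacted.matrix?.access_token); // masked placeholder
console.log(original.matrix?.access_token); // the input object still holds the token
```

Spreading into a new object, as the plan's snippet does, keeps the live config untouched while the gateway returns the masked copy.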
**Step 2: Run tests**
Run: `pnpm test:run src/gateway/handlers/handlers.test.ts`
Expected: PASS
**Step 3: Commit**
```bash
git add src/gateway/handlers/config.ts
git commit -m "feat(matrix): add access_token to config redaction"
```
---
### Task 8: Final Verification
**Step 1: Run full typecheck**
Run: `pnpm typecheck`
Expected: PASS
**Step 2: Run all tests**
Run: `pnpm test:run`
Expected: All tests PASS
**Step 3: Commit any remaining changes**
```bash
git add -A
git commit -m "feat(matrix): complete Matrix channel adapter implementation"
```
---
## Testing Summary
| Test Category | Count | Description |
|---------------|-------|-------------|
| Basic properties | 2 | name = "matrix", starts disconnected |
| connect/disconnect | 4 | whoami resolution, error handling, clean disconnect, safe disconnect when not connected |
| send | 3 | throws when disconnected, PUT with txnId, skips empty text |
| Inbound messages | 12 | mention gating, DM bypass, own message filtering, room allowlist, reset command, mention stripping, MXID mention, non-message events, open rooms, no handler |
| Config schema | 3 | valid config, optional, invalid URL rejection |
| **Total** | **~24** | |
## Error Handling Strategy
| Error | Recovery |
|-------|----------|
| `/account/whoami` 401 | Throw on connect — adapter stays in error state |
| `/sync` HTTP error | Log, back off 5s, retry |
| `/sync` network error | Log, back off 5s, retry |
| `/send` failure | Throw — caller (registry) logs the error |
| `m.direct` 404 | Silently ignore — no DMs known yet |
| AbortError during sync | Clean exit from loop |
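
The `/sync` rows of the table reduce to one decision per failure: exit or retry. The following is a minimal sketch of that decision, assuming the fixed 5s delay from the table; `classifySyncError`, `SyncErrorAction`, and `SYNC_RETRY_DELAY_MS` are hypothetical names, not identifiers from the adapter.

```typescript
// Fixed backoff taken from the error-handling table (assumed constant name).
const SYNC_RETRY_DELAY_MS = 5000;

type SyncErrorAction =
  | { kind: 'exit' }
  | { kind: 'retry'; delayMs: number };

// Hypothetical helper: decides what the sync loop does after a failed /sync call.
function classifySyncError(error: unknown): SyncErrorAction {
  const errorName =
    typeof error === 'object' && error !== null && 'name' in error
      ? String((error as { name: unknown }).name)
      : '';
  // fetch rejects with an error named "AbortError" when its AbortSignal is aborted.
  if (errorName === 'AbortError') return { kind: 'exit' };
  // HTTP errors and network errors are treated the same: wait, then retry.
  return { kind: 'retry', delayMs: SYNC_RETRY_DELAY_MS };
}

const abortError = new Error('The operation was aborted');
abortError.name = 'AbortError';
console.log(classifySyncError(abortError).kind);
console.log(classifySyncError(new TypeError('fetch failed')).kind);
```

Checking `name` rather than using `instanceof` avoids depending on which error class the runtime uses for aborted requests.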
## Sync Loop Lifecycle
```
connect()
├── GET /account/whoami → resolve userId
├── GET /user/{userId}/account_data/m.direct → populate dmRoomIds
└── runSyncLoop() (fire-and-forget async)
    ├── GET /sync?timeout=30000 (first call, no since token)
    │   ├── next_batch → save as syncToken
    │   ├── rooms.join → process timeline events
    │   └── account_data → update dmRoomIds
    ├── GET /sync?timeout=30000&since=... (subsequent calls)
    │   └── (repeat)
    └── (on error) → sleep 5s → retry

disconnect()
└── AbortController.abort() → sync loop exits
```
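
The only difference between the first and subsequent `/sync` calls in the lifecycle above is the `since` parameter. A minimal sketch of building that URL follows; `buildSyncUrl` is a hypothetical helper name, and the sketch assumes the homeserver is served from the root of its origin (a path prefix in `homeserverUrl` would be dropped by the absolute path).

```typescript
// Hypothetical helper: builds the /sync URL for the first or a subsequent call.
function buildSyncUrl(homeserverUrl: string, timeoutMs: number, since?: string): string {
  const url = new URL('/_matrix/client/v3/sync', homeserverUrl);
  url.searchParams.set('timeout', String(timeoutMs));
  // The first call has no token; later calls pass the saved next_batch value.
  if (since !== undefined) {
    url.searchParams.set('since', since);
  }
  return url.toString();
}

console.log(buildSyncUrl('https://matrix.example.org', 30000));
console.log(buildSyncUrl('https://matrix.example.org', 30000, 's72594_4483'));
```

Using `URLSearchParams` rather than string concatenation means a token containing reserved characters is percent-encoded automatically.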
## Risks and Mitigations
| Risk | Mitigation |
|------|------------|
| Initial sync floods events from history | The first sync may replay old events. The adapter could skip the first sync's events and use that response only to obtain `next_batch`. Skipping would match other adapters (Telegram's grammY does not replay history), but the current design processes all events from the first sync. If this becomes a problem, a `skip_initial_sync` config flag can be added later. |
| Sync loop leak on unhandled error | `try/catch` wraps the entire loop body and retries after a fixed 5s backoff. AbortController ensures clean shutdown. |
| Bot invited to many rooms, only some allowed | Room allowlist (`allowed_room_ids`) prevents processing. If empty, all joined rooms are processed. |
| Matrix homeserver rate limiting | The sync loop naturally rate-limits (one request at a time with 30s timeout). Send calls are user-triggered and unlikely to hit limits. |
| Access token expiry | Matrix access tokens are typically long-lived. If expired, whoami will fail on connect. Short-lived tokens (refresh tokens) are a future enhancement. |
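
The allowlist rule in the table (an empty list means every joined room is processed) can be sketched as a small predicate. `isRoomAllowed` is a hypothetical name; the adapter may inline this check differently.

```typescript
// Hypothetical helper mirroring the allowlist rule from the risks table.
function isRoomAllowed(roomId: string, allowedRoomIds?: readonly string[]): boolean {
  // An absent or empty allowlist means "process every joined room".
  if (!allowedRoomIds || allowedRoomIds.length === 0) return true;
  return allowedRoomIds.includes(roomId);
}

console.log(isRoomAllowed('!room1:example.org', ['!room1:example.org']));
console.log(isRoomAllowed('!other:example.org', ['!room1:example.org']));
console.log(isRoomAllowed('!anyrandom:example.org', []));
```

Treating `undefined` and `[]` identically matches the daemon wiring, which passes `undefined` when `allowed_room_ids` is empty.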
## Open Questions
1. **Skip initial sync?** — Should the adapter skip processing events from the very first `/sync` response (which may contain old history)? Current design processes them all. Can add a `skip_initial_sync: true` config flag if needed.
2. **Typing indicators?** — Matrix supports `m.typing` events. Should the adapter send a typing indicator while processing? Deferred for initial implementation (other adapters do this inconsistently — Telegram yes, Discord yes, Slack no).
3. **Image/media attachments?** — Matrix supports `m.image`, `m.file`, `m.audio` msgtypes. The initial implementation only handles `m.text`. Media support can be added in a follow-up (download via `/_matrix/media/v3/download/{serverName}/{mediaId}`).
4. **Formatted messages?** — Matrix supports `format: "org.matrix.custom.html"` with `formatted_body`. Should outbound messages include HTML? Deferred — plain text is sufficient for initial implementation.
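
If question 1 is resolved in favour of a `skip_initial_sync` flag, the filtering could look roughly like the sketch below. It assumes the standard `/sync` response layout (`rooms.join.<roomId>.timeline.events`); `eventsToProcess` and the `*Sketch` types are hypothetical and trimmed to the fields used here.

```typescript
// Trimmed, hypothetical view of a /sync response (only the fields used below).
interface TimelineEventSketch {
  type: string;
  event_id: string;
}
interface SyncResponseSketch {
  next_batch: string;
  rooms?: {
    join?: Record<string, { timeline?: { events?: TimelineEventSketch[] } }>;
  };
}
interface RoomEventSketch {
  roomId: string;
  event: TimelineEventSketch;
}

// Flattens timeline events per room; drops everything from the very first sync
// (no previous token) when skipInitialSync is enabled.
function eventsToProcess(
  sync: SyncResponseSketch,
  previousToken: string | undefined,
  skipInitialSync: boolean,
): RoomEventSketch[] {
  if (skipInitialSync && previousToken === undefined) return [];
  const collected: RoomEventSketch[] = [];
  for (const [roomId, room] of Object.entries(sync.rooms?.join ?? {})) {
    for (const event of room.timeline?.events ?? []) {
      collected.push({ roomId, event });
    }
  }
  return collected;
}

const sampleSync: SyncResponseSketch = {
  next_batch: 's2',
  rooms: {
    join: {
      '!room1:example.org': {
        timeline: { events: [{ type: 'm.room.message', event_id: '$evt1' }] },
      },
    },
  },
};
console.log(eventsToProcess(sampleSync, undefined, true).length);
console.log(eventsToProcess(sampleSync, 's1', true).length);
```

In either mode the caller would still save `next_batch` as the new sync token, so only the event delivery changes.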
---
## Files Summary
| Action | File |
|--------|------|
| Modify | `src/config/schema.ts` — add `matrixSchema` + type export |
| Create | `src/channels/matrix/adapter.ts` — full adapter implementation |
| Create | `src/channels/matrix/adapter.test.ts` (~21 adapter tests; the 3 config schema tests live in `src/config/schema.test.ts`) |
| Create | `src/channels/matrix/index.ts` — barrel export |
| Modify | `src/channels/index.ts` — re-export Matrix adapter |
| Modify | `src/daemon/channels.ts` — register Matrix adapter |
| Modify | `src/gateway/handlers/config.ts` — redact access_token |
| Modify | `src/config/schema.test.ts` — config validation tests |