flynn/docs/plans/2026-02-15-matrix-channel-adapter.md
2026-02-15 18:02:14 -08:00

Matrix Channel Adapter Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add a Matrix channel adapter using raw fetch against the Matrix Client-Server API v3, with zero external dependencies (no matrix-js-sdk).

Architecture: Polling-based adapter using /_matrix/client/v3/sync with incremental since tokens. Auth via pre-provisioned access token in config. Room allowlist + mention gating for multi-room safety. DM detection via m.direct account data. Outbound via /_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}.

Tech Stack: Node.js fetch (built-in), Vitest for tests, Zod for config schema validation.
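
The allowlist and mention gating named under Architecture can be read as a small decision function. The sketch below is illustrative only: `GateInput` and `shouldHandle` are hypothetical names, the adapter later in this plan inlines the same checks, and the pairing fallback is deliberately left out.

```typescript
// Hypothetical helper; not part of the adapter defined in this plan.
interface GateInput {
  roomId: string;
  isDm: boolean;
  mentioned: boolean;
  allowedRoomIds: string[]; // empty list = every joined room is allowed
  requireMention: boolean;
}

/** Returns true when an inbound room message should reach the handler. */
function shouldHandle(input: GateInput): boolean {
  const { roomId, isDm, mentioned, allowedRoomIds, requireMention } = input;

  // 1. Room allowlist: a non-empty list restricts the bot to those rooms.
  if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) {
    return false;
  }

  // 2. DMs always pass; so does everything when mentions are not required.
  if (isDm || !requireMention) {
    return true;
  }

  // 3. Otherwise the bot must be mentioned.
  return mentioned;
}
```

With `require_mention: true`, an unmentioned message in an allowed group room is dropped, while the same message in a DM room is delivered.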


Context

Existing Patterns

All Flynn channel adapters follow a consistent pattern:

  1. Config interface — e.g. TelegramAdapterConfig, DiscordAdapterConfig
  2. Adapter class implementing ChannelAdapter from src/channels/types.ts
  3. Barrel export from src/channels/<name>/index.ts
  4. Re-export in src/channels/index.ts
  5. Zod schema in src/config/schema.ts (optional field on root config)
  6. Registration in src/daemon/channels.ts (conditional on config presence)
  7. Tests mocking the platform SDK; here we mock global.fetch

Key ChannelAdapter interface

interface ChannelAdapter {
  readonly name: string;
  readonly status: ChannelStatus;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  send(peerId: string, message: OutboundMessage): Promise<void>;
  onMessage(handler: (msg: InboundMessage) => void): void;
}

senderId convention

  • Telegram: String(chatId) (numeric chat ID)
  • Discord: channelId (snowflake string)
  • Slack: channelId:threadTs
  • WhatsApp: number@c.us

For Matrix: roomId (e.g. !abc123:matrix.org). This gives one session per room, matching the Discord pattern. The sender's Matrix user ID (e.g. @alice:matrix.org) is reduced to its localpart (alice) and goes into senderName.

peerId convention

peerId passed to send() is the roomId. This is the same as senderId since replies go back to the same room.


Config Shape

matrix:
  homeserver_url: "https://matrix.example.org"
  access_token: "${MATRIX_ACCESS_TOKEN}"
  allowed_room_ids:
    - "!abc123:matrix.org"
  require_mention: true        # default: true
  sync_timeout_ms: 30000       # default: 30000 (long-poll timeout)
  display_name: "Flynn"        # optional: display name used for mention detection

Zod schema:

const matrixSchema = z.object({
  homeserver_url: z.string().url('Homeserver URL is required'),
  access_token: z.string().min(1, 'Access token is required'),
  allowed_room_ids: z.array(z.string()).default([]),
  require_mention: z.boolean().default(true),
  sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
  display_name: z.string().optional(),
}).optional();
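
The YAML and Zod schema use snake_case keys, while the adapter config shown later uses camelCase, so the registration step needs a small mapping. The following is a minimal sketch under that assumption; `MatrixConfigBlock`, `AdapterOptions`, and `toAdapterOptions` are hypothetical names, and the actual registration code in src/daemon/channels.ts may simply inline the mapping.

```typescript
// Shape of the validated `matrix` block (snake_case, as in the YAML above).
interface MatrixConfigBlock {
  homeserver_url: string;
  access_token: string;
  allowed_room_ids: string[];
  require_mention: boolean;
  sync_timeout_ms: number;
  display_name?: string;
}

// Shape the adapter constructor expects (camelCase).
interface AdapterOptions {
  homeserverUrl: string;
  accessToken: string;
  allowedRoomIds: string[];
  requireMention: boolean;
  syncTimeoutMs: number;
  displayName?: string;
}

/** Hypothetical mapper from the parsed config block to adapter options. */
function toAdapterOptions(cfg: MatrixConfigBlock): AdapterOptions {
  return {
    homeserverUrl: cfg.homeserver_url,
    accessToken: cfg.access_token,
    allowedRoomIds: cfg.allowed_room_ids,
    requireMention: cfg.require_mention,
    syncTimeoutMs: cfg.sync_timeout_ms,
    // Only set displayName when it was configured.
    ...(cfg.display_name !== undefined ? { displayName: cfg.display_name } : {}),
  };
}
```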

Matrix Client-Server API Endpoints Used

| Endpoint | Method | Purpose |
|---|---|---|
| /_matrix/client/v3/sync | GET | Long-poll for events (with ?since=&timeout=&filter=) |
| /_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId} | PUT | Send a message |
| /_matrix/client/v3/user/{userId}/account_data/m.direct | GET | Detect DM rooms |
| /_matrix/client/v3/account/whoami | GET | Resolve own userId on connect |
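
Room IDs and user IDs contain characters such as `:` and `@` that need percent-encoding when placed in a URL path. A minimal sketch of path builders for two of the endpoints above; `sendPath` and `directAccountDataPath` are hypothetical helper names, not functions from the Flynn codebase.

```typescript
const CLIENT_PREFIX = '/_matrix/client/v3';

/** Path for PUT .../rooms/{roomId}/send/m.room.message/{txnId}. */
function sendPath(roomId: string, txnId: string): string {
  return (
    `${CLIENT_PREFIX}/rooms/${encodeURIComponent(roomId)}` +
    `/send/m.room.message/${encodeURIComponent(txnId)}`
  );
}

/** Path for GET .../user/{userId}/account_data/m.direct. */
function directAccountDataPath(userId: string): string {
  return `${CLIENT_PREFIX}/user/${encodeURIComponent(userId)}/account_data/m.direct`;
}

// encodeURIComponent leaves "!" alone but encodes ":" and "@".
console.log(sendPath('!abc123:matrix.org', 'm1.0'));
console.log(directAccountDataPath('@flynn:matrix.org'));
```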

Sync filter (minimize bandwidth)

{
  "room": {
    "timeline": { "limit": 50 },
    "state": { "lazy_load_members": true }
  },
  "presence": { "not_types": ["*"] },
  "account_data": { "types": ["m.direct"] }
}
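
The filter is passed to /sync as a JSON string in the `filter` query parameter, alongside `timeout` and, after the first response, `since`. A minimal sketch of assembling that URL; `buildSyncUrl` is a hypothetical helper, and the adapter later in this plan builds the same query inline.

```typescript
const SYNC_FILTER = {
  room: {
    timeline: { limit: 50 },
    state: { lazy_load_members: true },
  },
  presence: { not_types: ['*'] },
  account_data: { types: ['m.direct'] },
};

/** Build the /sync URL. `since` is null on the initial sync. */
function buildSyncUrl(homeserverUrl: string, timeoutMs: number, since: string | null): string {
  const url = new URL('/_matrix/client/v3/sync', homeserverUrl);
  url.searchParams.set('timeout', String(timeoutMs));
  // searchParams.set percent-encodes the JSON for us.
  url.searchParams.set('filter', JSON.stringify(SYNC_FILTER));
  if (since !== null) {
    url.searchParams.set('since', since);
  }
  return url.toString();
}
```

The first call omits `since`; every later call passes the `next_batch` value from the previous response, so each request only returns events newer than the last one seen.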

Design Decisions

1. No matrix-js-sdk dependency

  • Choice: Raw fetch calls against /_matrix/client/v3 endpoints
  • Rationale: User explicitly requested minimal dependencies. The Matrix CS API is well-documented REST/JSON. We only need sync, send, whoami, and account_data — 4 endpoints total.
  • Alternatives: matrix-js-sdk (heavy, ~2MB, pulls in many transitive deps), matrix-bot-sdk (lighter but still a dep).

2. Polling via /sync long-poll (not WebSocket or SSE)

  • Choice: /sync with timeout param for long-polling, looped in an async function with AbortController for clean shutdown.
  • Rationale: This is the standard Matrix sync mechanism. Simple, robust, no extra deps. The timeout param makes it behave like long-polling (server holds connection open until events arrive or timeout elapses).
  • Alternatives: Sliding Sync (MSC3575, not universally supported), WebSocket transport (experimental extension).

3. senderId = roomId

  • Choice: Use the Matrix room ID (!...) as senderId in InboundMessage, matching the Discord adapter pattern where senderId = channelId.
  • Rationale: Flynn creates one session per channel:senderId pair. Using roomId means one session per room, which is the natural boundary. The human user's Matrix ID goes into senderName.
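
To make the consequence of this choice concrete, here is a small sketch of how a channel:senderId session key behaves when senderId is the room ID. The `sessionKey` function and `InboundLike` type are hypothetical; Flynn's actual session store may derive its keys differently.

```typescript
interface InboundLike {
  channel: string;
  senderId: string;   // the Matrix room ID
  senderName: string; // localpart of the human sender
}

/** Hypothetical key builder: one session per channel:senderId pair. */
function sessionKey(msg: InboundLike): string {
  return `${msg.channel}:${msg.senderId}`;
}

const fromAlice: InboundLike = { channel: 'matrix', senderId: '!abc123:matrix.org', senderName: 'alice' };
const fromBob: InboundLike = { channel: 'matrix', senderId: '!abc123:matrix.org', senderName: 'bob' };

// Both users post in the same room, so they share one session.
console.log(sessionKey(fromAlice) === sessionKey(fromBob));
```

Two people talking to the bot in the same room therefore share conversation history, which matches the Discord adapter's per-channel behavior.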

4. DM detection via m.direct account data

  • Choice: On connect, fetch the bot's m.direct account data to build a Set<string> of known DM room IDs. DM rooms bypass require_mention.
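  • Rationale: Matrix DMs are just rooms, but the m.direct account data flags which rooms are direct messages. This is the standard approach and doesn't require room member counting heuristics. Note that m.direct is maintained by clients rather than by the homeserver, so it only lists rooms that some client logged in as the bot account has marked as direct.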
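
The m.direct content maps each user ID to a list of room IDs, so building the DM set is a matter of flattening the values. A minimal sketch follows; `flattenDirectRooms` is a hypothetical helper name, and it accepts `unknown` because account data comes from the network and should not be trusted to have the expected shape.

```typescript
/** Flatten m.direct content ({ userId: [roomId, ...] }) into a set of room IDs. */
function flattenDirectRooms(content: unknown): Set<string> {
  const rooms = new Set<string>();
  if (typeof content !== 'object' || content === null) {
    return rooms;
  }
  for (const value of Object.values(content as Record<string, unknown>)) {
    if (!Array.isArray(value)) { continue; }
    for (const roomId of value) {
      // Skip anything that is not a string rather than failing the whole sync.
      if (typeof roomId === 'string') {
        rooms.add(roomId);
      }
    }
  }
  return rooms;
}
```

The same helper could serve both the initial account-data fetch and later m.direct events that arrive through /sync.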

5. Mention detection via body text

  • Choice: Check for the bot's display name (with or without a leading @) or its Matrix user ID in the message body text.
  • Rationale: Matrix m.room.message events include both body (plain text) and optional formatted_body (HTML with <a href="https://matrix.to/#/@user:server"> pill links). Checking the plain text body for the display name or MXID is simpler and sufficient for a first version. The HTML formatted_body could also be checked for matrix.to links, but the body fallback is present on every text message, so matching it works regardless of client. The trade-off is that a plain substring match can also fire on messages that merely contain the bot's name.
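
For comparison, the sketch below layers the stricter signals on top of the body match: a matrix.to pill link in formatted_body and the `m.mentions` content field. It is an illustration, not the plan's implementation; `isBotMentionedIn` is a hypothetical name, and whether a given client populates `m.mentions` or formatted_body is an assumption that should be checked against the clients actually in use.

```typescript
interface MessageContentLike {
  body?: unknown;
  formatted_body?: unknown;
  'm.mentions'?: unknown;
}

/** Check structured mentions first, then pill links, then the plain-text body. */
function isBotMentionedIn(content: MessageContentLike, userId: string, displayName: string): boolean {
  // 1. Structured mentions: { "m.mentions": { "user_ids": [...] } }
  const mentions = content['m.mentions'];
  if (typeof mentions === 'object' && mentions !== null) {
    const ids = (mentions as { user_ids?: unknown }).user_ids;
    if (Array.isArray(ids) && ids.includes(userId)) {
      return true;
    }
  }

  // 2. Pill link in the HTML body; the user ID may or may not be percent-encoded.
  const html = typeof content.formatted_body === 'string' ? content.formatted_body : '';
  if (
    html.includes(`https://matrix.to/#/${userId}`) ||
    html.includes(`https://matrix.to/#/${encodeURIComponent(userId)}`)
  ) {
    return true;
  }

  // 3. Plain-text fallback, as in the plan: MXID or case-insensitive display name.
  const body = typeof content.body === 'string' ? content.body : '';
  return body.includes(userId) || body.toLowerCase().includes(displayName.toLowerCase());
}
```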

6. Transaction ID for idempotent sends

  • Choice: Generate txnId as m${Date.now()}.${counter++} per adapter instance.
  • Rationale: Matrix PUT send requires a txnId for idempotency. A timestamp + counter ensures uniqueness within a process lifetime.
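
A minimal sketch of the timestamp-plus-counter scheme as a standalone factory. `createTxnIdFactory` is a hypothetical name (the adapter below keeps the counter as a private field instead), and the injectable clock exists only to make the output predictable in tests.

```typescript
/** Returns a function that yields a fresh transaction ID on every call. */
function createTxnIdFactory(now: () => number = Date.now): () => string {
  let counter = 0;
  // The counter keeps IDs distinct even when two sends share a millisecond.
  return () => `m${now()}.${counter++}`;
}

const nextTxnId = createTxnIdFactory();
console.log(nextTxnId(), nextTxnId());
```

Because the homeserver deduplicates on the transaction ID, retrying a failed PUT with the same ID should not produce a duplicate message, whereas generating a new ID for the retry could.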

7. Message size limit

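  • Choice: Use splitMessage() utility with a 65536-char limit (the Matrix spec caps a complete event at 65536 bytes).
  • Rationale: Matrix's limit is much higher than Telegram (4096) or Discord (2000), so chunking is rarely triggered. The spec limit covers the whole event in bytes, including envelope fields, and non-ASCII characters take more than one byte in UTF-8, so a 65536-char chunk is an upper bound rather than a guarantee; a lower limit would leave headroom.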
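
Since the spec limit is expressed in bytes, a byte-aware splitter is one way to stay under it for non-ASCII text. The sketch below is an alternative to the character-based splitMessage() call, not the plan's implementation; `splitByUtf8Bytes` is a hypothetical name, and the budget passed to it is an assumption that should leave room for the rest of the event.

```typescript
const utf8 = new TextEncoder();

/** Split text into chunks whose UTF-8 encoding is at most maxBytes each. */
function splitByUtf8Bytes(text: string, maxBytes: number): string[] {
  if (maxBytes < 4) {
    // A single code point can need up to 4 bytes in UTF-8.
    throw new RangeError('maxBytes must be at least 4');
  }
  const chunks: string[] = [];
  let current = '';
  let currentBytes = 0;

  // for...of walks code points, so surrogate pairs are never cut in half.
  for (const ch of text) {
    const size = utf8.encode(ch).length;
    if (currentBytes + size > maxBytes && current !== '') {
      chunks.push(current);
      current = '';
      currentBytes = 0;
    }
    current += ch;
    currentBytes += size;
  }
  if (current !== '') {
    chunks.push(current);
  }
  return chunks;
}
```

Unlike a splitter that prefers word or line boundaries, this one cuts wherever the byte budget runs out, so it is best treated as a safety net behind a boundary-aware split.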

Implementation Steps

Task 1: Add Matrix Zod Schema to Config

Files:

  • Modify: src/config/schema.ts

Step 1: Add the matrixSchema definition

Add after the whatsappSchema definition (around line 350):

const matrixSchema = z.object({
  homeserver_url: z.string().url('Homeserver URL is required'),
  access_token: z.string().min(1, 'Access token is required'),
  allowed_room_ids: z.array(z.string()).default([]),
  require_mention: z.boolean().default(true),
  sync_timeout_ms: z.number().min(1000).max(120000).default(30000),
  display_name: z.string().optional(),
}).optional();

Step 2: Add matrix to configSchema

In configSchema (around line 510), add after whatsapp:

matrix: matrixSchema,

Step 3: Add exported type

After the existing type exports (around line 559), add:

export type MatrixConfig = z.infer<typeof matrixSchema>;

Step 4: Run typecheck

Run: pnpm typecheck
Expected: PASS

Step 5: Commit

git add src/config/schema.ts
git commit -m "feat(matrix): add Matrix channel config schema"

Task 2: Create Matrix Adapter — Core Structure and Connect

Files:

  • Create: src/channels/matrix/adapter.ts
  • Create: src/channels/matrix/adapter.test.ts
  • Create: src/channels/matrix/index.ts

Step 1: Write the adapter.test.ts skeleton with connect/disconnect tests

Create src/channels/matrix/adapter.test.ts:

import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';
import type { InboundMessage } from '../types.js';

// ── Mock global fetch ─────────────────────────────────────────────

const mockFetch = vi.fn();

beforeEach(() => {
  vi.stubGlobal('fetch', mockFetch);
});

afterEach(() => {
  vi.restoreAllMocks();
  // Undo vi.stubGlobal so the real fetch is back in place between tests.
  vi.unstubAllGlobals();
});

const baseConfig: MatrixAdapterConfig = {
  homeserverUrl: 'https://matrix.example.org',
  accessToken: 'syt_test_token',
  allowedRoomIds: ['!room1:example.org'],
  requireMention: true,
  syncTimeoutMs: 30000,
};

/** Helper: create a mock fetch Response. */
function mockResponse(body: unknown, status = 200): Response {
  return {
    ok: status >= 200 && status < 300,
    status,
    json: async () => body,
    text: async () => JSON.stringify(body),
  } as Response;
}

/** Helper: make fetch return specific responses in sequence. */
function mockFetchSequence(...responses: Response[]) {
  for (const response of responses) {
    mockFetch.mockResolvedValueOnce(response);
  }
}

describe('MatrixAdapter', () => {
  let adapter: MatrixAdapter;

  beforeEach(() => {
    // mockReset (unlike mockClear) also drops queued once-responses left by a previous test.
    mockFetch.mockReset();
    adapter = new MatrixAdapter(baseConfig);
  });

  afterEach(async () => {
    // Ensure sync loop is stopped
    if (adapter.status === 'connected' || adapter.status === 'connecting') {
      await adapter.disconnect();
    }
  });

  // ── Basic properties ────────────────────────────────────────────

  it('has name "matrix"', () => {
    expect(adapter.name).toBe('matrix');
  });

  it('starts as disconnected', () => {
    expect(adapter.status).toBe('disconnected');
  });

  // ── connect / disconnect ────────────────────────────────────────

  it('connect resolves userId via whoami and sets connected', async () => {
    // whoami
    mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
    // m.direct account data
    mockFetch.mockResolvedValueOnce(mockResponse({}));
    // First sync (will be aborted on disconnect)
    mockFetch.mockResolvedValueOnce(
      new Promise(() => {}), // never resolves — simulates long-poll
    );

    await adapter.connect();

    expect(adapter.status).toBe('connected');
    // Verify whoami was called
    expect(mockFetch).toHaveBeenCalledWith(
      'https://matrix.example.org/_matrix/client/v3/account/whoami',
      expect.objectContaining({
        headers: expect.objectContaining({
          Authorization: 'Bearer syt_test_token',
        }),
      }),
    );
  });

  it('connect throws on whoami failure', async () => {
    mockFetch.mockResolvedValueOnce(mockResponse({ errcode: 'M_UNKNOWN_TOKEN' }, 401));

    await expect(adapter.connect()).rejects.toThrow();
  });

  it('disconnect sets status to disconnected', async () => {
    // whoami
    mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
    // m.direct
    mockFetch.mockResolvedValueOnce(mockResponse({}));
    // sync long-poll (never resolves)
    mockFetch.mockResolvedValueOnce(new Promise(() => {}));

    await adapter.connect();
    expect(adapter.status).toBe('connected');

    await adapter.disconnect();
    expect(adapter.status).toBe('disconnected');
  });

  it('disconnect is safe when not connected', async () => {
    await adapter.disconnect();
    expect(adapter.status).toBe('disconnected');
  });
});

Step 2: Run test to verify it fails (adapter doesn't exist yet)

Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: FAIL (cannot resolve ./adapter.js)

Step 3: Write adapter.ts with connect/disconnect

Create src/channels/matrix/adapter.ts:

/**
 * Matrix channel adapter.
 *
 * Implements the ChannelAdapter interface using raw fetch against the
 * Matrix Client-Server API v3. No external Matrix SDK dependency.
 * Uses /sync long-polling for inbound messages and PUT /send for outbound.
 */

import type {
  InboundMessage,
  OutboundMessage,
  ChannelAdapter,
  ChannelStatus,
} from '../types.js';
import { splitMessage } from '../utils.js';
import type { PairingManager } from '../pairing.js';

/** Configuration for the Matrix channel adapter. */
export interface MatrixAdapterConfig {
  homeserverUrl: string;
  accessToken: string;
  /** Room IDs to respond in. Empty = all joined rooms. */
  allowedRoomIds?: string[];
  /** Require @mention to respond in rooms (default: true). DMs always respond. */
  requireMention?: boolean;
  /** Long-poll timeout for /sync in milliseconds (default: 30000). */
  syncTimeoutMs?: number;
  /** Optional bot display name, used for mention detection (defaults to the user ID localpart). */
  displayName?: string;
  /** Optional pairing manager for DM pairing codes. */
  pairingManager?: PairingManager;
}

/** Maximum message length in characters before chunking. Matrix caps a complete event at 65536 bytes, so this is an upper bound. */
const MAX_MESSAGE_LENGTH = 65536;

/**
 * Matrix channel adapter backed by raw Client-Server API v3 fetch calls.
 *
 * Handles room allowlist filtering, optional mention requirement,
 * DM detection via m.direct, and message chunking.
 */
export class MatrixAdapter implements ChannelAdapter {
  readonly name = 'matrix';

  private _status: ChannelStatus = 'disconnected';
  private messageHandler?: (msg: InboundMessage) => void;
  private config: MatrixAdapterConfig;

  /** Resolved bot user ID (e.g. @flynn:example.org). */
  private userId: string | null = null;
  /** Resolved bot display name (for mention detection). */
  private botDisplayName: string | null = null;
  /** Set of room IDs that are direct messages. */
  private dmRoomIds: Set<string> = new Set();
  /** Incremental sync token. */
  private syncToken: string | null = null;
  /** AbortController for the sync loop. */
  private syncAbort: AbortController | null = null;
  /** Transaction ID counter for idempotent sends. */
  private txnCounter = 0;

  get status(): ChannelStatus {
    return this._status;
  }

  constructor(config: MatrixAdapterConfig) {
    this.config = config;
  }

  /** Register the inbound message handler. Called by the registry before connect(). */
  onMessage(handler: (msg: InboundMessage) => void): void {
    this.messageHandler = handler;
  }

  /**
   * Connect to the Matrix homeserver:
   * 1. Resolve bot userId via /account/whoami
   * 2. Fetch m.direct account data for DM detection
   * 3. Start the /sync long-poll loop
   */
  async connect(): Promise<void> {
    this._status = 'connecting';

    try {
      // Resolve our own user ID
      const whoami = await this.matrixGet<{ user_id: string }>(
        '/_matrix/client/v3/account/whoami',
      );
      this.userId = whoami.user_id;
      this.botDisplayName = this.config.displayName ?? this.userId.split(':')[0].slice(1);

      // Fetch m.direct account data for DM detection
      await this.loadDirectRooms();

      // Start sync loop (fire and forget — runs until disconnect)
      this.syncAbort = new AbortController();
      this.runSyncLoop(this.syncAbort.signal);

      this._status = 'connected';
      console.log(`Matrix adapter connected as ${this.userId}`);
    } catch (error) {
      this._status = 'error';
      throw error;
    }
  }

  /** Stop the sync loop and clean up. */
  async disconnect(): Promise<void> {
    if (this.syncAbort) {
      this.syncAbort.abort();
      this.syncAbort = null;
    }
    this.userId = null;
    this.syncToken = null;
    this.dmRoomIds.clear();
    this._status = 'disconnected';
  }

  /** Send an outbound message to a Matrix room. */
  async send(peerId: string, message: OutboundMessage): Promise<void> {
    if (this._status !== 'connected') {
      throw new Error('Matrix adapter not connected');
    }

    const text = message.text ?? '';
    if (!text.trim()) { return; }

    if (text.length <= MAX_MESSAGE_LENGTH) {
      await this.sendRoomMessage(peerId, text);
    } else {
      const chunks = splitMessage(text, MAX_MESSAGE_LENGTH);
      for (const chunk of chunks) {
        await this.sendRoomMessage(peerId, chunk);
      }
    }
  }

  // ── Private: Matrix API helpers ─────────────────────────────────

  /** Make an authenticated GET request to the homeserver. */
  private async matrixGet<T>(path: string, query?: Record<string, string>): Promise<T> {
    const url = new URL(path, this.config.homeserverUrl);
    if (query) {
      for (const [k, v] of Object.entries(query)) {
        url.searchParams.set(k, v);
      }
    }

    const response = await fetch(url.toString(), {
      method: 'GET',
      headers: {
        Authorization: `Bearer ${this.config.accessToken}`,
      },
    });

    if (!response.ok) {
      const body = await response.text();
      throw new Error(`Matrix API error (${response.status}): ${body}`);
    }

    return response.json() as Promise<T>;
  }

  /** Make an authenticated PUT request to the homeserver. */
  private async matrixPut<T>(path: string, body: unknown): Promise<T> {
    const url = new URL(path, this.config.homeserverUrl);

    const response = await fetch(url.toString(), {
      method: 'PUT',
      headers: {
        Authorization: `Bearer ${this.config.accessToken}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(body),
    });

    if (!response.ok) {
      const text = await response.text();
      throw new Error(`Matrix API error (${response.status}): ${text}`);
    }

    return response.json() as Promise<T>;
  }

  /** Send a text message to a room with idempotent txnId. */
  private async sendRoomMessage(roomId: string, text: string): Promise<void> {
    const txnId = `m${Date.now()}.${this.txnCounter++}`;
    await this.matrixPut(
      `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`,
      {
        msgtype: 'm.text',
        body: text,
      },
    );
  }

  /** Load the m.direct account data to populate dmRoomIds. */
  private async loadDirectRooms(): Promise<void> {
    try {
      const data = await this.matrixGet<Record<string, string[]>>(
        `/_matrix/client/v3/user/${encodeURIComponent(this.userId!)}/account_data/m.direct`,
      );
      this.dmRoomIds.clear();
      // m.direct maps userId → [roomId, ...]. Flatten all room IDs.
      for (const roomIds of Object.values(data)) {
        if (Array.isArray(roomIds)) {
          for (const roomId of roomIds) {
            this.dmRoomIds.add(roomId);
          }
        }
      }
    } catch {
      // m.direct may not exist if the bot has no DMs yet — that's fine.
      this.dmRoomIds.clear();
    }
  }

  /** Check if a room is a DM. */
  private isDM(roomId: string): boolean {
    return this.dmRoomIds.has(roomId);
  }

  // ── Private: Sync loop ──────────────────────────────────────────

  /** Run the /sync long-poll loop until aborted. */
  private async runSyncLoop(signal: AbortSignal): Promise<void> {
    // Build the sync filter to minimize bandwidth
    const filter = JSON.stringify({
      room: {
        timeline: { limit: 50 },
        state: { lazy_load_members: true },
      },
      presence: { not_types: ['*'] },
      account_data: { types: ['m.direct'] },
    });

    while (!signal.aborted) {
      try {
        const query: Record<string, string> = {
          timeout: String(this.config.syncTimeoutMs ?? 30000),
          filter,
        };
        if (this.syncToken) {
          query.since = this.syncToken;
        }

        const url = new URL('/_matrix/client/v3/sync', this.config.homeserverUrl);
        for (const [k, v] of Object.entries(query)) {
          url.searchParams.set(k, v);
        }

        const response = await fetch(url.toString(), {
          method: 'GET',
          headers: {
            Authorization: `Bearer ${this.config.accessToken}`,
          },
          signal,
        });

        if (!response.ok) {
          const body = await response.text();
          console.error(`Matrix sync error (${response.status}): ${body}`);
          // Back off on error
          await this.sleep(5000, signal);
          continue;
        }

        const syncResponse = await response.json() as MatrixSyncResponse;
        this.syncToken = syncResponse.next_batch;

        // Process sync response
        this.processSyncResponse(syncResponse);
      } catch (error) {
        if (signal.aborted) { return; }
        console.error('Matrix sync loop error:', error instanceof Error ? error.message : 'Unknown error');
        // Back off on error
        await this.sleep(5000, signal);
      }
    }
  }

  /** Process a /sync response: refresh DM rooms, then dispatch timeline events from joined rooms. */
  private processSyncResponse(sync: MatrixSyncResponse): void {
    // Update m.direct from account_data first, so the DM set stays current even
    // when the response carries no joined-room events or no handler is registered.
    const accountData = sync.account_data?.events;
    if (accountData) {
      for (const event of accountData) {
        if (event.type === 'm.direct' && event.content) {
          this.dmRoomIds.clear();
          for (const roomIds of Object.values(event.content as Record<string, string[]>)) {
            if (Array.isArray(roomIds)) {
              for (const roomId of roomIds) {
                this.dmRoomIds.add(roomId);
              }
            }
          }
        }
      }
    }

    if (!this.messageHandler) { return; }

    const joinedRooms = sync.rooms?.join;
    if (!joinedRooms) { return; }

    for (const [roomId, roomData] of Object.entries(joinedRooms)) {
      const events = roomData.timeline?.events;
      if (!events) { continue; }

      for (const event of events) {
        this.handleTimelineEvent(roomId, event);
      }
    }
  }

  /** Handle a single timeline event from a joined room. */
  private handleTimelineEvent(roomId: string, event: MatrixEvent): void {
    if (!this.messageHandler) { return; }

    // Only process m.room.message events
    if (event.type !== 'm.room.message') { return; }

    // Ignore our own messages
    if (event.sender === this.userId) { return; }

    // Room allowlist check
    const allowedRoomIds = this.config.allowedRoomIds ?? [];
    if (allowedRoomIds.length > 0 && !allowedRoomIds.includes(roomId)) {
      // Pairing fallback
      const pm = this.config.pairingManager;
      if (pm?.enabled) {
        if (pm.isApproved('matrix', roomId)) {
          // Approved — fall through
        } else {
          const text = (event.content?.body ?? '').trim();
          if (text && pm.validateCode('matrix', roomId, text)) {
            this.sendRoomMessage(roomId, 'Pairing successful! You can now chat with Flynn.').catch(() => {});
          }
          return;
        }
      } else {
        return;
      }
    }

    const body = event.content?.body ?? '';
    const isDm = this.isDM(roomId);

    // Mention gating: require mention in non-DM rooms
    const requireMention = this.config.requireMention ?? true;
    if (!isDm && requireMention) {
      const isMentioned = this.isBotMentioned(body);
      if (!isMentioned) { return; }
    }

    // Strip bot mention from text
    let text = body;
    if (this.botDisplayName) {
      text = text.replace(new RegExp(`@?${this.escapeRegex(this.botDisplayName)}\\b`, 'gi'), '').trim();
    }
    if (this.userId) {
      text = text.replace(new RegExp(this.escapeRegex(this.userId), 'g'), '').trim();
    }

    // Detect reset command
    if (text === '!reset' || text === 'reset') {
      this.messageHandler({
        id: event.event_id,
        channel: 'matrix',
        senderId: roomId,
        senderName: this.extractDisplayName(event.sender),
        text: '!reset',
        timestamp: event.origin_server_ts,
        metadata: { isCommand: true, command: 'reset' },
      });
      return;
    }

    // Regular message
    this.messageHandler({
      id: event.event_id,
      channel: 'matrix',
      senderId: roomId,
      senderName: this.extractDisplayName(event.sender),
      text,
      timestamp: event.origin_server_ts,
    });
  }

  /** Check if the bot is mentioned in the message text. */
  private isBotMentioned(text: string): boolean {
    if (this.botDisplayName && text.toLowerCase().includes(this.botDisplayName.toLowerCase())) {
      return true;
    }
    if (this.userId && text.includes(this.userId)) {
      return true;
    }
    return false;
  }

  /** Extract a display name from a Matrix user ID. @user:server → user */
  private extractDisplayName(userId: string): string {
    const match = userId.match(/^@([^:]+)/);
    return match ? match[1] : userId;
  }

  /** Escape a string for use in a RegExp. */
  private escapeRegex(str: string): string {
    return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  }

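  /** Sleep with abort support. Resolves early if the signal aborts. */
  private sleep(ms: number, signal: AbortSignal): Promise<void> {
    return new Promise((resolve) => {
      // Already aborted: do not wait at all.
      if (signal.aborted) { resolve(); return; }
      const onAbort = () => {
        clearTimeout(timer);
        resolve();
      };
      const timer = setTimeout(() => {
        // Drop the listener so repeated back-offs do not accumulate handlers.
        signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      signal.addEventListener('abort', onAbort, { once: true });
    });
  }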
}

// ── Matrix API response types ──────────────────────────────────────

interface MatrixSyncResponse {
  next_batch: string;
  rooms?: {
    join?: Record<string, MatrixJoinedRoom>;
  };
  account_data?: {
    events?: Array<{ type: string; content?: unknown }>;
  };
}

interface MatrixJoinedRoom {
  timeline?: {
    events?: MatrixEvent[];
  };
}

interface MatrixEvent {
  type: string;
  event_id: string;
  sender: string;
  origin_server_ts: number;
  content?: {
    body?: string;
    msgtype?: string;
    [key: string]: unknown;
  };
}
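
The interfaces above describe the shape the adapter expects, but /sync payloads arrive as untyped JSON, so a malformed event (for example a non-string body) would only surface as a runtime error further down. A minimal sketch of a runtime guard, offered as an optional hardening step; `TextMessageEventLike` and `isTextMessageEvent` are hypothetical names and are not part of the adapter above.

```typescript
interface TextMessageEventLike {
  type: 'm.room.message';
  event_id: string;
  sender: string;
  origin_server_ts: number;
  content: { msgtype: string; body: string };
}

/** Narrow an unknown timeline entry to a well-formed m.room.message event. */
function isTextMessageEvent(value: unknown): value is TextMessageEventLike {
  if (typeof value !== 'object' || value === null) { return false; }
  const ev = value as Record<string, unknown>;

  const content = ev['content'];
  if (typeof content !== 'object' || content === null) { return false; }
  const c = content as Record<string, unknown>;

  return (
    ev['type'] === 'm.room.message' &&
    typeof ev['event_id'] === 'string' &&
    typeof ev['sender'] === 'string' &&
    typeof ev['origin_server_ts'] === 'number' &&
    typeof c['msgtype'] === 'string' &&
    typeof c['body'] === 'string'
  );
}
```

Events that fail the guard can simply be skipped, which keeps one malformed event from interrupting the rest of the batch.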

Step 4: Run test to verify connect/disconnect pass

Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: PASS (6 tests)

Step 5: Create barrel export

Create src/channels/matrix/index.ts:

export { MatrixAdapter, type MatrixAdapterConfig } from './adapter.js';

Step 6: Commit

git add src/channels/matrix/
git commit -m "feat(matrix): add Matrix adapter core with connect/disconnect"

Task 3: Add Send Tests and Verify Send Works

Files:

  • Modify: src/channels/matrix/adapter.test.ts

Step 1: Add send tests to the test file

Append these tests inside the describe('MatrixAdapter') block:

  // ── send ──────────────────────────────────────────────────────

  it('send throws when not connected', async () => {
    await expect(adapter.send('!room1:example.org', { text: 'hello' })).rejects.toThrow(
      'Matrix adapter not connected',
    );
  });

  it('send delivers a message via PUT with txnId', async () => {
    // Connect first
    mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
    mockFetch.mockResolvedValueOnce(mockResponse({}));
    mockFetch.mockResolvedValueOnce(new Promise(() => {})); // sync hangs
    await adapter.connect();

    // Send message
    mockFetch.mockResolvedValueOnce(mockResponse({ event_id: '$sent1' }));
    await adapter.send('!room1:example.org', { text: 'Hello there' });

    // Find the PUT call (skip whoami, m.direct, sync)
    const putCall = mockFetch.mock.calls.find(
      (call) => call[1]?.method === 'PUT',
    );
    expect(putCall).toBeDefined();
    expect(putCall![0]).toContain('/_matrix/client/v3/rooms/');
    expect(putCall![0]).toContain('/send/m.room.message/');

    const body = JSON.parse(putCall![1].body);
    expect(body.msgtype).toBe('m.text');
    expect(body.body).toBe('Hello there');
  });

  it('send skips empty text', async () => {
    mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
    mockFetch.mockResolvedValueOnce(mockResponse({}));
    mockFetch.mockResolvedValueOnce(new Promise(() => {}));
    await adapter.connect();

    await adapter.send('!room1:example.org', { text: '' });

    // No PUT call should have been made
    const putCall = mockFetch.mock.calls.find(
      (call) => call[1]?.method === 'PUT',
    );
    expect(putCall).toBeUndefined();
  });

Step 2: Run tests

Run: pnpm test:run src/channels/matrix/adapter.test.ts
Expected: PASS (9 tests)

Step 3: Commit

git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add send tests"

Task 4: Add Inbound Message Handling Tests

Files:

  • Modify: src/channels/matrix/adapter.test.ts

Step 1: Add a helper to connect the adapter and simulate sync

Add these helpers near the top of the test file (after mockFetchSequence):

/** Helper: connect the adapter with standard mocks. Returns a function to simulate sync. */
async function connectAdapter(adapter: MatrixAdapter, directRooms: Record<string, string[]> = {}) {
  // whoami
  mockFetch.mockResolvedValueOnce(mockResponse({ user_id: '@flynn:example.org' }));
  // m.direct
  mockFetch.mockResolvedValueOnce(mockResponse(directRooms));
  // sync long-poll — we'll control this manually
  let syncResolve: ((value: Response) => void) | null = null;
  mockFetch.mockResolvedValueOnce(
    new Promise<Response>((resolve) => { syncResolve = resolve; }),
  );
  await adapter.connect();

  return {
    /** Simulate a sync response arriving. */
    async emitSync(syncData: unknown) {
      if (syncResolve) {
        // Queue the next long-poll first: the sync loop calls fetch again as soon
        // as this response is processed and must find a pending request waiting.
        mockFetch.mockResolvedValueOnce(new Promise(() => {}));
        syncResolve(mockResponse(syncData));
        syncResolve = null;
      }
      // Allow the sync loop to process
      await new Promise((r) => setTimeout(r, 10));
    },
  };
}

/** Helper: create a Matrix timeline event. */
function createMatrixEvent(overrides: Record<string, unknown> = {}) {
  return {
    type: 'm.room.message',
    event_id: '$evt1',
    sender: '@alice:example.org',
    origin_server_ts: 1700000000000,
    content: {
      msgtype: 'm.text',
      body: 'Hello Flynn',
    },
    ...overrides,
  };
}

/** Helper: wrap events into a sync response. */
function createSyncResponse(roomId: string, events: unknown[], extras: Record<string, unknown> = {}) {
  return {
    next_batch: 's_next',
    rooms: {
      join: {
        [roomId]: {
          timeline: { events },
        },
      },
    },
    ...extras,
  };
}

Step 2: Add inbound message tests

Append inside the describe('MatrixAdapter') block:

  // ── Inbound message handling ────────────────────────────────────

  it('inbound message from allowed room with mention triggers handler', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '@Flynn Hello there' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.channel).toBe('matrix');
    expect(msg.senderId).toBe('!room1:example.org');
    expect(msg.senderName).toBe('alice');
    expect(msg.text).toBe('Hello there');
    expect(msg.id).toBe('$evt1');
  });

  it('inbound message without mention is ignored when requireMention is true', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: 'Hello everyone' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).not.toHaveBeenCalled();
  });

  it('inbound message without mention is accepted when requireMention is false', async () => {
    const noMentionAdapter = new MatrixAdapter({
      ...baseConfig,
      requireMention: false,
    });
    const handler = vi.fn();
    noMentionAdapter.onMessage(handler);

    const { emitSync } = await connectAdapter(noMentionAdapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: 'Hello everyone' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);

    await noMentionAdapter.disconnect();
  });

  it('ignores own messages (sender === userId)', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      sender: '@flynn:example.org',
      content: { msgtype: 'm.text', body: 'My own message' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).not.toHaveBeenCalled();
  });

  it('ignores messages in disallowed rooms', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '@Flynn hello' },
    });
    await emitSync(createSyncResponse('!notallowed:example.org', [event]));

    expect(handler).not.toHaveBeenCalled();
  });

  it('DM room bypasses mention requirement', async () => {
    const handler = vi.fn();

    // Mark !dm1 as a DM room via m.direct
    const directRooms = { '@alice:example.org': ['!dm1:example.org'] };
    const dmAdapter = new MatrixAdapter({
      ...baseConfig,
      allowedRoomIds: ['!dm1:example.org'],
    });
    dmAdapter.onMessage(handler);
    const { emitSync } = await connectAdapter(dmAdapter, directRooms);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: 'Hello without mention' },
    });
    await emitSync(createSyncResponse('!dm1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.text).toBe('Hello without mention');

    await dmAdapter.disconnect();
  });

  it('!reset command delivers reset metadata', async () => {
    const noMentionAdapter = new MatrixAdapter({
      ...baseConfig,
      requireMention: false,
    });
    const handler = vi.fn();
    noMentionAdapter.onMessage(handler);

    const { emitSync } = await connectAdapter(noMentionAdapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '!reset' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.text).toBe('!reset');
    expect(msg.metadata).toEqual({ isCommand: true, command: 'reset' });

    await noMentionAdapter.disconnect();
  });

  it('strips bot mention from message text', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '@Flynn What is the weather?' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.text).toBe('What is the weather?');
  });

  it('ignores non-message events (e.g. m.room.member)', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      type: 'm.room.member',
      content: { membership: 'join' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).not.toHaveBeenCalled();
  });

  it('accepts all rooms when allowedRoomIds is empty', async () => {
    const openAdapter = new MatrixAdapter({
      ...baseConfig,
      allowedRoomIds: [],
      requireMention: false,
    });
    const handler = vi.fn();
    openAdapter.onMessage(handler);

    const { emitSync } = await connectAdapter(openAdapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: 'Hello from any room' },
    });
    await emitSync(createSyncResponse('!anyrandom:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.text).toBe('Hello from any room');

    await openAdapter.disconnect();
  });

  it('does nothing when no message handler is registered', async () => {
    // Don't call onMessage
    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '@Flynn hello' },
    });

    // Should not throw
    await emitSync(createSyncResponse('!room1:example.org', [event]));
  });

  it('mention detection works with Matrix user ID', async () => {
    const handler = vi.fn();
    adapter.onMessage(handler);

    const { emitSync } = await connectAdapter(adapter);

    const event = createMatrixEvent({
      content: { msgtype: 'm.text', body: '@flynn:example.org can you help?' },
    });
    await emitSync(createSyncResponse('!room1:example.org', [event]));

    expect(handler).toHaveBeenCalledTimes(1);
    const msg: InboundMessage = handler.mock.calls[0][0];
    expect(msg.text).toBe('can you help?');
  });

Step 2: Run tests

Run: pnpm test:run src/channels/matrix/adapter.test.ts Expected: PASS (all ~21 tests)

Step 3: Commit

git add src/channels/matrix/adapter.test.ts
git commit -m "test(matrix): add inbound message handling and DM detection tests"

Task 5: Wire Matrix Adapter into the System

Files:

  • Modify: src/channels/index.ts
  • Modify: src/daemon/channels.ts

Step 1: Add Matrix exports to src/channels/index.ts

Add after the WhatsApp export line:

export { MatrixAdapter, type MatrixAdapterConfig } from './matrix/index.js';

Step 2: Register Matrix adapter in src/daemon/channels.ts

Add import at top:

import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, MatrixAdapter, PairingManager } from '../channels/index.js';

Add registration block after the WhatsApp block (around line 71):

  // Register Matrix adapter (if configured)
  if (config.matrix) {
    const matrixAdapter = new MatrixAdapter({
      homeserverUrl: config.matrix.homeserver_url,
      accessToken: config.matrix.access_token,
      allowedRoomIds: config.matrix.allowed_room_ids.length > 0 ? config.matrix.allowed_room_ids : undefined,
      requireMention: config.matrix.require_mention,
      syncTimeoutMs: config.matrix.sync_timeout_ms,
      displayName: config.matrix.display_name,
      pairingManager,
    });
    channelRegistry.register(matrixAdapter);
  }

Step 3: Run typecheck

Run: pnpm typecheck Expected: PASS

Step 4: Run all channel tests to ensure no regressions

Run: pnpm test:run src/channels/ Expected: All existing tests PASS

Step 5: Commit

git add src/channels/index.ts src/daemon/channels.ts
git commit -m "feat(matrix): wire Matrix adapter into channel registry and daemon"

Task 6: Add Config Schema Test

Files:

  • Modify: src/config/schema.test.ts (if it exists; otherwise verify with existing tests)

Step 1: Check for existing config schema tests

Read src/config/schema.test.ts and add a Matrix config validation test.

it('validates matrix config', () => {
  const config = configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
    matrix: {
      homeserver_url: 'https://matrix.example.org',
      access_token: 'syt_test_token',
      allowed_room_ids: ['!room1:example.org'],
      require_mention: true,
    },
  });
  expect(config.matrix).toBeDefined();
  expect(config.matrix!.homeserver_url).toBe('https://matrix.example.org');
  expect(config.matrix!.require_mention).toBe(true);
  expect(config.matrix!.sync_timeout_ms).toBe(30000); // default
});

it('matrix config is optional', () => {
  const config = configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
  });
  expect(config.matrix).toBeUndefined();
});

it('matrix config rejects invalid homeserver URL', () => {
  expect(() => configSchema.parse({
    models: { default: { provider: 'anthropic', model: 'claude-sonnet-4-20250514' } },
    matrix: {
      homeserver_url: 'not-a-url',
      access_token: 'token',
    },
  })).toThrow();
});

Step 2: Run config schema tests

Run: pnpm test:run src/config/schema.test.ts Expected: PASS

Step 3: Commit

git add src/config/schema.test.ts
git commit -m "test(matrix): add config schema validation tests"

Task 7: Add Credential Redaction for Matrix Token

Files:

  • Modify: src/gateway/handlers/config.ts

Step 1: Add matrix access_token to redaction

In the redactConfig() function, add redaction for the Matrix access token alongside the existing channel token redactions:

// After the existing channel token redactions (e.g., discord.bot_token)
if (redacted.matrix) {
  redacted.matrix = { ...redacted.matrix, access_token: REDACTED };
}
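The redaction step can be sketched as a standalone function. This is a minimal illustration, not the actual redactConfig(): the AppConfig shape, the redactMatrixToken name, and the REDACTED value are assumptions made for the example.

```typescript
// Hypothetical shapes for illustration; the real config type comes from src/config/schema.ts.
interface MatrixConfig {
  homeserver_url: string;
  access_token: string;
  allowed_room_ids?: string[];
}

interface AppConfig {
  matrix?: MatrixConfig;
  [key: string]: unknown;
}

// Assumed placeholder value; the real constant lives in src/gateway/handlers/config.ts.
const REDACTED = '***REDACTED***';

// Returns a shallow copy with the Matrix access token masked.
// The input object is not mutated, so the live config keeps its real token.
function redactMatrixToken(config: AppConfig): AppConfig {
  const redacted: AppConfig = { ...config };
  if (redacted.matrix) {
    redacted.matrix = { ...redacted.matrix, access_token: REDACTED };
  }
  return redacted;
}
```

Copying the nested matrix object before overwriting the token matters here: a plain assignment to redacted.matrix.access_token would also change the config the running adapter uses.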

Step 2: Run tests

Run: pnpm test:run src/gateway/handlers/handlers.test.ts Expected: PASS

Step 3: Commit

git add src/gateway/handlers/config.ts
git commit -m "feat(matrix): add access_token to config redaction"

Task 8: Final Verification

Step 1: Run full typecheck

Run: pnpm typecheck Expected: PASS

Step 2: Run all tests

Run: pnpm test:run Expected: All tests PASS

Step 3: Commit any remaining changes

git add -A
git commit -m "feat(matrix): complete Matrix channel adapter implementation"

Testing Summary

| Test Category | Count | Description |
|---|---|---|
| Basic properties | 2 | name = "matrix", starts disconnected |
| connect/disconnect | 4 | whoami resolution, error handling, clean disconnect, safe disconnect when not connected |
| send | 3 | throws when disconnected, PUT with txnId, skips empty text |
| Inbound messages | 12 | mention gating, DM bypass, own message filtering, room allowlist, reset command, mention stripping, MXID mention, non-message events, open rooms, no handler |
| Config schema | 3 | valid config, optional, invalid URL rejection |
| Total | ~24 | |

Error Handling Strategy

| Error | Recovery |
|---|---|
| /account/whoami 401 | Throw on connect; adapter stays in error state |
| /sync HTTP error | Log, back off 5s, retry |
| /sync network error | Log, back off 5s, retry |
| /send failure | Throw; caller (registry) logs the error |
| m.direct 404 | Silently ignore; no DMs known yet |
| AbortError during sync | Clean exit from loop |
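The m.direct row can be illustrated with a small loader that tolerates a 404. This is a sketch under assumptions: the loadDmRoomIds name and the FetchLike type are hypothetical, and throwing on other non-2xx statuses is a choice made for the example, since the table above only specifies the 404 case.

```typescript
// Minimal fetch-like signature so the sketch can be exercised with a stub instead of global fetch.
type FetchLike = (
  url: string,
  init?: { headers?: Record<string, string> },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

// Loads m.direct account data and flattens it into a set of DM room IDs.
// m.direct maps user IDs to arrays of room IDs, e.g. { "@alice:example.org": ["!dm1:example.org"] }.
async function loadDmRoomIds(
  fetchFn: FetchLike,
  homeserverUrl: string,
  accessToken: string,
  userId: string,
): Promise<Set<string>> {
  const url =
    `${homeserverUrl}/_matrix/client/v3/user/${encodeURIComponent(userId)}/account_data/m.direct`;
  const res = await fetchFn(url, { headers: { Authorization: `Bearer ${accessToken}` } });

  // 404 means the account has no m.direct data yet: no DMs are known.
  if (res.status === 404) return new Set();
  // Assumption for this sketch: any other failure is surfaced to the caller.
  if (!res.ok) throw new Error(`m.direct request failed: HTTP ${res.status}`);

  const data = (await res.json()) as Record<string, unknown>;
  const roomIds = new Set<string>();
  for (const rooms of Object.values(data)) {
    if (!Array.isArray(rooms)) continue; // skip malformed entries
    for (const id of rooms) {
      if (typeof id === 'string') roomIds.add(id);
    }
  }
  return roomIds;
}
```

Passing the fetch function in as a parameter keeps the sketch testable without touching global.fetch; the plan's tests mock global.fetch instead.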

Sync Loop Lifecycle

connect()
  ├── GET /account/whoami → resolve userId
  ├── GET /user/{userId}/account_data/m.direct → populate dmRoomIds
  └── runSyncLoop() (fire-and-forget async)
        ├── GET /sync?timeout=30000 (first call, no since token)
        │     └── next_batch → save as syncToken
        │     └── rooms.join → process timeline events
        │     └── account_data → update dmRoomIds
        ├── GET /sync?timeout=30000&since=... (subsequent calls)
        │     └── (repeat)
        └── (on error) → sleep 5s → retry

disconnect()
  └── AbortController.abort() → sync loop exits
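The lifecycle above can be sketched as a loop with an injected request function. This is an illustration under assumptions, not the adapter's code: runSyncLoop here takes a hypothetical SyncLoopOptions object, and syncOnce stands in for the actual GET /sync call.

```typescript
interface SyncResponse {
  next_batch: string;
}

// Hypothetical options object for this sketch.
interface SyncLoopOptions {
  // Performs one /sync request; `since` is undefined on the first call.
  syncOnce: (since: string | undefined, signal: AbortSignal) => Promise<SyncResponse>;
  // Receives each successful response (timeline events, account data, ...).
  onSync: (res: SyncResponse) => void;
  signal: AbortSignal;
  retryDelayMs?: number; // the plan uses a fixed 5s delay
}

// Sleep that resolves early when the signal aborts, so disconnect() is not delayed.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

async function runSyncLoop(opts: SyncLoopOptions): Promise<void> {
  const { syncOnce, onSync, signal, retryDelayMs = 5000 } = opts;
  let since: string | undefined;

  while (!signal.aborted) {
    try {
      const res = await syncOnce(since, signal);
      since = res.next_batch; // save the token before processing
      onSync(res);
    } catch (err) {
      if (signal.aborted) break; // abort during sync: clean exit
      console.error('matrix sync failed, retrying', err);
      await sleep(retryDelayMs, signal);
    }
  }
}
```

In this sketch the token is saved before onSync runs, so a handler that throws does not cause the same batch to be fetched again; whether the real adapter should retry a failed batch instead is a separate design decision.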

Risks and Mitigations

| Risk | Mitigation |
|---|---|
| Initial sync floods events from history | The first sync may replay old events. Other adapters do not have this problem (Telegram's grammy does NOT replay history). The adapter could skip the first sync's events and use that response only to obtain next_batch, but the current design processes all events from the first sync. If this becomes a problem, a skip_initial_sync config flag can be added later. |
| Sync loop leak on unhandled error | try/catch wraps the entire loop body, with a fixed 5s backoff before each retry (see Error Handling Strategy). AbortController ensures clean shutdown. |
| Bot invited to many rooms, only some allowed | Room allowlist (allowed_room_ids) prevents processing. If empty, all joined rooms are processed. |
| Matrix homeserver rate limiting | The sync loop naturally rate-limits (one request at a time with 30s timeout). Send calls are user-triggered and unlikely to hit limits. |
| Access token expiry | Matrix access tokens are typically long-lived. If expired, whoami will fail on connect. Support for short-lived tokens (via refresh tokens) is a future enhancement. |
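If the skip_initial_sync flag is ever added, the decision could be isolated in a pure function like the one below. This is a hypothetical sketch: eventsToProcess and the trimmed-down response types are not part of the plan, and only the fields needed for the example are modelled.

```typescript
// Trimmed-down shapes of a /sync response; only the fields used here are modelled.
interface TimelineEvent {
  event_id: string;
  type: string;
}
interface JoinedRoom {
  timeline?: { events?: TimelineEvent[] };
}
interface SyncResponse {
  next_batch: string;
  rooms?: { join?: Record<string, JoinedRoom> };
}

// Returns the [roomId, event] pairs that should be processed for one /sync response.
// A request sent without a since token is the initial sync; with skipInitialSync set,
// its events are treated as history and dropped (the caller still saves next_batch).
function eventsToProcess(
  res: SyncResponse,
  since: string | undefined,
  skipInitialSync: boolean,
): Array<[string, TimelineEvent]> {
  if (skipInitialSync && since === undefined) return [];

  const out: Array<[string, TimelineEvent]> = [];
  for (const [roomId, room] of Object.entries(res.rooms?.join ?? {})) {
    for (const event of room.timeline?.events ?? []) {
      out.push([roomId, event]);
    }
  }
  return out;
}
```

Keying the decision on the absence of a since token means a restart without a persisted token is also treated as an initial sync, which matches the history-replay concern in the table.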

Open Questions

  1. Skip initial sync? Should the adapter skip processing events from the very first /sync response (which may contain old history)? The current design processes them all. A skip_initial_sync: true config flag can be added if needed.

  2. Typing indicators? Matrix supports m.typing events. Should the adapter send a typing indicator while processing? Deferred for the initial implementation (other adapters do this inconsistently: Telegram yes, Discord yes, Slack no).

  3. Image/media attachments? Matrix supports m.image, m.file, m.audio msgtypes. The initial implementation only handles m.text. Media support can be added in a follow-up (download via /_matrix/media/v3/download/{serverName}/{mediaId}; note that Matrix 1.11 deprecates this path in favour of the authenticated /_matrix/client/v1/media/download/{serverName}/{mediaId}).

  4. Formatted messages? Matrix supports format: "org.matrix.custom.html" with formatted_body. Should outbound messages include HTML? Deferred: plain text is sufficient for the initial implementation.
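For the formatted-messages question, the content object could be built as sketched below if HTML is added later. The buildTextContent and escapeHtml helpers are hypothetical; the msgtype, format, and formatted_body fields are the ones named above.

```typescript
// Content of an m.room.message event with msgtype m.text.
interface MatrixTextContent {
  msgtype: 'm.text';
  body: string;
  format?: 'org.matrix.custom.html';
  formatted_body?: string;
}

// Escapes characters that would otherwise be interpreted as HTML markup.
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;');
}

// Builds message content. The plain-text body is always present as the fallback;
// the HTML fields are attached only when requested.
function buildTextContent(body: string, withHtml = false): MatrixTextContent {
  if (!withHtml) return { msgtype: 'm.text', body };
  const html = escapeHtml(body).replace(/\n/g, '<br>');
  return {
    msgtype: 'm.text',
    body,
    format: 'org.matrix.custom.html',
    formatted_body: html,
  };
}
```

Because body is always sent, clients that ignore formatted_body still display the message, so enabling HTML later would not change the plain-text path.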


Files Summary

| Action | File | Change |
|---|---|---|
| Modify | src/config/schema.ts | add matrixSchema + type export |
| Create | src/channels/matrix/adapter.ts | full adapter implementation |
| Create | src/channels/matrix/adapter.test.ts | ~21 tests |
| Create | src/channels/matrix/index.ts | barrel export |
| Modify | src/channels/index.ts | re-export Matrix adapter |
| Modify | src/daemon/channels.ts | register Matrix adapter |
| Modify | src/gateway/handlers/config.ts | redact access_token |
| Modify | src/config/schema.test.ts | config validation tests (3) |