flynn/docs/plans/2026-02-07-tier2-implementation-plan.md
William Valentin b50c140d25 feat: add Docker support and inbound webhooks (Tier 2)
- Dockerfile: multi-stage build (node:22-alpine), better-sqlite3 native deps handled
- .dockerignore + docker-compose.yml for deployment
- FLYNN_DATA_DIR env var support in daemon, CLI, and TUI
- WebhookHandler: ChannelAdapter for HTTP POST /webhooks/:name
- Per-webhook HMAC auth, template rendering ({{body}}, {{json.field}})
- Config schema: automation.webhooks array with name/secret/message/output
- Gateway routes webhook requests before static files (bypasses gateway auth)
- 23 new tests for webhook functionality, 874 total tests passing
2026-02-07 14:36:05 -08:00


Tier 2 Implementation Plan

Date: 2026-02-07
Features: Inbound Webhooks, Dockerfile, Heartbeat, Vector Memory Search
Status: Planning


1. Inbound Webhooks

Complexity: Medium

Overview

HTTP endpoint (POST /webhooks/:name) that triggers agent processing, following the same pattern as CronScheduler: implements ChannelAdapter, produces InboundMessage, routes responses to a configured output channel.

Config Schema Additions

Add to src/config/schema.ts:

const webhookSchema = z.object({
  name: z.string().min(1, 'Webhook name is required'),
  /** Optional per-webhook HMAC secret for payload signature verification. */
  secret: z.string().optional(),
  /** Message template — use {{body}} for raw body, {{json.fieldName}} for JSON fields. */
  message: z.string().default('{{body}}'),
  output: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }),
  enabled: z.boolean().default(true),
});

Extend automationSchema:

const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
}).default({});

Add type export:

export type WebhookConfig = z.infer<typeof webhookSchema>;

Files to Create

| File | Purpose |
|---|---|
| src/automation/webhooks.ts | WebhookHandler class implementing ChannelAdapter |
| src/automation/webhooks.test.ts | Unit tests |

Files to Modify

| File | Change |
|---|---|
| src/config/schema.ts | Add webhookSchema, extend automationSchema, add type export |
| src/automation/index.ts | Export WebhookHandler |
| src/gateway/server.ts | Route POST /webhooks/:name to the WebhookHandler before static/404 |
| src/daemon/index.ts | Instantiate WebhookHandler, register with ChannelRegistry, wire to gateway |

Key Class: WebhookHandler

// src/automation/webhooks.ts

import { createHmac, timingSafeEqual } from 'crypto';
import type { IncomingMessage, ServerResponse } from 'http';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import type { WebhookConfig } from '../config/schema.js';

/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}

export class WebhookHandler implements ChannelAdapter {
  readonly name = 'webhook';
  private _status: ChannelStatus = 'disconnected';
  private messageHandler?: (msg: InboundMessage) => void;
  private webhooks: Map<string, WebhookConfig> = new Map();

  constructor(
    private readonly webhookConfigs: WebhookConfig[],
    private readonly channelLookup: ChannelLookup,
  ) {
    for (const wh of webhookConfigs) {
      this.webhooks.set(wh.name, wh);
    }
  }

  // ChannelAdapter interface methods...
  get status(): ChannelStatus { return this._status; }
  async connect(): Promise<void> { this._status = 'connected'; }
  async disconnect(): Promise<void> { this._status = 'disconnected'; }
  onMessage(handler: (msg: InboundMessage) => void): void { this.messageHandler = handler; }

  async send(peerId: string, message: OutboundMessage): Promise<void> {
    // Route to output channel (same pattern as CronScheduler.send)
    const webhook = this.webhooks.get(peerId);
    if (!webhook) return;
    const outputAdapter = this.channelLookup.get(webhook.output.channel);
    if (!outputAdapter) return;
    await outputAdapter.send(webhook.output.peer, message);
  }

  /**
   * Handle incoming HTTP request. Called by the gateway HTTP handler.
   * Returns true if the request was handled (webhook found), false otherwise.
   */
  async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise<boolean> {
    const webhook = this.webhooks.get(webhookName);
    if (!webhook || !webhook.enabled) {
      return false; // Not found, let gateway return 404
    }

    // Read body
    const body = await readBody(req);

    // Verify HMAC signature if secret configured
    if (webhook.secret) {
      const signature = req.headers['x-webhook-signature'] as string | undefined;
      if (!verifyHmac(body, webhook.secret, signature)) {
        res.writeHead(401, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: 'Invalid signature' }));
        return true;
      }
    }

    // Build message text from template
    const messageText = renderTemplate(webhook.message, body);

    // Produce InboundMessage
    const msg: InboundMessage = {
      id: `webhook-${webhookName}-${Date.now()}`,
      channel: 'webhook',
      senderId: webhookName,
      senderName: `webhook:${webhookName}`,
      text: messageText,
      timestamp: Date.now(),
      metadata: { webhookName, contentType: req.headers['content-type'] },
    };

    this.messageHandler?.(msg);

    res.writeHead(202, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ accepted: true, id: msg.id }));
    return true;
  }

  getWebhookNames(): string[] {
    return Array.from(this.webhooks.keys());
  }
}
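handleRequest() relies on a readBody() helper that the plan does not show. Below is a minimal sketch. The 1 MiB default cap is an assumption, not a documented limit. The caller is expected to turn the rejection into an HTTP 413. The helper accepts any Readable, so an http.IncomingMessage works directly.

```typescript
import { Readable } from 'node:stream';

/**
 * Read a request body into a UTF-8 string, rejecting once it exceeds maxBytes.
 * Assumption: 1 MiB default cap. http.IncomingMessage is a Readable subclass.
 */
export function readBody(req: Readable, maxBytes = 1_048_576): Promise<string> {
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    let size = 0;
    let failed = false;
    req.on('data', (chunk: Buffer | string) => {
      // Keep draining after a failure so the caller can still write a 413 response
      if (failed) return;
      const buf = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
      size += buf.length;
      if (size > maxBytes) {
        failed = true;
        chunks.length = 0; // drop what was buffered so far
        reject(new Error(`Request body exceeds ${maxBytes} bytes`));
        return;
      }
      chunks.push(buf);
    });
    // resolve() after an earlier reject() is a no-op
    req.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
    req.on('error', reject);
  });
}
```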

Integration in Gateway HTTP Handler

In GatewayServer.handleHttpRequest(), add route matching before the static file serving:

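// Check for webhook routes: POST /webhooks/:name (optional trailing slash and query string)
if (req.method === 'POST' && req.url) {
  // Anchor the full path so /webhooks/foo/bar or /webhooks/foo.json is not routed to "foo"
  const match = req.url.match(/^\/webhooks\/([a-zA-Z0-9_-]+)\/?(?:\?.*)?$/);
  if (match && this.webhookHandler) {
    const handled = await this.webhookHandler.handleRequest(match[1], req, res);
    if (handled) return;
  }
}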

The webhookHandler reference is passed into GatewayServerConfig as an optional field, similar to how channelRegistry is passed.

Integration in Daemon Startup

In src/daemon/index.ts, after the cron scheduler block (~line 812-817):

// Register webhook handler (if any webhooks configured)
let webhookHandler: WebhookHandler | undefined;
if (config.automation.webhooks.length > 0) {
  webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry);
  channelRegistry.register(webhookHandler);
  console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);
}

Then pass webhookHandler to the GatewayServer constructor.

HMAC Verification

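Standard approach: an X-Webhook-Signature: sha256=<hex> header. Verify it by computing crypto.createHmac('sha256', secret).update(body).digest('hex') over the raw request body and comparing the result with crypto.timingSafeEqual. GitHub uses the same sha256=<hex> format but sends it in an X-Hub-Signature-256 header. The github-push config example therefore verifies only if the handler also reads that header, or if the header name is made configurable.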

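Gateway token auth (handled by the gateway auth middleware for HTTP) was listed as an alternative. However, webhook requests are routed ahead of it and bypass gateway auth, so a webhook configured without a secret accepts unauthenticated requests.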
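handleRequest() calls a verifyHmac() helper. Below is a minimal sketch that assumes the sha256=<hex> header format described above. It accepts the body as a string or Buffer; signing the raw bytes avoids mismatches when the body is not valid UTF-8.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

/**
 * Verify a "sha256=<hex>" signature header against the request body.
 * Returns false (never throws) for a missing, malformed, or wrong signature.
 */
export function verifyHmac(body: string | Buffer, secret: string, signature: string | undefined): boolean {
  const prefix = 'sha256=';
  if (!signature || !signature.startsWith(prefix)) return false;
  const expected = createHmac('sha256', secret).update(body).digest(); // 32 raw bytes
  // Invalid hex yields a shorter (possibly empty) buffer, caught by the length check
  const provided = Buffer.from(signature.slice(prefix.length), 'hex');
  // timingSafeEqual throws when lengths differ, so compare lengths first
  if (provided.length !== expected.length) return false;
  return timingSafeEqual(provided, expected);
}
```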

Template Rendering

Simple template engine for the message field:

  • {{body}} — raw request body string
  • {{json.fieldName}} — extracted JSON field (dot notation for nested)
  • Default: {{body}} if no template specified
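Here is a minimal sketch of renderTemplate(). It makes two choices the plan leaves open: missing JSON paths (or a body that is not JSON) render as an empty string, and unrecognized placeholders are left untouched. Numeric path segments index into arrays, which the alertmanager example ({{json.alerts.0.annotations.summary}}) needs.

```typescript
/**
 * Render a webhook message template.
 * {{body}} -> raw body; {{json.a.b.0.c}} -> value at that path in the parsed JSON body.
 * Assumptions: missing paths or a non-JSON body render as "", unknown placeholders are kept.
 */
export function renderTemplate(template: string, body: string): string {
  let json: unknown;
  let parsed = false; // parse lazily, at most once, and only if a json.* placeholder appears
  // A replacer function is used so "$&" etc. in the body are inserted literally
  return template.replace(/\{\{\s*([\w.-]+)\s*\}\}/g, (placeholder, expr: string) => {
    if (expr === 'body') return body;
    if (!expr.startsWith('json.')) return placeholder;
    if (!parsed) {
      parsed = true;
      try {
        json = JSON.parse(body);
      } catch {
        json = undefined;
      }
    }
    let value: unknown = json;
    for (const key of expr.slice('json.'.length).split('.')) {
      if (value === null || typeof value !== 'object') return '';
      value = (value as Record<string, unknown>)[key]; // array indices work as string keys
    }
    if (value === undefined || value === null) return '';
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  });
}
```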

Test Plan

src/automation/webhooks.test.ts:

  • Implements ChannelAdapter interface (name, status)
  • handleRequest() produces InboundMessage with correct fields
  • HMAC verification rejects invalid signatures
  • HMAC verification passes valid signatures
  • Returns false for unknown webhook names
  • Returns false for disabled webhooks
  • send() routes to output channel (same as cron test pattern)
  • Template rendering with {{body}} and {{json.field}}
  • getWebhookNames() lists all webhook names

Config Example

automation:
  webhooks:
    - name: github-push
      secret: "whsec_..."
      message: "GitHub push to {{json.repository.full_name}}: {{json.head_commit.message}}"
      output:
        channel: telegram
        peer: "123456"
    - name: alertmanager
      message: "Alert: {{json.alerts.0.annotations.summary}}"
      output:
        channel: telegram
        peer: "123456"

2. Dockerfile

Complexity: Simple

Overview

Multi-stage Docker build for the Flynn daemon. No TypeScript source code changes required — this is purely build/deployment infrastructure.

Files to Create

| File | Purpose |
|---|---|
| Dockerfile | Multi-stage build |
| .dockerignore | Exclude dev artifacts |
| docker-compose.yml | Example deployment with volumes |

Dockerfile Design

# ── Stage 1: Build ──────────────────────────────────────────────
FROM node:22-alpine AS builder

# Install pnpm
RUN corepack enable && corepack prepare pnpm@latest --activate

WORKDIR /app

# better-sqlite3 compiles (or downloads a prebuilt) native addon during pnpm install,
# not during pnpm build, so the build tools must be present before installing deps
RUN apk add --no-cache python3 make g++

# Install dependencies first (layer caching)
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile

# Copy source and build
COPY tsconfig.json ./
COPY src/ src/
RUN pnpm build

# ── Stage 2: Runtime ────────────────────────────────────────────
FROM node:22-alpine AS runtime

RUN corepack enable && corepack prepare pnpm@latest --activate

WORKDIR /app

# Copy package files and install production deps only
COPY package.json pnpm-lock.yaml ./
RUN apk add --no-cache python3 make g++ && \
    pnpm install --frozen-lockfile --prod && \
    apk del python3 make g++

# Copy compiled output from builder
COPY --from=builder /app/dist dist/

# Copy gateway UI assets (if they exist)
COPY --from=builder /app/src/gateway/ui dist/gateway/ui/

# Copy prompt templates and bundled skills
COPY prompts/ prompts/
COPY skills/ skills/

# Create data directories
RUN mkdir -p /data/memory /data/sessions /config

# Runtime config
ENV NODE_ENV=production
ENV FLYNN_CONFIG=/config/config.yaml
ENV FLYNN_DATA_DIR=/data

EXPOSE 18800

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
  CMD wget -qO- http://localhost:18800/ || exit 1

ENTRYPOINT ["node", "dist/cli/index.js"]
CMD ["start"]

Key Considerations

better-sqlite3 native deps: This is the trickiest part. better-sqlite3 requires native compilation. Options:

  1. Install build tools (python3, make, g++) in the runtime stage, compile, then remove — this is the safest approach and keeps the image functional.
  2. Use --build-from-source flag and carry the built .node binary from the builder stage. This requires matching the Node.js version and Alpine version exactly between stages (which we do since both use node:22-alpine).
  3. Alternatively, use better-sqlite3's prebuilt binaries if they ship for Alpine/musl — check at build time.

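Recommended approach: Option 2. Both stages use the identical base image, so the better-sqlite3 binary compiled into the builder's node_modules runs unchanged in the runtime stage. Prune devDependencies in the builder first so they are not copied along:

# In builder stage, after pnpm build:
RUN pnpm prune --prod

# In runtime stage, instead of separate pnpm install:
COPY --from=builder /app/node_modules node_modules/

This removes the need for build tools in the runtime image entirely and avoids installing them a second time.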

.dockerignore

node_modules/
dist/
.git/
.gitignore
*.log
.env
.env.*
.worktrees/
docs/
src/**/*.test.ts
vitest.config.*
eslint.config.*
*.md
!README.md

docker-compose.yml

services:
  flynn:
    build: .
    image: flynn:latest
    container_name: flynn
    restart: unless-stopped
    ports:
      - "18800:18800"
    volumes:
      - ./config.yaml:/config/config.yaml:ro
      - flynn-data:/data
      - flynn-memory:/data/memory
    environment:
      - FLYNN_CONFIG=/config/config.yaml
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Add other provider keys as needed
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:18800/"]
      interval: 30s
      timeout: 5s
      retries: 3

volumes:
  flynn-data:
  flynn-memory:

Daemon Changes for Docker

The daemon currently hardcodes the data dir to ~/.local/share/flynn (src/daemon/index.ts, line 525: const dataDir = resolve(homedir(), '.local/share/flynn');). For Docker, it must respect an environment variable override.

Modification needed in src/daemon/index.ts:

// Support FLYNN_DATA_DIR env var for Docker deployments (unset or empty falls back to the default)
const dataDir = process.env.FLYNN_DATA_DIR
  ? resolve(process.env.FLYNN_DATA_DIR)
  : resolve(homedir(), '.local/share/flynn');

Similarly, config loading in src/config/loader.ts should respect the FLYNN_CONFIG env var if it does not already.

Files to Modify

| File | Change |
|---|---|
| src/daemon/index.ts | Support FLYNN_DATA_DIR environment variable |
| src/config/loader.ts | Support FLYNN_CONFIG environment variable (verify) |

Test Plan

No unit tests needed (Docker build is tested by building). Integration test:

  • docker build -t flynn:test .
  • docker run --rm flynn:test --help (verify CLI works)
  • docker compose up -d with test config (verify startup)

3. Heartbeat

Complexity: Medium

Overview

Periodic self-check system that validates daemon health and optionally notifies a configured channel on failure. Fits naturally as an extension to the src/automation/ module, alongside cron.

Config Schema Additions

Add to src/config/schema.ts:

const heartbeatCheckSchema = z.enum([
  'gateway',           // Is the HTTP/WS server responsive?
  'model',             // Can we reach the default model provider?
  'channels',          // Are all channel adapters connected?
  'memory',            // Is the memory store readable/writable?
  'disk',              // Is disk space above threshold?
]);

const heartbeatSchema = z.object({
  enabled: z.boolean().default(false),
  /** Check interval (e.g. '60s', '5m', '1h'). Default: '5m'. */
  interval: z.string().default('5m'),
  /** Which checks to run. Default: all. */
  checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']),
  /** Optional notification on failure. */
  notify: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }).optional(),
  /** Number of consecutive failures before notifying. Default: 2. */
  failure_threshold: z.number().min(1).max(10).default(2),
  /** Disk space warning threshold in MB. Default: 100. */
  disk_threshold_mb: z.number().min(10).default(100),
}).default({});

Extend automationSchema:

const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
  heartbeat: heartbeatSchema,
}).default({});

Add type export:

export type HeartbeatConfig = z.infer<typeof heartbeatSchema>;

Files to Create

| File | Purpose |
|---|---|
| src/automation/heartbeat.ts | HeartbeatMonitor class |
| src/automation/heartbeat.test.ts | Unit tests |

Files to Modify

| File | Change |
|---|---|
| src/config/schema.ts | Add heartbeatSchema, extend automationSchema, add type export |
| src/automation/index.ts | Export HeartbeatMonitor |
| src/daemon/index.ts | Instantiate HeartbeatMonitor, register shutdown handler |

Key Class: HeartbeatMonitor

// src/automation/heartbeat.ts

import { statfsSync } from 'fs';
import type { HeartbeatConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, OutboundMessage } from '../channels/types.js';
import type { ModelRouter } from '../models/router.js';

export interface HeartbeatCheckResult {
  name: string;
  healthy: boolean;
  message: string;
  durationMs: number;
}

export interface HeartbeatResult {
  timestamp: number;
  healthy: boolean;
  checks: HeartbeatCheckResult[];
}

interface HeartbeatDeps {
  config: HeartbeatConfig;
  /** Gateway port, used for the HTTP health probe. */
  getGatewayPort?: () => number;
  /** Model router for provider health check. */
  modelRouter?: ModelRouter;
  /** Channel registry for adapter status checks. */
  getChannels?: () => Array<{ name: string; status: ChannelStatus }>;
  /** Memory store dir for read/write check. */
  memoryDir?: string;
  /** Data dir for disk space check. */
  dataDir?: string;
  /** Send notification to a channel. */
  notify?: (channel: string, peer: string, message: OutboundMessage) => Promise<void>;
}

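export class HeartbeatMonitor {
  private timer?: ReturnType<typeof setInterval>;
  private initialTimer?: ReturnType<typeof setTimeout>;
  private running = false;
  private consecutiveFailures = 0;
  private lastResult?: HeartbeatResult;

  constructor(private readonly deps: HeartbeatDeps) {}

  start(): void {
    if (!this.deps.config.enabled) return;
    const intervalMs = parseInterval(this.deps.config.interval);
    this.timer = setInterval(() => this.tick(), intervalMs);
    // Run first check after a short delay (let services finish starting)
    this.initialTimer = setTimeout(() => this.tick(), 5000);
  }

  stop(): void {
    // Clear both timers so no check fires after shutdown
    clearTimeout(this.initialTimer);
    clearInterval(this.timer);
    this.initialTimer = undefined;
    this.timer = undefined;
  }

  /** Timer callback: skip overlapping runs and never leak an unhandled rejection. */
  private tick(): void {
    if (this.running) return;
    this.running = true;
    this.runChecks()
      .catch(err => console.error('Heartbeat check run failed:', err))
      .finally(() => { this.running = false; });
  }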

  /** Run all configured checks and optionally notify on failure. */
  async runChecks(): Promise<HeartbeatResult> {
    const checks: HeartbeatCheckResult[] = [];
    const enabledChecks = this.deps.config.checks;

    for (const checkName of enabledChecks) {
      const start = Date.now();
      try {
        const result = await this.runSingleCheck(checkName);
        checks.push({ ...result, durationMs: Date.now() - start });
      } catch (error) {
        checks.push({
          name: checkName,
          healthy: false,
          message: error instanceof Error ? error.message : String(error),
          durationMs: Date.now() - start,
        });
      }
    }

    const healthy = checks.every(c => c.healthy);
    const result: HeartbeatResult = { timestamp: Date.now(), healthy, checks };
    this.lastResult = result;

    if (!healthy) {
      this.consecutiveFailures++;
      if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
        await this.sendNotification(result);
      }
    } else {
      // Reset on recovery
      if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
        await this.sendRecoveryNotification(result);
      }
      this.consecutiveFailures = 0;
    }

    return result;
  }

  getLastResult(): HeartbeatResult | undefined {
    return this.lastResult;
  }

  private async runSingleCheck(name: string): Promise<Omit<HeartbeatCheckResult, 'durationMs'>> {
    switch (name) {
      case 'gateway':   return this.checkGateway();
      case 'model':     return this.checkModel();
      case 'channels':  return this.checkChannels();
      case 'memory':    return this.checkMemory();
      case 'disk':      return this.checkDisk();
      default:          return { name, healthy: false, message: `Unknown check: ${name}` };
    }
  }

  // Individual check implementations...
}

Individual Check Implementations

  1. Gateway check: HTTP GET to http://localhost:<port>/ — expects non-error response.
  2. Model check: ModelRouter has no health-check method yet, so add a lightweight modelRouter.healthCheck() that verifies the API key/connection. It can use a models.list() API call for providers that support it, or a minimal completion with max_tokens=1 otherwise. A cheaper fallback is to verify that the model client can be instantiated without errors, although that does not prove the provider is reachable.
  3. Channels check: Iterate registered channels, verify all have status === 'connected'.
  4. Memory check: Attempt store.read('global') and verify no exceptions.
  5. Disk check: Use statfsSync() on the data directory, calculate free space in MB, compare to threshold.
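The gateway, channels, and disk checks are small enough to sketch directly. They are written as standalone functions taking plain arguments rather than as HeartbeatMonitor methods reading HeartbeatDeps. Channel status is typed as a plain string instead of ChannelStatus. They assume Node 22 (global fetch, AbortSignal.timeout, fs.statfsSync).

```typescript
import { statfsSync } from 'node:fs';

interface CheckOutcome {
  name: string;
  healthy: boolean;
  message: string;
}

/** Disk check: space available to unprivileged users, in MB, compared to the threshold. */
export function checkDisk(dataDir: string, thresholdMb: number): CheckOutcome {
  const stats = statfsSync(dataDir);
  const freeMb = (stats.bavail * stats.bsize) / (1024 * 1024);
  return {
    name: 'disk',
    healthy: freeMb >= thresholdMb,
    message: `${freeMb.toFixed(0)} MB free (threshold ${thresholdMb} MB)`,
  };
}

/** Channels check: every registered adapter must report 'connected'. */
export function checkChannels(channels: Array<{ name: string; status: string }>): CheckOutcome {
  const down = channels.filter(c => c.status !== 'connected').map(c => `${c.name} (${c.status})`);
  return {
    name: 'channels',
    healthy: down.length === 0,
    message: down.length === 0 ? `${channels.length} channel(s) connected` : `Not connected: ${down.join(', ')}`,
  };
}

/**
 * Gateway check: a 2xx response from the local HTTP server within the timeout.
 * Connection errors and timeouts throw; runChecks() records them as failures.
 */
export async function checkGateway(port: number, timeoutMs = 3000): Promise<CheckOutcome> {
  const res = await fetch(`http://localhost:${port}/`, { signal: AbortSignal.timeout(timeoutMs) });
  await res.body?.cancel(); // the body is not needed; release the connection
  return { name: 'gateway', healthy: res.ok, message: `HTTP ${res.status}` };
}
```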

Notification Format

⚠️ Flynn Heartbeat Failure

Failed checks:
- gateway: Connection refused on port 18800
- model: API key invalid (401)

Consecutive failures: 3/2 (threshold)

And on recovery:

✅ Flynn Heartbeat Recovery

All checks passing after 3 consecutive failures.
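The two messages above can be produced by plain formatting functions, sketched below. They take the failure count and threshold as parameters, so they can be tested without a HeartbeatMonitor instance. The sketch stops at the text because the OutboundMessage shape is not shown in this plan.

```typescript
interface HeartbeatCheckResult {
  name: string;
  healthy: boolean;
  message: string;
  durationMs: number;
}

interface HeartbeatResult {
  timestamp: number;
  healthy: boolean;
  checks: HeartbeatCheckResult[];
}

/** Failure notice: lists only the failed checks, then the failure count vs. threshold. */
export function formatFailureNotification(result: HeartbeatResult, consecutiveFailures: number, threshold: number): string {
  const failed = result.checks.filter(c => !c.healthy).map(c => `- ${c.name}: ${c.message}`);
  return [
    '⚠️ Flynn Heartbeat Failure',
    '',
    'Failed checks:',
    ...failed,
    '',
    `Consecutive failures: ${consecutiveFailures}/${threshold} (threshold)`,
  ].join('\n');
}

/** Recovery notice, sent when checks pass again after reaching the threshold. */
export function formatRecoveryNotification(previousFailures: number): string {
  return ['✅ Flynn Heartbeat Recovery', '', `All checks passing after ${previousFailures} consecutive failures.`].join('\n');
}
```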

Integration in Daemon Startup

In src/daemon/index.ts, after all services are started (after gateway.start()):

// Initialize heartbeat monitor (if enabled)
let heartbeatMonitor: HeartbeatMonitor | undefined;
if (config.automation.heartbeat?.enabled) {
  heartbeatMonitor = new HeartbeatMonitor({
    config: config.automation.heartbeat,
    getGatewayPort: () => config.server.port,
    modelRouter,
    getChannels: () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })),
    memoryDir: memoryDir,
    dataDir,
    notify: config.automation.heartbeat.notify
      ? async (channel, peer, message) => {
          const adapter = channelRegistry.get(channel);
          if (adapter) await adapter.send(peer, message);
        }
      : undefined,
  });
  heartbeatMonitor.start();
  console.log(`Heartbeat monitor started (interval: ${config.automation.heartbeat.interval})`);

  lifecycle.onShutdown(async () => {
    heartbeatMonitor!.stop();
    console.log('Heartbeat monitor stopped');
  });
}

Interval Parsing

Reuse or create a simple parser (similar to parseDuration in src/session/index.ts):

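function parseInterval(str: string): number {
  const match = str.trim().match(/^(\d+)(s|m|h)$/);
  // Fail loudly on a typo instead of silently running every 5 minutes
  if (!match) throw new Error(`Invalid heartbeat interval "${str}" (expected e.g. '60s', '5m', '1h')`);
  const [, value, unit] = match;
  const multipliers: Record<string, number> = { s: 1000, m: 60_000, h: 3_600_000 };
  const ms = Number.parseInt(value, 10) * multipliers[unit];
  // '0s' would make setInterval fire continuously
  if (ms <= 0) throw new Error(`Heartbeat interval must be greater than zero, got "${str}"`);
  return ms;
}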

Exposing via Gateway (Optional Enhancement)

Add a system.heartbeat handler to the gateway WS handlers that returns heartbeatMonitor.getLastResult(). This lets the web UI or CLI display health status. This is a nice-to-have and can be done as a follow-up.

Test Plan

src/automation/heartbeat.test.ts:

  • start() does nothing when enabled: false
  • runChecks() runs all configured checks
  • runChecks() returns healthy=true when all checks pass
  • runChecks() returns healthy=false when any check fails
  • Notification sent after failure_threshold consecutive failures
  • Recovery notification sent when checks pass after failures
  • stop() clears the timer
  • getLastResult() returns most recent result
  • Individual check tests:
    • checkChannels() detects disconnected adapters
    • checkDisk() detects low disk space
    • checkMemory() handles missing memory dir
  • parseInterval() correctly parses '60s', '5m', '1h'

Config Example

automation:
  heartbeat:
    enabled: true
    interval: "5m"
    checks: [gateway, model, channels, memory, disk]
    notify:
      channel: telegram
      peer: "123456"
    failure_threshold: 2
    disk_threshold_mb: 100


4. Vector Memory Search

Complexity: Complex

Overview

Add embedding-based semantic search to the existing MemoryStore, enabling hybrid search (vector similarity + keyword). Provider-agnostic embedding generation with graceful fallback to keyword-only search when embeddings are unavailable.

Architecture Decision: Storage Backend

Options considered:

  1. SQLite with manual vector math — Store embeddings as BLOBs in the existing better-sqlite3 database, compute cosine similarity in JS. Simple, no new deps.
  2. SQLite with sqlite-vec extension — Native vector similarity in SQL. Requires native extension compilation, complicates Docker build.
  3. Separate HNSW index (hnswlib-node) — Purpose-built ANN index. Fast for large collections but adds a native dep.
  4. In-memory float arrays — Store embeddings as JSON files alongside markdown, load into memory, compute similarity in JS.

Recommendation: Option 1 (SQLite with manual vector math). Rationale:

  • better-sqlite3 is already a dependency
  • Memory collections are small enough (hundreds to low thousands of chunks) that brute-force cosine similarity in JS is fast enough (<10ms for 1000 vectors)
  • No new native dependencies
  • Embeddings stored in existing data directory pattern
  • Clean upgrade path to sqlite-vec if performance becomes an issue

Config Schema Additions

Extend memorySchema in src/config/schema.ts:

const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']);

const embeddingSchema = z.object({
  enabled: z.boolean().default(false),
  provider: embeddingProviderSchema.default('openai'),
  model: z.string().default('text-embedding-3-small'),
  /** Provider endpoint (required for ollama/llamacpp). */
  endpoint: z.string().optional(),
  /** API key (required for openai/gemini). */
  api_key: z.string().optional(),
  /** Embedding vector dimensions. Auto-detected from model if not set. */
  dimensions: z.number().optional(),
  /** Max tokens per chunk for embedding. Default: 512. */
  chunk_size: z.number().min(64).max(8192).default(512),
  /** Overlap between chunks in tokens. Default: 50. */
  chunk_overlap: z.number().min(0).max(1024).default(50),
  /** Number of top results to return from vector search. Default: 5. */
  top_k: z.number().min(1).max(50).default(5),
  /** Weight for vector results vs keyword results in hybrid scoring (0.0 = keyword only, 1.0 = vector only). Default: 0.7. */
  hybrid_weight: z.number().min(0).max(1).default(0.7),
}).default({});

const memorySchema = z.object({
  enabled: z.boolean().default(true),
  dir: z.string().optional(),
  auto_extract: z.boolean().default(true),
  max_context_tokens: z.number().min(100).max(10000).default(2000),
  embedding: embeddingSchema,
}).default({});

Add type exports:

export type EmbeddingConfig = z.infer<typeof embeddingSchema>;

Files to Create

| File | Purpose |
|---|---|
| src/memory/embeddings.ts | EmbeddingProvider interface + provider implementations |
| src/memory/embeddings.test.ts | Unit tests for embedding providers |
| src/memory/vector-store.ts | VectorStore class: SQLite-backed vector storage |
| src/memory/vector-store.test.ts | Unit tests for vector store |
| src/memory/chunker.ts | Text chunking utility |
| src/memory/chunker.test.ts | Unit tests for chunker |
| src/memory/hybrid-search.ts | HybridSearch class: combines vector + keyword |
| src/memory/hybrid-search.test.ts | Unit tests for hybrid search |

Files to Modify

| File | Change |
|---|---|
| src/config/schema.ts | Add embeddingSchema, extend memorySchema, add type export |
| src/memory/store.ts | Add searchHybrid() method, accept optional VectorStore in constructor |
| src/memory/index.ts | Export new modules |
| src/tools/builtin/memory-search.ts | Use searchHybrid() when available, fall back to search() |
| src/daemon/index.ts | Initialize EmbeddingProvider, VectorStore, wire into MemoryStore |

Key Interfaces and Classes

EmbeddingProvider

// src/memory/embeddings.ts

export interface EmbeddingProvider {
  /** Generate embeddings for one or more text chunks. */
  embed(texts: string[]): Promise<number[][]>;
  /** Embedding vector dimensions. */
  readonly dimensions: number;
}

export class OpenAIEmbeddingProvider implements EmbeddingProvider {
  // Uses openai SDK (already a dep) with embeddings.create()
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string; dimensions?: number }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class GeminiEmbeddingProvider implements EmbeddingProvider {
  // Uses @google/generative-ai SDK (already a dep): embedContent() for one text, batchEmbedContents() for several
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class OllamaEmbeddingProvider implements EmbeddingProvider {
  // Uses ollama SDK (already a dep): embed() accepts a batch of inputs; the older embeddings() takes a single prompt
  readonly dimensions: number;
  constructor(config: { host?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class LlamaCppEmbeddingProvider implements EmbeddingProvider {
  // HTTP POST to /embedding endpoint
  readonly dimensions: number;
  constructor(config: { endpoint: string; model?: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { ... }
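Below is a sketch of OpenAIEmbeddingProvider. It deviates from the stub above in one respect: the client is injected through the constructor so the class can be tested with a fake. EmbeddingsClient is a hypothetical structural interface covering only embeddings.create(); an openai SDK client instance is expected to satisfy it in the daemon. The dimension table lists the default output sizes of OpenAI's published embedding models.

```typescript
/** Hypothetical minimal slice of the openai client: only embeddings.create() is used. */
interface EmbeddingsClient {
  embeddings: {
    create(params: { model: string; input: string[]; dimensions?: number }): Promise<{
      data: Array<{ embedding: number[]; index: number }>;
    }>;
  };
}

export interface EmbeddingProvider {
  embed(texts: string[]): Promise<number[][]>;
  readonly dimensions: number;
}

// Default output sizes, used when config.dimensions is not set
const KNOWN_DIMENSIONS: Record<string, number> = {
  'text-embedding-3-small': 1536,
  'text-embedding-3-large': 3072,
  'text-embedding-ada-002': 1536,
};

export class OpenAIEmbeddingProvider implements EmbeddingProvider {
  readonly dimensions: number;

  constructor(
    private readonly client: EmbeddingsClient,
    private readonly config: { model: string; dimensions?: number },
  ) {
    const dims = config.dimensions ?? KNOWN_DIMENSIONS[config.model];
    if (!dims) throw new Error(`Unknown dimensions for model ${config.model}; set memory.embedding.dimensions`);
    this.dimensions = dims;
  }

  async embed(texts: string[]): Promise<number[][]> {
    if (texts.length === 0) return []; // skip the network call for an empty batch
    const res = await this.client.embeddings.create({
      model: this.config.model,
      input: texts,
      // Only pass dimensions when explicitly configured (text-embedding-3 models accept it)
      ...(this.config.dimensions ? { dimensions: this.config.dimensions } : {}),
    });
    // Order by index so output[i] corresponds to texts[i]
    return [...res.data].sort((a, b) => a.index - b.index).map(d => d.embedding);
  }
}
```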

Text Chunker

// src/memory/chunker.ts

export interface Chunk {
  /** The text content of this chunk. */
  text: string;
  /** Source namespace. */
  namespace: string;
  /** Starting line number in the source file (1-based). */
  startLine: number;
  /** Ending line number in the source file (1-based). */
  endLine: number;
}

/**
 * Split memory content into overlapping chunks suitable for embedding.
 *
 * Strategy: split on paragraph boundaries (double newline), then merge
 * small paragraphs to reach target chunk size, with configurable overlap.
 */
export function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] { ... }
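One way to fill in chunkText() is sketched below. It rests on an explicit assumption: token counts are approximated by whitespace-separated words, because the plan names no tokenizer. Paragraphs are greedily packed up to chunkSize. Trailing paragraphs totalling at most chunkOverlap words are then repeated at the start of the next chunk. A single paragraph longer than chunkSize becomes one oversized chunk; splitting it further is left open.

```typescript
export interface Chunk {
  text: string;
  namespace: string;
  startLine: number;
  endLine: number;
}

interface Paragraph {
  text: string;
  startLine: number; // 1-based
  endLine: number; // 1-based, inclusive
  tokens: number;
}

// Assumption: whitespace-separated words stand in for tokens; a real tokenizer would differ
function estimateTokens(text: string): number {
  return text.split(/\s+/).filter(Boolean).length;
}

/** Split on blank lines, recording each paragraph's line span. */
function splitParagraphs(content: string): Paragraph[] {
  const lines = content.split('\n');
  const paragraphs: Paragraph[] = [];
  let start = -1;
  for (let i = 0; i <= lines.length; i++) {
    const blank = i === lines.length || lines[i].trim() === '';
    if (!blank && start === -1) start = i;
    if (blank && start !== -1) {
      const text = lines.slice(start, i).join('\n');
      paragraphs.push({ text, startLine: start + 1, endLine: i, tokens: estimateTokens(text) });
      start = -1;
    }
  }
  return paragraphs;
}

export function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] {
  const chunks: Chunk[] = [];
  let current: Paragraph[] = [];
  let currentTokens = 0;
  let carriedCount = 0; // leading paragraphs of current copied from the previous chunk

  const flush = (): void => {
    chunks.push({
      namespace,
      text: current.map(p => p.text).join('\n\n'),
      startLine: current[0].startLine,
      endLine: current[current.length - 1].endLine,
    });
    // Carry trailing paragraphs (up to chunkOverlap tokens), never the first one,
    // so each new chunk advances by at least one paragraph
    const carried: Paragraph[] = [];
    let carriedTokens = 0;
    for (let i = current.length - 1; i > 0; i--) {
      if (carriedTokens + current[i].tokens > options.chunkOverlap) break;
      carried.unshift(current[i]);
      carriedTokens += current[i].tokens;
    }
    current = carried;
    currentTokens = carriedTokens;
    carriedCount = carried.length;
  };

  for (const p of splitParagraphs(content)) {
    // Emit the current chunk if p would overflow it and it holds new (non-carried) content
    if (current.length > carriedCount && currentTokens + p.tokens > options.chunkSize) flush();
    // If the carried overlap plus p still overflows, drop the overlap
    if (currentTokens + p.tokens > options.chunkSize) {
      current = [];
      currentTokens = 0;
      carriedCount = 0;
    }
    current.push(p);
    currentTokens += p.tokens;
  }
  if (current.length > carriedCount) flush();
  return chunks;
}
```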

VectorStore

// src/memory/vector-store.ts

import Database from 'better-sqlite3';

export interface VectorSearchResult {
  namespace: string;
  chunkText: string;
  startLine: number;
  endLine: number;
  similarity: number;
}

export class VectorStore {
  private db: Database.Database;

  constructor(dbPath: string) {
    this.db = new Database(dbPath);
    this.initSchema();
  }

  private initSchema(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        namespace TEXT NOT NULL,
        chunk_text TEXT NOT NULL,
        start_line INTEGER NOT NULL,
        end_line INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        created_at INTEGER NOT NULL DEFAULT (unixepoch()),
        content_hash TEXT NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_embeddings_namespace ON embeddings(namespace);
      CREATE INDEX IF NOT EXISTS idx_embeddings_hash ON embeddings(content_hash);
    `);
  }

  /** Store embedding vectors for a set of chunks. */
  upsertChunks(chunks: Array<{ namespace: string; text: string; startLine: number; endLine: number; embedding: number[]; contentHash: string }>): void { ... }

  /** Delete all embeddings for a namespace (for re-indexing). */
  deleteNamespace(namespace: string): void { ... }

  /** Check if a namespace+hash combination already exists (skip re-embedding). */
  hasContentHash(namespace: string, hash: string): boolean { ... }

  /**
   * Search for similar vectors using cosine similarity.
   * Computed in JS — fine for <10k vectors.
   */
  search(queryEmbedding: number[], topK: number): VectorSearchResult[] {
    const allRows = this.db.prepare('SELECT * FROM embeddings').all();
    // Compute cosine similarity for each, sort, take topK
    ...
  }

  close(): void {
    this.db.close();
  }
}

The embedding is stored as a BLOB (Float32Array buffer). Cosine similarity:

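// ArrayLike lets this accept both number[] and a Float32Array decoded from the BLOB
function cosineSimilarity(a: ArrayLike<number>, b: ArrayLike<number>): number {
  if (a.length !== b.length) throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  // A zero vector has no direction; treat it as dissimilar instead of returning NaN
  if (normA === 0 || normB === 0) return 0;
  return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}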
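Below is a sketch of the BLOB encoding and of the scoring loop that VectorStore.search() elides. EmbeddingRow mirrors the columns of the embeddings table, and rankRows is a hypothetical helper name. Inside search(), the rows would come from this.db.prepare('SELECT namespace, chunk_text, start_line, end_line, embedding FROM embeddings').all(); better-sqlite3 returns BLOB columns as Buffers.

```typescript
export interface EmbeddingRow {
  namespace: string;
  chunk_text: string;
  start_line: number;
  end_line: number;
  embedding: Buffer;
}

export interface VectorSearchResult {
  namespace: string;
  chunkText: string;
  startLine: number;
  endLine: number;
  similarity: number;
}

/** Encode a vector as a Float32 BLOB (4 bytes per dimension) for the embedding column. */
export function encodeEmbedding(vec: number[]): Buffer {
  return Buffer.from(new Float32Array(vec).buffer);
}

/**
 * Decode a BLOB back to a Float32Array. The bytes are copied first because a
 * Buffer's byteOffset is not guaranteed to be 4-byte aligned.
 */
export function decodeEmbedding(blob: Buffer): Float32Array {
  return new Float32Array(new Uint8Array(blob).buffer);
}

function cosine(a: ArrayLike<number>, b: ArrayLike<number>): number {
  if (a.length !== b.length) throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

/** Brute-force top-K: scores every row, O(rows x dimensions). */
export function rankRows(rows: EmbeddingRow[], query: number[], topK: number): VectorSearchResult[] {
  return rows
    .map(row => ({
      namespace: row.namespace,
      chunkText: row.chunk_text,
      startLine: row.start_line,
      endLine: row.end_line,
      similarity: cosine(query, decodeEmbedding(row.embedding)),
    }))
    .sort((x, y) => y.similarity - x.similarity)
    .slice(0, topK);
}
```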

HybridSearch

// src/memory/hybrid-search.ts

import type { MemoryStore, SearchResult } from './store.js';
import type { VectorSearchResult, VectorStore } from './vector-store.js';
import type { EmbeddingProvider } from './embeddings.js';

export interface HybridSearchResult {
  namespace: string;
  content: string;
  context: string;
  line: number;
  score: number;
  source: 'keyword' | 'vector' | 'both';
}

export class HybridSearch {
  constructor(
    private readonly memoryStore: MemoryStore,
    private readonly vectorStore: VectorStore,
    private readonly embeddingProvider: EmbeddingProvider,
    private readonly hybridWeight: number = 0.7,
  ) {}

  /**
   * Search using both keyword and vector similarity, merge and rank results.
   *
   * Scoring: final_score = (hybrid_weight * vector_score) + ((1 - hybrid_weight) * keyword_score)
   * Keyword score: 1.0 for exact match (normalized by result count)
   * Vector score: cosine similarity clamped to 0.0 - 1.0 (raw cosine similarity ranges from -1.0 to 1.0)
   */
  async search(query: string, topK: number = 5): Promise<HybridSearchResult[]> {
    // Run both searches in parallel
    const [keywordResults, vectorResults] = await Promise.all([
      Promise.resolve(this.memoryStore.search(query)),
      this.vectorSearch(query, topK * 2), // fetch more for merge
    ]);

    return this.mergeResults(keywordResults, vectorResults, topK);
  }

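  private async vectorSearch(query: string, topK: number): Promise<VectorSearchResult[]> {
    try {
      const [queryEmbedding] = await this.embeddingProvider.embed([query]);
      return this.vectorStore.search(queryEmbedding, topK);
    } catch (error) {
      // Embedding provider unavailable: degrade to keyword-only results instead of failing the search
      console.warn('Vector search failed, falling back to keyword search:', error);
      return [];
    }
  }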

  private mergeResults(
    keyword: SearchResult[],
    vector: VectorSearchResult[],
    topK: number,
  ): HybridSearchResult[] {
    // Deduplicate by namespace+line, compute hybrid score, sort, take topK
    ...
  }
}
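The merge step is left as `...` above. One possible implementation is written against simplified stand-in types: `KeywordHit` and `VectorHit` are hypothetical, and the real `SearchResult` and `VectorSearchResult` fields may differ. It rests on three assumptions:

* Every keyword hit scores 1.0.
* Vector similarity is clamped to [0, 1].
* A keyword hit and a vector chunk are the same result when the hit's line falls inside the chunk's line range.

```typescript
// Simplified, hypothetical stand-ins for SearchResult / VectorSearchResult.
interface KeywordHit {
  namespace: string;
  line: number;
  content: string;
  context: string;
}
interface VectorHit {
  namespace: string;
  startLine: number;
  endLine: number;
  text: string;
  similarity: number; // cosine similarity, -1..1
}
// Mirrors HybridSearchResult from hybrid-search.ts.
interface HybridSearchResult {
  namespace: string;
  content: string;
  context: string;
  line: number;
  score: number;
  source: 'keyword' | 'vector' | 'both';
}

export function mergeResults(
  keyword: KeywordHit[],
  vector: VectorHit[],
  topK: number,
  hybridWeight = 0.7,
): HybridSearchResult[] {
  const KEYWORD_SCORE = 1.0; // assumption: every keyword hit is an exact match
  const claimed = new Set<number>(); // keyword hits already merged into a chunk
  const merged: HybridSearchResult[] = [];

  for (const v of vector) {
    const vectorScore = Math.min(1, Math.max(0, v.similarity));
    // All unclaimed keyword hits that fall inside this chunk's line range.
    const hits = keyword
      .map((k, i) => ({ k, i }))
      .filter(({ k, i }) => !claimed.has(i) && k.namespace === v.namespace
        && k.line >= v.startLine && k.line <= v.endLine);
    hits.forEach(({ i }) => claimed.add(i));

    if (hits.length > 0) {
      // Found by both searches: show the keyword hit, combine both scores.
      merged.push({
        ...hits[0].k,
        score: hybridWeight * vectorScore + (1 - hybridWeight) * KEYWORD_SCORE,
        source: 'both',
      });
    } else {
      merged.push({
        namespace: v.namespace,
        content: v.text,
        context: v.text,
        line: v.startLine,
        score: hybridWeight * vectorScore,
        source: 'vector',
      });
    }
  }

  // Keyword hits that no vector chunk covered.
  keyword.forEach((k, i) => {
    if (!claimed.has(i)) {
      merged.push({ ...k, score: (1 - hybridWeight) * KEYWORD_SCORE, source: 'keyword' });
    }
  });

  return merged.sort((a, b) => b.score - a.score).slice(0, topK);
}
```

Matching on line-range containment is a design choice. Keying strictly on namespace+line is simpler, but then a keyword hit and a vector chunk only merge when the hit lands on the chunk's first line.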

Indexing Pipeline

When does indexing happen? Two approaches:

  1. On write: When MemoryStore.write() is called, also chunk + embed + store. This keeps the index always up to date but adds latency to writes.
  2. Background indexer: A separate timer that scans for unindexed content (via content hash comparison) and indexes it periodically.

Recommendation: Background indexer with write-triggered hint. On MemoryStore.write(), mark the namespace as "dirty". The background indexer (runs every 30s or so) processes only dirty namespaces. This keeps writes fast while ensuring the index stays reasonably fresh.

```typescript
// Addition to MemoryStore:
private dirtyNamespaces: Set<string> = new Set();

write(namespace: string, content: string, mode: 'append' | 'replace'): void {
  // ... existing write logic ...
  this.dirtyNamespaces.add(namespace);
}

/** Returns the namespaces written since the last call and clears the set. */
getDirtyNamespaces(): string[] {
  const ns = Array.from(this.dirtyNamespaces);
  this.dirtyNamespaces.clear();
  return ns;
}
```

Indexer runs in daemon:

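```typescript
import { createHash } from 'node:crypto';
import { resolve } from 'node:path';

// In daemon startup, after memory store init.
// Declared outside the block so it can be passed to createMemoryTools() later.
let hybridSearch: HybridSearch | undefined;

if (memoryStore && config.memory.embedding.enabled) {
  const embeddingProvider = createEmbeddingProvider(config.memory.embedding);
  const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db'));
  hybridSearch = new HybridSearch(memoryStore, vectorStore, embeddingProvider, config.memory.embedding.hybrid_weight);

  // Background indexer. The guard skips a tick while the previous pass is still running.
  let indexing = false;
  const indexInterval = setInterval(async () => {
    if (indexing) return;
    indexing = true;
    try {
      for (const ns of memoryStore.getDirtyNamespaces()) {
        try {
          const content = memoryStore.read(ns);
          // MD5 is used only for change detection here, not for security.
          const contentHash = createHash('md5').update(content).digest('hex');
          if (vectorStore.hasContentHash(ns, contentHash)) continue;

          const chunks = chunkText(content, ns, {
            chunkSize: config.memory.embedding.chunk_size,
            chunkOverlap: config.memory.embedding.chunk_overlap,
          });
          const embeddings = chunks.length > 0
            ? await embeddingProvider.embed(chunks.map(c => c.text))
            : [];
          // Drop the previous version's chunks only after embedding succeeds, so a
          // failed call leaves the old index searchable and edited content leaves no stale rows.
          vectorStore.deleteNamespace(ns);
          vectorStore.upsertChunks(chunks.map((c, i) => ({
            namespace: c.namespace,
            text: c.text,
            startLine: c.startLine,
            endLine: c.endLine,
            embedding: embeddings[i],
            contentHash,
          })));
        } catch (error) {
          // A rejection escaping this async callback would be unhandled and, on
          // Node 15+, crash the daemon. Log and move on (or use the daemon's logger);
          // the namespace is re-indexed the next time it is written.
          console.warn(`[memory] failed to index namespace "${ns}":`, error);
        }
      }
    } finally {
      indexing = false;
    }
  }, 30_000);

  lifecycle.onShutdown(async () => {
    clearInterval(indexInterval);
    vectorStore.close();
  });
}
```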

Integration with memory.search Tool

Modify src/tools/builtin/memory-search.ts to accept an optional HybridSearch instance:

```typescript
export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool {
  return {
    name: 'memory.search',
    description: 'Search across all memory files for a keyword or phrase. Uses semantic search when available, with keyword fallback.',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description: 'The search query (supports semantic meaning when embeddings are enabled)',
        },
      },
      required: ['query'],
    },
    execute: async (rawArgs: unknown): Promise<ToolResult> => {
      const args = rawArgs as { query: string };
      let fellBack = false;

      if (hybridSearch) {
        try {
          const results = await hybridSearch.search(args.query);
          // ...format hybrid results and return...
        } catch (error) {
          // Provider unreachable or vector DB unusable: log `error` as a warning
          // and fall through to keyword search.
          fellBack = true;
        }
      }

      // Keyword-only path: used when embeddings are disabled or the hybrid path failed.
      // Only the hybrid call is inside the try, so a keyword failure is not retried.
      const results = store.search(args.query);
      // ...existing formatting, plus a note about the fallback when fellBack is true...
    },
  };
}
```

Also update createMemoryTools() in src/tools/builtin/index.ts to accept the optional HybridSearch:

```typescript
export function createMemoryTools(store: MemoryStore, hybridSearch?: HybridSearch): Tool[] {
  return [
    createMemoryReadTool(store),
    createMemoryWriteTool(store),
    createMemorySearchTool(store, hybridSearch),
  ];
}
```

Graceful Fallback Chain

  1. If embedding.enabled: false (default): pure keyword search; no embedding dependencies are loaded
  2. If embedding.enabled: true but the provider is unreachable: log a warning, fall back to keyword search
  3. If embedding succeeds but the vector DB is corrupted: catch the error, fall back to keyword search
  4. If both the embedding call and the vector DB query succeed: hybrid search with the configured weight

Test Plan

src/memory/chunker.test.ts:

  • Splits text into chunks of configured size
  • Respects paragraph boundaries
  • Generates correct overlap between chunks
  • Handles empty content
  • Preserves line number tracking
  • Single paragraph smaller than chunk size returns one chunk
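These behaviors constrain chunkText fairly tightly. Below is a minimal sketch that satisfies them. The `Chunk` and `ChunkOptions` shapes are inferred from how the daemon code calls `chunkText`. The sketch makes two assumptions the plan does not settle:

* chunk_size and chunk_overlap are measured in characters. The plan does not say whether they count characters or tokens.
* Overlap is carried over as whole trailing lines.

```typescript
export interface Chunk {
  namespace: string;
  text: string;
  startLine: number; // 1-based, inclusive
  endLine: number; // 1-based, inclusive
}

export interface ChunkOptions {
  chunkSize: number; // assumption: measured in characters
  chunkOverlap: number; // characters of trailing lines repeated in the next chunk
}

export function chunkText(content: string, namespace: string, opts: ChunkOptions): Chunk[] {
  const lines = content.split('\n');

  // Group non-blank lines into paragraphs, recording 0-based line indices.
  const paragraphs: number[][] = [];
  let para: number[] = [];
  lines.forEach((text, i) => {
    if (text.trim() === '') {
      if (para.length > 0) paragraphs.push(para);
      para = [];
    } else {
      para.push(i);
    }
  });
  if (para.length > 0) paragraphs.push(para);

  const lengthOf = (idx: number[]) => idx.reduce((n, i) => n + lines[i].length + 1, 0);
  const chunks: Chunk[] = [];
  let buf: number[] = []; // line indices of the chunk being built
  let fresh = false; // true once buf holds lines not yet emitted

  const flush = () => {
    const start = buf[0];
    const end = buf[buf.length - 1];
    chunks.push({
      namespace,
      // Slice the original lines so blank lines between paragraphs survive.
      text: lines.slice(start, end + 1).join('\n'),
      startLine: start + 1,
      endLine: end + 1,
    });
    // Carry trailing lines (never the whole chunk) into the next chunk as overlap.
    let k = buf.length;
    while (k > 1 && lengthOf(buf.slice(k - 1)) <= opts.chunkOverlap) k--;
    buf = buf.slice(k);
    fresh = false;
  };

  for (const p of paragraphs) {
    // Paragraphs are never split; one larger than chunkSize becomes its own chunk.
    if (fresh && lengthOf(buf) + lengthOf(p) > opts.chunkSize) flush();
    buf.push(...p);
    fresh = true;
  }
  if (fresh) flush();
  return chunks;
}
```

Never splitting a paragraph keeps boundaries intact at the cost of occasional oversized chunks. A production chunker would likely fall back to line or sentence splits for those.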

src/memory/embeddings.test.ts:

  • createEmbeddingProvider() creates correct provider for each type
  • OpenAI provider calls correct API endpoint (mocked)
  • Ollama provider calls correct endpoint (mocked)
  • Provider returns correct dimensionality
  • Handles API errors gracefully
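The mocked endpoint tests are easier if the provider takes its fetch function as a parameter. Below is a sketch of the OpenAI provider under that assumption. The class name, constructor parameters, and `FetchLike` type are hypothetical. The request and response shapes follow OpenAI's `POST /v1/embeddings`: an `input` array goes in, and `data[].embedding` comes back, each entry tagged with its `index`.

```typescript
// Hypothetical sketch; EmbeddingProvider's shape is inferred from how the plan
// calls it (embed(texts) returns one vector per text).
export interface EmbeddingProvider {
  embed(texts: string[]): Promise<number[][]>;
}

// Minimal fetch signature so tests can inject a mock; pass globalThis.fetch in production.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

interface OpenAIEmbeddingResponse {
  data: { index: number; embedding: number[] }[];
}

export class OpenAIEmbeddingProvider implements EmbeddingProvider {
  constructor(
    private readonly apiKey: string,
    private readonly model: string,
    private readonly fetchImpl: FetchLike,
    private readonly baseUrl = 'https://api.openai.com/v1',
  ) {}

  async embed(texts: string[]): Promise<number[][]> {
    if (texts.length === 0) return [];
    const res = await this.fetchImpl(`${this.baseUrl}/embeddings`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify({ model: this.model, input: texts }),
    });
    if (!res.ok) {
      throw new Error(`Embedding request failed with HTTP ${res.status}`);
    }
    const body = (await res.json()) as OpenAIEmbeddingResponse;
    // Order by index so output[i] always corresponds to texts[i].
    return [...body.data].sort((a, b) => a.index - b.index).map(d => d.embedding);
  }
}
```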

src/memory/vector-store.test.ts:

  • Creates SQLite database with correct schema
  • upsertChunks() stores and retrieves embeddings
  • search() returns results sorted by similarity
  • deleteNamespace() removes all embeddings for a namespace
  • hasContentHash() detects existing content
  • close() cleans up database connection
  • Cosine similarity computation is correct

src/memory/hybrid-search.test.ts:

  • Returns keyword results when vector search unavailable
  • Merges keyword and vector results with correct weighting
  • Deduplicates results from both sources
  • Respects topK limit
  • Handles empty results from either source
  • source field correctly indicates 'keyword', 'vector', or 'both'

Config Example

```yaml
memory:
  enabled: true
  auto_extract: true
  max_context_tokens: 2000
  embedding:
    enabled: true
    provider: openai
    model: text-embedding-3-small
    api_key: "${OPENAI_API_KEY}"
    chunk_size: 512
    chunk_overlap: 50
    top_k: 5
    hybrid_weight: 0.7
```

Implementation Order

Recommended sequence, based on dependencies and risk:

| Order | Feature | Rationale |
|---|---|---|
| 1 | Dockerfile | No code changes; unblocks deployment testing for all other features |
| 2 | Inbound Webhooks | Clean pattern (mirrors cron), moderate scope, high utility |
| 3 | Heartbeat | Depends on running services for meaningful checks; benefits from Docker for testing |
| 4 | Vector Memory Search | Most complex, most files, most risk; do last when other features are stable |

Estimated Effort

| Feature | New Files | Modified Files | Estimated Lines | Complexity |
|---|---|---|---|---|
| Inbound Webhooks | 2 | 4 | ~300 | Medium |
| Dockerfile | 3 | 1-2 | ~100 | Simple |
| Heartbeat | 2 | 3 | ~350 | Medium |
| Vector Memory Search | 8 | 5 | ~800 | Complex |
| Total | 15 | 13-14 | ~1550 | |

Cross-Cutting Concerns

Config Schema Validation

All new schema additions follow the existing pattern:

  • Use .default({}) for optional sections so the config is valid without them
  • Export z.infer<> type aliases
  • Existing tests in src/config/schema.test.ts should still pass (no breaking changes)

Daemon Startup Order

New services slot into the existing startup sequence:

  1. Memory store (existing)
  2. Vector store + embedding provider (new, after memory store)
  3. Session store (existing)
  4. Tool registry (existing)
  5. Memory tools with hybrid search (modified)
  6. Gateway server (existing)
  7. Channel registry (existing)
  8. Webhook handler (new, registered with channel registry)
  9. Cron scheduler (existing)
  10. Start all channels (existing)
  11. Start gateway (existing)
  12. Heartbeat monitor (new, after all services started)

Shutdown Order (LIFO)

Shutdown hooks registered with lifecycle run in reverse order (last registered, first stopped):

  1. Heartbeat monitor stop
  2. Gateway stop (existing)
  3. Channel adapters stop (existing, includes webhook handler)
  4. Vector store close
  5. Process manager stop (existing)
  6. Session store close (existing)
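Under LIFO, each hook runs only after every service registered later has stopped. A sketch of those semantics, using a hypothetical `ShutdownRegistry` rather than the daemon's actual `lifecycle` object. It also keeps running the remaining hooks when one throws, so a failing gateway stop cannot leave the vector store open.

```typescript
// Hypothetical illustration of LIFO shutdown; the daemon's existing lifecycle
// object may differ in naming and error handling.
type ShutdownHook = () => void | Promise<void>;

export class ShutdownRegistry {
  private readonly hooks: { name: string; hook: ShutdownHook }[] = [];

  onShutdown(name: string, hook: ShutdownHook): void {
    this.hooks.push({ name, hook });
  }

  /** Runs hooks newest-first and returns the names of any hooks that threw. */
  async shutdown(): Promise<string[]> {
    const failed: string[] = [];
    for (const { name, hook } of [...this.hooks].reverse()) {
      try {
        await hook();
      } catch {
        failed.push(name); // record the failure but keep stopping the rest
      }
    }
    this.hooks.length = 0;
    return failed;
  }
}
```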