# Tier 2 Implementation Plan

**Date**: 2026-02-07
**Features**: Inbound Webhooks, Dockerfile, Heartbeat, Vector Memory Search
**Status**: Planning

---

## 1. Inbound Webhooks

**Complexity**: Medium

### Overview

HTTP endpoint (`POST /webhooks/:name`) that triggers agent processing, following the same pattern as `CronScheduler`: it implements `ChannelAdapter`, produces `InboundMessage`, and routes responses to a configured output channel.

### Config Schema Additions

Add to `src/config/schema.ts`:

```typescript
const webhookSchema = z.object({
  name: z.string().min(1, 'Webhook name is required'),
  /** Optional per-webhook HMAC secret for payload signature verification. */
  secret: z.string().optional(),
  /** Message template: use {{body}} for raw body, {{json.fieldName}} for JSON fields. */
  message: z.string().default('{{body}}'),
  output: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }),
  enabled: z.boolean().default(true),
});
```

Extend `automationSchema`:

```typescript
const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
}).default({});
```

Add type export:

```typescript
export type WebhookConfig = z.infer<typeof webhookSchema>;
```

### Files to Create

| File | Purpose |
|------|---------|
| `src/automation/webhooks.ts` | `WebhookHandler` class implementing `ChannelAdapter` |
| `src/automation/webhooks.test.ts` | Unit tests |

### Files to Modify

| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `webhookSchema`, extend `automationSchema`, add type export |
| `src/automation/index.ts` | Export `WebhookHandler` |
| `src/gateway/server.ts` | Route `POST /webhooks/:name` to the `WebhookHandler` before static/404 |
| `src/daemon/index.ts` | Instantiate `WebhookHandler`, register with `ChannelRegistry`, wire to gateway |

### Key Class: `WebhookHandler`

```typescript
// src/automation/webhooks.ts
import { createHmac, timingSafeEqual } from 'crypto';
import type { IncomingMessage, ServerResponse } from 'http';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import type { WebhookConfig } from '../config/schema.js';

/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}

export class WebhookHandler implements ChannelAdapter {
  readonly name = 'webhook';
  private _status: ChannelStatus = 'disconnected';
  private messageHandler?: (msg: InboundMessage) => void;
  private webhooks: Map<string, WebhookConfig> = new Map();

  constructor(
    private readonly webhookConfigs: WebhookConfig[],
    private readonly channelLookup: ChannelLookup,
  ) {
    for (const wh of webhookConfigs) {
      this.webhooks.set(wh.name, wh);
    }
  }

  // ChannelAdapter interface methods...
  get status(): ChannelStatus { return this._status; }

  async connect(): Promise<void> { this._status = 'connected'; }

  async disconnect(): Promise<void> { this._status = 'disconnected'; }

  onMessage(handler: (msg: InboundMessage) => void): void {
    this.messageHandler = handler;
  }

  async send(peerId: string, message: OutboundMessage): Promise<void> {
    // Route to output channel (same pattern as CronScheduler.send)
    const webhook = this.webhooks.get(peerId);
    if (!webhook) return;
    const outputAdapter = this.channelLookup.get(webhook.output.channel);
    if (!outputAdapter) return;
    await outputAdapter.send(webhook.output.peer, message);
  }

  /**
   * Handle incoming HTTP request. Called by the gateway HTTP handler.
   * Returns true if the request was handled (webhook found), false otherwise.
   */
  async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise<boolean> {
    const webhook = this.webhooks.get(webhookName);
    if (!webhook || !webhook.enabled) {
      return false; // Not found, let gateway return 404
    }

    // Read body
    const body = await readBody(req);

    // Verify HMAC signature if secret configured
    if (webhook.secret) {
      const signature = req.headers['x-webhook-signature'] as string | undefined;
      if (!verifyHmac(body, webhook.secret, signature)) {
        res.writeHead(401, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: 'Invalid signature' }));
        return true;
      }
    }

    // Build message text from template
    const messageText = renderTemplate(webhook.message, body);

    // Produce InboundMessage
    const msg: InboundMessage = {
      id: `webhook-${webhookName}-${Date.now()}`,
      channel: 'webhook',
      senderId: webhookName,
      senderName: `webhook:${webhookName}`,
      text: messageText,
      timestamp: Date.now(),
      metadata: { webhookName, contentType: req.headers['content-type'] },
    };

    this.messageHandler?.(msg);

    res.writeHead(202, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ accepted: true, id: msg.id }));
    return true;
  }

  getWebhookNames(): string[] {
    return Array.from(this.webhooks.keys());
  }
}
```

### Integration in Gateway HTTP Handler

In `GatewayServer.handleHttpRequest()`, add route matching **before** the static file serving:

```typescript
// Check for webhook routes: POST /webhooks/:name
if (req.method === 'POST' && req.url) {
  const match = req.url.match(/^\/webhooks\/([a-zA-Z0-9_-]+)/);
  if (match && this.webhookHandler) {
    const handled = await this.webhookHandler.handleRequest(match[1], req, res);
    if (handled) return;
  }
}
```

The `webhookHandler` reference is passed into `GatewayServerConfig` as an optional field, similar to how `channelRegistry` is passed.
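
The `handleRequest()` method in the `WebhookHandler` sketch calls `readBody`, `verifyHmac`, and `renderTemplate` without defining them. The following is one possible shape for the latter two, not a settled implementation. It assumes the signature header carries `sha256=` followed by a hex digest, that `{{json.*}}` placeholders use dot notation (including numeric array indices), and that missing fields render as empty strings. The `lookup` helper is hypothetical, and `readBody` is left out.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

/** Assumed helper: verify an `X-Webhook-Signature: sha256=<hex>` header value. */
export function verifyHmac(body: string, secret: string, signature: string | undefined): boolean {
  if (!signature) return false;
  const expected = 'sha256=' + createHmac('sha256', secret).update(body).digest('hex');
  const received = Buffer.from(signature);
  const wanted = Buffer.from(expected);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === wanted.length && timingSafeEqual(received, wanted);
}

/** Hypothetical helper: resolve a dotted path such as "alerts.0.annotations.summary". */
function lookup(root: unknown, path: string): unknown {
  let current: unknown = root;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    // Only follow own properties, so paths like "constructor" resolve to nothing.
    if (!Object.prototype.hasOwnProperty.call(current, key)) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

/** Assumed helper: expand {{body}} and {{json.path}} placeholders. */
export function renderTemplate(template: string, body: string): string {
  let json: unknown;
  try {
    json = JSON.parse(body);
  } catch {
    json = undefined; // Non-JSON body: {{json.*}} placeholders render as empty strings.
  }
  return template.replace(/\{\{\s*(body|json\.[\w.-]+)\s*\}\}/g, (_match, token: string) => {
    if (token === 'body') return body;
    const value = lookup(json, token.slice('json.'.length));
    if (value === undefined || value === null) return '';
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  });
}
```

Using a replacer function rather than a replacement string keeps `$` sequences in the request body from being interpreted as substitution patterns.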

### Integration in Daemon Startup

In `src/daemon/index.ts`, after the cron scheduler block (~line 812-817):

```typescript
// Register webhook handler (if any webhooks configured)
let webhookHandler: WebhookHandler | undefined;
if (config.automation.webhooks.length > 0) {
  webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry);
  channelRegistry.register(webhookHandler);
  console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);
}
```

Then pass `webhookHandler` to the `GatewayServer` constructor.

### HMAC Verification

Standard approach: an `X-Webhook-Signature: sha256=<hex-digest>` header, verified via `crypto.createHmac('sha256', secret).update(body).digest('hex')` with timing-safe comparison. Also support gateway token auth as an alternative (already handled by the gateway auth middleware for HTTP).

### Template Rendering

Simple template engine for the `message` field:

- `{{body}}`: raw request body string
- `{{json.fieldName}}`: extracted JSON field (dot notation for nested fields)
- Default: `{{body}}` if no template specified

### Test Plan

`src/automation/webhooks.test.ts`:

- Implements ChannelAdapter interface (name, status)
- `handleRequest()` produces InboundMessage with correct fields
- HMAC verification rejects invalid signatures
- HMAC verification passes valid signatures
- Returns false for unknown webhook names
- Returns false for disabled webhooks
- `send()` routes to output channel (same as cron test pattern)
- Template rendering with `{{body}}` and `{{json.field}}`
- `getWebhookNames()` lists all webhook names

### Config Example

```yaml
automation:
  webhooks:
    - name: github-push
      secret: "whsec_..."
      message: "GitHub push to {{json.repository.full_name}}: {{json.head_commit.message}}"
      output:
        channel: telegram
        peer: "123456"
    - name: alertmanager
      message: "Alert: {{json.alerts.0.annotations.summary}}"
      output:
        channel: telegram
        peer: "123456"
```

---

## 2. Dockerfile

**Complexity**: Simple

### Overview

Multi-stage Docker build for the Flynn daemon. Apart from the small environment-variable overrides listed under "Daemon Changes for Docker", no TypeScript source changes are required; this is build/deployment infrastructure.

### Files to Create

| File | Purpose |
|------|---------|
| `Dockerfile` | Multi-stage build |
| `.dockerignore` | Exclude dev artifacts |
| `docker-compose.yml` | Example deployment with volumes |

### Dockerfile Design

```dockerfile
# ── Stage 1: Build ──────────────────────────────────────────────
FROM node:22-alpine AS builder

# Install pnpm
RUN corepack enable && corepack prepare pnpm@latest --activate

WORKDIR /app

# better-sqlite3 compiles natively during install, so the build tools
# must be present before `pnpm install` runs
RUN apk add --no-cache python3 make g++

# Install dependencies first (layer caching)
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile

# Copy source and build
COPY tsconfig.json ./
COPY src/ src/
RUN pnpm build

# ── Stage 2: Runtime ────────────────────────────────────────────
FROM node:22-alpine AS runtime

RUN corepack enable && corepack prepare pnpm@latest --activate

WORKDIR /app

# Copy package files and install production deps only
COPY package.json pnpm-lock.yaml ./
RUN apk add --no-cache python3 make g++ && \
    pnpm install --frozen-lockfile --prod && \
    apk del python3 make g++

# Copy compiled output from builder
COPY --from=builder /app/dist dist/

# Copy gateway UI assets (COPY fails if this directory is missing)
COPY --from=builder /app/src/gateway/ui dist/gateway/ui/

# Copy prompt templates and bundled skills
COPY prompts/ prompts/
COPY skills/ skills/

# Create data directories
RUN mkdir -p /data/memory /data/sessions /config

# Runtime config
ENV NODE_ENV=production
ENV FLYNN_CONFIG=/config/config.yaml
ENV FLYNN_DATA_DIR=/data

EXPOSE 18800

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
  CMD wget -qO- http://localhost:18800/ || exit 1

ENTRYPOINT ["node", "dist/cli/index.js"]
CMD ["start"]
```

### Key Considerations

**better-sqlite3 native deps**: This is the trickiest part. `better-sqlite3` requires native compilation. Options:

1. Install build tools (`python3`, `make`, `g++`) in the runtime stage, compile, then remove them. This is the safest approach and keeps the image functional; it is what the Dockerfile above does.
2. Use the `--build-from-source` flag and carry the built `.node` binary from the builder stage. This requires matching the Node.js version and Alpine version exactly between stages (which we do, since both use `node:22-alpine`).
3. Alternatively, use `better-sqlite3`'s prebuilt binaries if they ship for Alpine/musl; check at build time.

**Recommended approach**: Option 2. Copy `node_modules` from the builder stage, since both stages use an identical base image:

```dockerfile
# In runtime stage, instead of separate pnpm install:
COPY --from=builder /app/node_modules node_modules/
```

This eliminates the need for build tools in the runtime image entirely. To actually get a smaller image, consider running `pnpm prune --prod` in the builder after `pnpm build`, so dev dependencies are not copied along.

### .dockerignore

```
node_modules/
dist/
.git/
.gitignore
*.log
.env
.env.*
.worktrees/
docs/
src/**/*.test.ts
vitest.config.*
eslint.config.*
# tsconfig.json must stay in the build context: the builder stage copies it
*.md
!README.md
```

### docker-compose.yml

```yaml
version: '3.8'

services:
  flynn:
    build: .
    image: flynn:latest
    container_name: flynn
    restart: unless-stopped
    ports:
      - "18800:18800"
    volumes:
      - ./config.yaml:/config/config.yaml:ro
      - flynn-data:/data
      - flynn-memory:/data/memory
    environment:
      - FLYNN_CONFIG=/config/config.yaml
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Add other provider keys as needed
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:18800/"]
      interval: 30s
      timeout: 5s
      retries: 3

volumes:
  flynn-data:
  flynn-memory:
```

### Daemon Changes for Docker

The daemon currently defaults the data dir to `~/.local/share/flynn`. For Docker, it needs to respect an environment variable override. Check if `src/daemon/index.ts` already handles this.
Looking at line 525: `const dataDir = resolve(homedir(), '.local/share/flynn');`

**Modification needed** in `src/daemon/index.ts`:

```typescript
// Support FLYNN_DATA_DIR env var for Docker deployments
const dataDir = process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn');
```

Similarly, config loading in `src/config/loader.ts` should respect the `FLYNN_CONFIG` env var if it does not already.

### Files to Modify

| File | Change |
|------|--------|
| `src/daemon/index.ts` | Support `FLYNN_DATA_DIR` environment variable |
| `src/config/loader.ts` | Support `FLYNN_CONFIG` environment variable (verify) |

### Test Plan

No unit tests needed (the Docker build is tested by building). Integration test:

- `docker build -t flynn:test .`
- `docker run --rm flynn:test --help` (verify CLI works)
- `docker compose up -d` with test config (verify startup)

---

## 3. Heartbeat

**Complexity**: Medium

### Overview

Periodic self-check system that validates daemon health and optionally notifies a configured channel on failure. Fits naturally as an extension to the `src/automation/` module, alongside cron.

### Config Schema Additions

Add to `src/config/schema.ts`:

```typescript
const heartbeatCheckSchema = z.enum([
  'gateway',   // Is the HTTP/WS server responsive?
  'model',     // Can we reach the default model provider?
  'channels',  // Are all channel adapters connected?
  'memory',    // Is the memory store readable/writable?
  'disk',      // Is disk space above threshold?
]);

const heartbeatSchema = z.object({
  enabled: z.boolean().default(false),
  /** Check interval (e.g. '60s', '5m', '1h'). Default: '5m'. */
  interval: z.string().default('5m'),
  /** Which checks to run. Default: all. */
  checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']),
  /** Optional notification on failure. */
  notify: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }).optional(),
  /** Number of consecutive failures before notifying. Default: 2. */
  failure_threshold: z.number().min(1).max(10).default(2),
  /** Disk space warning threshold in MB. Default: 100. */
  disk_threshold_mb: z.number().min(10).default(100),
}).default({});
```

Extend `automationSchema`:

```typescript
const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
  heartbeat: heartbeatSchema,
}).default({});
```

Add type export:

```typescript
export type HeartbeatConfig = z.infer<typeof heartbeatSchema>;
```

### Files to Create

| File | Purpose |
|------|---------|
| `src/automation/heartbeat.ts` | `HeartbeatMonitor` class |
| `src/automation/heartbeat.test.ts` | Unit tests |

### Files to Modify

| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `heartbeatSchema`, extend `automationSchema`, add type export |
| `src/automation/index.ts` | Export `HeartbeatMonitor` |
| `src/daemon/index.ts` | Instantiate `HeartbeatMonitor`, register shutdown handler |

### Key Class: `HeartbeatMonitor`

```typescript
// src/automation/heartbeat.ts
import { statfsSync } from 'fs';
import type { HeartbeatConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, OutboundMessage } from '../channels/types.js';
import type { ModelRouter } from '../models/router.js';

export interface HeartbeatCheckResult {
  name: string;
  healthy: boolean;
  message: string;
  durationMs: number;
}

export interface HeartbeatResult {
  timestamp: number;
  healthy: boolean;
  checks: HeartbeatCheckResult[];
}

interface HeartbeatDeps {
  config: HeartbeatConfig;
  /** Get the gateway HTTP server for health probing. */
  getGatewayPort?: () => number;
  /** Model router for provider health check. */
  modelRouter?: ModelRouter;
  /** Channel registry for adapter status checks. */
  getChannels?: () => Array<{ name: string; status: ChannelStatus }>;
  /** Memory store dir for read/write check. */
  memoryDir?: string;
  /** Data dir for disk space check. */
  dataDir?: string;
  /** Send notification to a channel. */
  notify?: (channel: string, peer: string, message: OutboundMessage) => Promise<void>;
}

export class HeartbeatMonitor {
  private timer?: ReturnType<typeof setInterval>;
  private consecutiveFailures = 0;
  private lastResult?: HeartbeatResult;

  constructor(private readonly deps: HeartbeatDeps) {}

  start(): void {
    if (!this.deps.config.enabled) return;
    const intervalMs = parseInterval(this.deps.config.interval);
    this.timer = setInterval(() => this.runChecks(), intervalMs);
    // Run first check after a short delay (let services finish starting)
    setTimeout(() => this.runChecks(), 5000);
  }

  stop(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /** Run all configured checks and optionally notify on failure. */
  async runChecks(): Promise<HeartbeatResult> {
    const checks: HeartbeatCheckResult[] = [];
    const enabledChecks = this.deps.config.checks;

    for (const checkName of enabledChecks) {
      const start = Date.now();
      try {
        const result = await this.runSingleCheck(checkName);
        checks.push({ ...result, durationMs: Date.now() - start });
      } catch (error) {
        checks.push({
          name: checkName,
          healthy: false,
          message: error instanceof Error ? error.message : String(error),
          durationMs: Date.now() - start,
        });
      }
    }

    const healthy = checks.every(c => c.healthy);
    const result: HeartbeatResult = { timestamp: Date.now(), healthy, checks };
    this.lastResult = result;

    if (!healthy) {
      this.consecutiveFailures++;
      if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
        await this.sendNotification(result);
      }
    } else {
      // Reset on recovery
      if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
        await this.sendRecoveryNotification(result);
      }
      this.consecutiveFailures = 0;
    }

    return result;
  }

  getLastResult(): HeartbeatResult | undefined {
    return this.lastResult;
  }

  private async runSingleCheck(name: string): Promise<Omit<HeartbeatCheckResult, 'durationMs'>> {
    switch (name) {
      case 'gateway': return this.checkGateway();
      case 'model': return this.checkModel();
      case 'channels': return this.checkChannels();
      case 'memory': return this.checkMemory();
      case 'disk': return this.checkDisk();
      default: return { name, healthy: false, message: `Unknown check: ${name}` };
    }
  }

  // Individual check implementations...
}
```

### Individual Check Implementations

1. **Gateway check**: HTTP GET to `http://localhost:<port>/`; expects a non-error response.
2. **Model check**: Call `modelRouter.healthCheck()` or attempt a minimal completion with max_tokens=1. Note: `ModelRouter` doesn't currently have a health check method, so we'd add a lightweight one that just verifies the API key/connection. Alternatively, use a simpler approach: verify the model client can be instantiated without errors, or do a `models.list()` API call for providers that support it.
3. **Channels check**: Iterate registered channels, verify all have `status === 'connected'`.
4. **Memory check**: Attempt `store.read('global')` and verify no exceptions.
5. **Disk check**: Use `statfsSync()` on the data directory, calculate free space in MB, compare to threshold.
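
Two of the checks above are simple enough to sketch as standalone functions. This is an illustrative version under a few assumptions: the `CheckOutcome` name and the message wording are invented here, channel status is treated as a plain string rather than the project's `ChannelStatus` type, and `statfsSync` requires Node 18.15 or later (the planned `node:22-alpine` image qualifies).

```typescript
import { statfsSync } from 'node:fs';

/** Assumed shape: what `runSingleCheck` returns, i.e. a result without `durationMs`. */
interface CheckOutcome {
  name: string;
  healthy: boolean;
  message: string;
}

/** Disk check: compare free space on the data directory's filesystem to a threshold in MB. */
export function checkDisk(dataDir: string, thresholdMb: number): CheckOutcome {
  const stats = statfsSync(dataDir);
  // bavail = free blocks available to unprivileged users; bsize = block size in bytes.
  const freeMb = Math.floor((stats.bavail * stats.bsize) / (1024 * 1024));
  const healthy = freeMb >= thresholdMb;
  return {
    name: 'disk',
    healthy,
    message: healthy
      ? `${freeMb} MB free`
      : `Only ${freeMb} MB free (threshold: ${thresholdMb} MB)`,
  };
}

/** Channels check: every registered adapter must report 'connected'. */
export function checkChannels(channels: Array<{ name: string; status: string }>): CheckOutcome {
  const down = channels.filter((c) => c.status !== 'connected').map((c) => c.name);
  return {
    name: 'channels',
    healthy: down.length === 0,
    message: down.length === 0
      ? `${channels.length} channel(s) connected`
      : `Not connected: ${down.join(', ')}`,
  };
}
```

Keeping the checks as pure functions of their inputs makes them easy to unit-test without constructing a full `HeartbeatMonitor`; the class methods can delegate to them.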

### Notification Format

```
⚠️ Flynn Heartbeat Failure

Failed checks:
- gateway: Connection refused on port 18800
- model: API key invalid (401)

Consecutive failures: 3/2 (threshold)
```

And on recovery:

```
✅ Flynn Heartbeat Recovery

All checks passing after 3 consecutive failures.
```

### Integration in Daemon Startup

In `src/daemon/index.ts`, after all services are started (after `gateway.start()`):

```typescript
// Initialize heartbeat monitor (if enabled)
let heartbeatMonitor: HeartbeatMonitor | undefined;
if (config.automation.heartbeat?.enabled) {
  heartbeatMonitor = new HeartbeatMonitor({
    config: config.automation.heartbeat,
    getGatewayPort: () => config.server.port,
    modelRouter,
    getChannels: () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })),
    memoryDir,
    dataDir,
    notify: config.automation.heartbeat.notify
      ? async (channel, peer, message) => {
          const adapter = channelRegistry.get(channel);
          if (adapter) await adapter.send(peer, message);
        }
      : undefined,
  });

  heartbeatMonitor.start();
  console.log(`Heartbeat monitor started (interval: ${config.automation.heartbeat.interval})`);

  lifecycle.onShutdown(async () => {
    heartbeatMonitor!.stop();
    console.log('Heartbeat monitor stopped');
  });
}
```

### Interval Parsing

Reuse or create a simple parser (similar to `parseDuration` in `src/session/index.ts`):

```typescript
function parseInterval(str: string): number {
  const match = str.match(/^(\d+)(s|m|h)$/);
  if (!match) return 300_000; // default 5 minutes
  const [, value, unit] = match;
  const multipliers: Record<string, number> = { s: 1000, m: 60_000, h: 3_600_000 };
  return parseInt(value, 10) * multipliers[unit];
}
```

### Exposing via Gateway (Optional Enhancement)

Add a `system.heartbeat` handler to the gateway WS handlers that returns `heartbeatMonitor.getLastResult()`. This lets the web UI or CLI display health status. It is a nice-to-have and can be done as a follow-up.
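
The failure and recovery messages shown under "Notification Format" could be produced by two small formatters. This is a sketch only: the function names are hypothetical, the interfaces are local copies of the `HeartbeatResult` shape from the `HeartbeatMonitor` listing, and the blank-line layout is an assumption about how the messages should render.

```typescript
interface HeartbeatCheckResult {
  name: string;
  healthy: boolean;
  message: string;
  durationMs: number;
}

interface HeartbeatResult {
  timestamp: number;
  healthy: boolean;
  checks: HeartbeatCheckResult[];
}

/** Hypothetical formatter for the failure message: lists only the failing checks. */
export function formatFailure(
  result: HeartbeatResult,
  consecutiveFailures: number,
  threshold: number,
): string {
  const failed = result.checks
    .filter((c) => !c.healthy)
    .map((c) => `- ${c.name}: ${c.message}`);
  return [
    '⚠️ Flynn Heartbeat Failure',
    '',
    'Failed checks:',
    ...failed,
    '',
    `Consecutive failures: ${consecutiveFailures}/${threshold} (threshold)`,
  ].join('\n');
}

/** Hypothetical formatter for the recovery message. */
export function formatRecovery(previousFailures: number): string {
  return [
    '✅ Flynn Heartbeat Recovery',
    '',
    `All checks passing after ${previousFailures} consecutive failures.`,
  ].join('\n');
}
```

The recovery text needs the failure count from before the reset, which matches the order in `runChecks()`: the recovery notification is sent first and `consecutiveFailures` is zeroed afterwards.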

### Test Plan

`src/automation/heartbeat.test.ts`:

- `start()` does nothing when `enabled: false`
- `runChecks()` runs all configured checks
- `runChecks()` returns healthy=true when all checks pass
- `runChecks()` returns healthy=false when any check fails
- Notification sent after `failure_threshold` consecutive failures
- Recovery notification sent when checks pass after failures
- `stop()` clears the timer
- `getLastResult()` returns most recent result
- Individual check tests:
  - `checkChannels()` detects disconnected adapters
  - `checkDisk()` detects low disk space
  - `checkMemory()` handles missing memory dir
- `parseInterval()` correctly parses '60s', '5m', '1h'

### Config Example

```yaml
automation:
  heartbeat:
    enabled: true
    interval: "5m"
    checks: [gateway, model, channels, memory, disk]
    notify:
      channel: telegram
      peer: "123456"
    failure_threshold: 2
    disk_threshold_mb: 100
```

---

## 4. Vector Memory Search

**Complexity**: Complex

### Overview

Add embedding-based semantic search to the existing `MemoryStore`, enabling hybrid search (vector similarity + keyword). Embedding generation is provider-agnostic, with graceful fallback to keyword-only search when embeddings are unavailable.

### Architecture Decision: Storage Backend

**Options considered**:

1. **SQLite with manual vector math**: Store embeddings as BLOBs in a `better-sqlite3` database, compute cosine similarity in JS. Simple, no new deps.
2. **SQLite with sqlite-vec extension**: Native vector similarity in SQL. Requires native extension compilation, complicates Docker build.
3. **Separate HNSW index (hnswlib-node)**: Purpose-built ANN index. Fast for large collections but adds a native dep.
4. **In-memory float arrays**: Store embeddings as JSON files alongside markdown, load into memory, compute similarity in JS.

**Recommendation**: Option 1 (SQLite with manual vector math).

Rationale:

- `better-sqlite3` is already a dependency
- Memory collections are small enough (hundreds to low thousands of chunks) that brute-force cosine similarity in JS is fast enough (<10ms for 1000 vectors)
- No new native dependencies
- Embeddings stored in existing data directory pattern
- Clean upgrade path to sqlite-vec if performance becomes an issue

### Config Schema Additions

Extend `memorySchema` in `src/config/schema.ts`:

```typescript
const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']);

const embeddingSchema = z.object({
  enabled: z.boolean().default(false),
  provider: embeddingProviderSchema.default('openai'),
  model: z.string().default('text-embedding-3-small'),
  /** Provider endpoint (required for ollama/llamacpp). */
  endpoint: z.string().optional(),
  /** API key (required for openai/gemini). */
  api_key: z.string().optional(),
  /** Embedding vector dimensions. Auto-detected from model if not set. */
  dimensions: z.number().optional(),
  /** Max tokens per chunk for embedding. Default: 512. */
  chunk_size: z.number().min(64).max(8192).default(512),
  /** Overlap between chunks in tokens. Default: 50. */
  chunk_overlap: z.number().min(0).max(1024).default(50),
  /** Number of top results to return from vector search. Default: 5. */
  top_k: z.number().min(1).max(50).default(5),
  /** Weight for vector results vs keyword results in hybrid scoring (0.0 = keyword only, 1.0 = vector only). Default: 0.7. */
  hybrid_weight: z.number().min(0).max(1).default(0.7),
}).default({});

const memorySchema = z.object({
  enabled: z.boolean().default(true),
  dir: z.string().optional(),
  auto_extract: z.boolean().default(true),
  max_context_tokens: z.number().min(100).max(10000).default(2000),
  embedding: embeddingSchema,
}).default({});
```

Add type exports:

```typescript
export type EmbeddingConfig = z.infer<typeof embeddingSchema>;
```

### Files to Create

| File | Purpose |
|------|---------|
| `src/memory/embeddings.ts` | `EmbeddingProvider` interface + provider implementations |
| `src/memory/embeddings.test.ts` | Unit tests for embedding providers |
| `src/memory/vector-store.ts` | `VectorStore` class (SQLite-backed vector storage) |
| `src/memory/vector-store.test.ts` | Unit tests for vector store |
| `src/memory/chunker.ts` | Text chunking utility |
| `src/memory/chunker.test.ts` | Unit tests for chunker |
| `src/memory/hybrid-search.ts` | `HybridSearch` class (combines vector + keyword) |
| `src/memory/hybrid-search.test.ts` | Unit tests for hybrid search |

### Files to Modify

| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `embeddingSchema`, extend `memorySchema`, add type export |
| `src/memory/store.ts` | Add `searchHybrid()` method, accept optional `VectorStore` in constructor |
| `src/memory/index.ts` | Export new modules |
| `src/tools/builtin/memory-search.ts` | Use `searchHybrid()` when available, fall back to `search()` |
| `src/daemon/index.ts` | Initialize `EmbeddingProvider`, `VectorStore`, wire into `MemoryStore` |

### Key Interfaces and Classes

#### EmbeddingProvider

```typescript
// src/memory/embeddings.ts

export interface EmbeddingProvider {
  /** Generate embeddings for one or more text chunks. */
  embed(texts: string[]): Promise<number[][]>;
  /** Embedding vector dimensions. */
  readonly dimensions: number;
}

export class OpenAIEmbeddingProvider implements EmbeddingProvider {
  // Uses openai SDK (already a dep) with embeddings.create()
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string; dimensions?: number }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class GeminiEmbeddingProvider implements EmbeddingProvider {
  // Uses @google/generative-ai SDK (already a dep) with embedContent()
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class OllamaEmbeddingProvider implements EmbeddingProvider {
  // Uses ollama SDK (already a dep) with embeddings()
  readonly dimensions: number;
  constructor(config: { host?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class LlamaCppEmbeddingProvider implements EmbeddingProvider {
  // HTTP POST to /embedding endpoint
  readonly dimensions: number;
  constructor(config: { endpoint: string; model?: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { ... }
```

#### Text Chunker

```typescript
// src/memory/chunker.ts

export interface Chunk {
  /** The text content of this chunk. */
  text: string;
  /** Source namespace. */
  namespace: string;
  /** Starting line number in the source file (1-based). */
  startLine: number;
  /** Ending line number in the source file (1-based). */
  endLine: number;
}

/**
 * Split memory content into overlapping chunks suitable for embedding.
 *
 * Strategy: split on paragraph boundaries (double newline), then merge
 * small paragraphs to reach target chunk size, with configurable overlap.
 */
export function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] { ... }
```

#### VectorStore

```typescript
// src/memory/vector-store.ts
import Database from 'better-sqlite3';

export interface VectorSearchResult {
  namespace: string;
  chunkText: string;
  startLine: number;
  endLine: number;
  similarity: number;
}

export class VectorStore {
  private db: Database.Database;

  constructor(dbPath: string) {
    this.db = new Database(dbPath);
    this.initSchema();
  }

  private initSchema(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        namespace TEXT NOT NULL,
        chunk_text TEXT NOT NULL,
        start_line INTEGER NOT NULL,
        end_line INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        created_at INTEGER NOT NULL DEFAULT (unixepoch()),
        content_hash TEXT NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_embeddings_namespace ON embeddings(namespace);
      CREATE INDEX IF NOT EXISTS idx_embeddings_hash ON embeddings(content_hash);
    `);
  }

  /** Store embedding vectors for a set of chunks. */
  upsertChunks(chunks: Array<{
    namespace: string;
    text: string;
    startLine: number;
    endLine: number;
    embedding: number[];
    contentHash: string;
  }>): void { ... }

  /** Delete all embeddings for a namespace (for re-indexing). */
  deleteNamespace(namespace: string): void { ... }

  /** Check if a namespace+hash combination already exists (skip re-embedding). */
  hasContentHash(namespace: string, hash: string): boolean { ... }

  /**
   * Search for similar vectors using cosine similarity.
   * Computed in JS, which is fine for <10k vectors.
   */
  search(queryEmbedding: number[], topK: number): VectorSearchResult[] {
    const allRows = this.db.prepare('SELECT * FROM embeddings').all();
    // Compute cosine similarity for each, sort, take topK
    ...
  }

  close(): void {
    this.db.close();
  }
}
```

The embedding is stored as a BLOB (Float32Array buffer).
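
To make the BLOB format concrete, here is one way the conversion between `number[]` and a Float32 buffer might look. It is a sketch, not the planned `VectorStore` code: `encodeEmbedding` and `decodeEmbedding` are hypothetical names, and it relies on `better-sqlite3` accepting and returning Node `Buffer` objects for BLOB columns.

```typescript
/** Hypothetical helper: pack an embedding into a binary blob (4 bytes per dimension). */
export function encodeEmbedding(vector: number[]): Buffer {
  const floats = Float32Array.from(vector);
  return Buffer.from(floats.buffer, floats.byteOffset, floats.byteLength);
}

/** Hypothetical helper: unpack a blob read from SQLite into a plain number array. */
export function decodeEmbedding(blob: Buffer): number[] {
  // Copy into a fresh ArrayBuffer first: a Buffer's byteOffset is not guaranteed
  // to be 4-byte aligned, and a Float32Array view requires alignment.
  const aligned = blob.buffer.slice(blob.byteOffset, blob.byteOffset + blob.byteLength);
  return Array.from(new Float32Array(aligned));
}
```

Float32 storage halves the size of each row compared with 64-bit doubles, at the cost of rounding each component to single precision, which is normally harmless for similarity ranking.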

Cosine similarity:

```typescript
function cosineSimilarity(a: number[], b: number[]): number {
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  const denominator = Math.sqrt(normA) * Math.sqrt(normB);
  // A zero vector has no direction; treat it as dissimilar instead of returning NaN
  return denominator === 0 ? 0 : dotProduct / denominator;
}
```

#### HybridSearch

```typescript
// src/memory/hybrid-search.ts
import type { SearchResult } from './store.js';
import type { VectorSearchResult, VectorStore } from './vector-store.js';
import type { EmbeddingProvider } from './embeddings.js';
import type { MemoryStore } from './store.js';

export interface HybridSearchResult {
  namespace: string;
  content: string;
  context: string;
  line: number;
  score: number;
  source: 'keyword' | 'vector' | 'both';
}

export class HybridSearch {
  constructor(
    private readonly memoryStore: MemoryStore,
    private readonly vectorStore: VectorStore,
    private readonly embeddingProvider: EmbeddingProvider,
    private readonly hybridWeight: number = 0.7,
  ) {}

  /**
   * Search using both keyword and vector similarity, merge and rank results.
   *
   * Scoring: final_score = (hybrid_weight * vector_score) + ((1 - hybrid_weight) * keyword_score)
   * Keyword score: 1.0 for exact match (normalized by result count)
   * Vector score: cosine similarity (0.0 - 1.0)
   */
  async search(query: string, topK: number = 5): Promise<HybridSearchResult[]> {
    // Run both searches in parallel
    const [keywordResults, vectorResults] = await Promise.all([
      Promise.resolve(this.memoryStore.search(query)),
      this.vectorSearch(query, topK * 2), // fetch more for merge
    ]);

    return this.mergeResults(keywordResults, vectorResults, topK);
  }

  private async vectorSearch(query: string, topK: number): Promise<VectorSearchResult[]> {
    const [queryEmbedding] = await this.embeddingProvider.embed([query]);
    return this.vectorStore.search(queryEmbedding, topK);
  }

  private mergeResults(
    keyword: SearchResult[],
    vector: VectorSearchResult[],
    topK: number,
  ): HybridSearchResult[] {
    // Deduplicate by namespace+line, compute hybrid score, sort, take topK
    ...
  }
}
```

### Indexing Pipeline

When does indexing happen? Two approaches:

1. **On write**: When `MemoryStore.write()` is called, also chunk + embed + store. This keeps the index always up to date but adds latency to writes.
2. **Background indexer**: A separate timer that scans for unindexed content (via content hash comparison) and indexes it periodically.

**Recommendation**: Background indexer with write-triggered hint. On `MemoryStore.write()`, mark the namespace as "dirty". The background indexer (runs every 30s or so) processes only dirty namespaces. This keeps writes fast while ensuring the index stays reasonably fresh.

```typescript
// Addition to MemoryStore:
private dirtyNamespaces: Set<string> = new Set();

write(namespace: string, content: string, mode: 'append' | 'replace'): void {
  // ... existing write logic ...
  this.dirtyNamespaces.add(namespace);
}

getDirtyNamespaces(): string[] {
  const ns = Array.from(this.dirtyNamespaces);
  this.dirtyNamespaces.clear();
  return ns;
}
```

Indexer runs in daemon:

```typescript
// In daemon startup, after memory store init
// (requires `import { createHash } from 'crypto'`):
if (memoryStore && config.memory.embedding.enabled) {
  const embeddingProvider = createEmbeddingProvider(config.memory.embedding);
  const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db'));
  const hybridSearch = new HybridSearch(memoryStore, vectorStore, embeddingProvider, config.memory.embedding.hybrid_weight);

  // Background indexer
  const indexInterval = setInterval(async () => {
    const dirty = memoryStore.getDirtyNamespaces();
    for (const ns of dirty) {
      try {
        const content = memoryStore.read(ns);
        const contentHash = createHash('md5').update(content).digest('hex');
        if (vectorStore.hasContentHash(ns, contentHash)) continue;

        const chunks = chunkText(content, ns, {
          chunkSize: config.memory.embedding.chunk_size,
          chunkOverlap: config.memory.embedding.chunk_overlap,
        });
        const embeddings = await embeddingProvider.embed(chunks.map(c => c.text));

        // Drop chunks from the previous version before storing the new ones
        vectorStore.deleteNamespace(ns);
        vectorStore.upsertChunks(chunks.map((c, i) => ({
          namespace: c.namespace,
          text: c.text,
          startLine: c.startLine,
          endLine: c.endLine,
          embedding: embeddings[i],
          contentHash,
        })));
      } catch (error) {
        // Keep the indexer alive; keyword search still works without the index
        console.warn(`Embedding index update failed for ${ns}:`, error);
      }
    }
  }, 30_000);

  lifecycle.onShutdown(async () => {
    clearInterval(indexInterval);
    vectorStore.close();
  });
}
```

### Integration with memory.search Tool

Modify `src/tools/builtin/memory-search.ts` to accept an optional `HybridSearch` instance:

```typescript
export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool {
  return {
    name: 'memory.search',
    description: 'Search across all memory files for a keyword or phrase. Uses semantic search when available, with keyword fallback.',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description: 'The search query (supports semantic meaning when embeddings are enabled)',
        },
      },
      required: ['query'],
    },
    execute: async (rawArgs: unknown): Promise => {
      const args = rawArgs as { query: string };
      try {
        if (hybridSearch) {
          const results = await hybridSearch.search(args.query);
          // Format hybrid results...
        } else {
          // Existing keyword-only path (unchanged)
          const results = store.search(args.query);
          // ...existing formatting...
        }
      } catch (error) {
        // On vector search failure, fall back to keyword
        const results = store.search(args.query);
        // ...format with note about fallback...
      }
    },
  };
}
```

Also update `createMemoryTools()` in `src/tools/builtin/index.ts` to accept the optional `HybridSearch`:

```typescript
export function createMemoryTools(store: MemoryStore, hybridSearch?: HybridSearch): Tool[] {
  return [
    createMemoryReadTool(store),
    createMemoryWriteTool(store),
    createMemorySearchTool(store, hybridSearch),
  ];
}
```

### Graceful Fallback Chain

1. If `embedding.enabled: false` (default): pure keyword search, no embedding deps loaded
2. If `embedding.enabled: true` but the provider is unreachable: log a warning, fall back to keyword search
3. If embedding succeeds but the vector DB is corrupted: catch the error, fall back to keyword search
4. If embedding succeeds: hybrid search with configurable weight

### Test Plan

**`src/memory/chunker.test.ts`**:

- Splits text into chunks of configured size
- Respects paragraph boundaries
- Generates correct overlap between chunks
- Handles empty content
- Preserves line number tracking
- Single paragraph smaller than chunk size returns one chunk

**`src/memory/embeddings.test.ts`**:

- `createEmbeddingProvider()` creates correct provider for each type
- OpenAI provider calls correct API endpoint (mocked)
- Ollama provider calls correct endpoint (mocked)
- Provider returns correct dimensionality
- Handles API errors gracefully

**`src/memory/vector-store.test.ts`**:

- Creates SQLite database with correct schema
- `upsertChunks()` stores and retrieves embeddings
- `search()` returns results sorted by similarity
- `deleteNamespace()` removes all embeddings for a namespace
- `hasContentHash()` detects existing content
- `close()` cleans up database connection
- Cosine similarity computation is correct

**`src/memory/hybrid-search.test.ts`**:

- Returns keyword results when vector search unavailable
- Merges keyword and vector results with correct weighting
- Deduplicates results from both sources
- Respects `topK` limit
- Handles empty results from either source
- `source` field correctly indicates 'keyword', 'vector', or 'both'

### Config Example

```yaml
memory:
  enabled: true
  auto_extract: true
  max_context_tokens: 2000
  embedding:
    enabled: true
    provider: openai
    model: text-embedding-3-small
    api_key: "${OPENAI_API_KEY}"
    chunk_size: 512
    chunk_overlap: 50
    top_k: 5
    hybrid_weight: 0.7
```

---

## Implementation Order

Recommended sequence (dependencies and risk):

| Order | Feature | Rationale |
|-------|---------|-----------|
| 1 | **Dockerfile** | No code changes, unblocks deployment testing for all other features |
| 2 | **Inbound Webhooks** | Clean pattern (mirrors cron), moderate scope, high utility |
| 3 | **Heartbeat** | Depends on running services for meaningful checks,
benefits from Docker for testing | | 4 | **Vector Memory Search** | Most complex, most files, most risk — do last when other features are stable | ### Estimated Effort | Feature | New Files | Modified Files | Estimated Lines | Complexity | |---------|-----------|----------------|-----------------|------------| | Inbound Webhooks | 2 | 4 | ~300 | Medium | | Dockerfile | 3 | 1-2 | ~100 | Simple | | Heartbeat | 2 | 3 | ~350 | Medium | | Vector Memory Search | 8 | 5 | ~800 | Complex | | **Total** | **15** | **12-13** | **~1550** | | --- ## Cross-Cutting Concerns ### Config Schema Validation All new schema additions follow the existing pattern: - Use `.default({})` for optional sections so the config is valid without them - Export `z.infer<>` type aliases - Existing tests in `src/config/schema.test.ts` should still pass (no breaking changes) ### Daemon Startup Order New services slot into the existing startup sequence: 1. Memory store (existing) 2. **Vector store + embedding provider** (new, after memory store) 3. Session store (existing) 4. Tool registry (existing) 5. **Memory tools with hybrid search** (modified) 6. Gateway server (existing) 7. Channel registry (existing) 8. **Webhook handler** (new, registered with channel registry) 9. Cron scheduler (existing) 10. Start all channels (existing) 11. Start gateway (existing) 12. **Heartbeat monitor** (new, after all services started) ### Shutdown Order (LIFO) Added to lifecycle in reverse: 1. Heartbeat monitor stop 2. Gateway stop (existing) 3. Channel adapters stop (existing, includes webhook handler) 4. Vector store close 5. Process manager stop (existing) 6. Session store close (existing)
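
The LIFO ordering above can be illustrated with a small handler registry. This is a minimal sketch, not the project's actual lifecycle module: the `Lifecycle` class, the `name` argument to `onShutdown`, and the shape of the value returned by `shutdown()` are all assumptions made for the example.

```typescript
// Hypothetical sketch of a LIFO shutdown registry; the real lifecycle API may differ.
type ShutdownHandler = () => void | Promise<void>;

class Lifecycle {
  private readonly handlers: { name: string; fn: ShutdownHandler }[] = [];

  /** Register a handler. Call this in startup order. */
  onShutdown(name: string, fn: ShutdownHandler): void {
    this.handlers.push({ name, fn });
  }

  /**
   * Run handlers last-registered-first.
   * `order` lists every handler attempted, `failed` lists those that threw.
   */
  async shutdown(): Promise<{ order: string[]; failed: string[] }> {
    const order: string[] = [];
    const failed: string[] = [];
    // Copy before reversing so the registration list is not mutated.
    for (const { name, fn } of [...this.handlers].reverse()) {
      order.push(name);
      try {
        await fn();
      } catch {
        // One failing handler should not block the remaining ones.
        failed.push(name);
      }
    }
    return { order, failed };
  }
}

// Registration mirrors startup order, so shutdown visits services in reverse.
const lifecycle = new Lifecycle();
lifecycle.onShutdown('vector store', () => { /* vectorStore.close() */ });
lifecycle.onShutdown('gateway', async () => { /* await gateway.stop() */ });
lifecycle.onShutdown('heartbeat monitor', () => { /* heartbeat.stop() */ });
// lifecycle.shutdown() attempts: heartbeat monitor, gateway, vector store
```

Registering each handler at the point where its service starts keeps the two orderings in sync: a new service added to the startup sequence gets its shutdown slot without anyone editing a separate list.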