- Dockerfile: multi-stage build (node:22-alpine), better-sqlite3 native deps handled
- .dockerignore + docker-compose.yml for deployment
- FLYNN_DATA_DIR env var support in daemon, CLI, and TUI
- WebhookHandler: ChannelAdapter for HTTP POST /webhooks/:name
- Per-webhook HMAC auth, template rendering ({{body}}, {{json.field}})
- Config schema: automation.webhooks array with name/secret/message/output
- Gateway routes webhook requests before static files (bypasses gateway auth)
- 23 new tests for webhook functionality, 874 total tests passing
Tier 2 Implementation Plan
Date: 2026-02-07
Features: Inbound Webhooks, Dockerfile, Heartbeat, Vector Memory Search
Status: Planning
1. Inbound Webhooks
Complexity: Medium
Overview
HTTP endpoint (POST /webhooks/:name) that triggers agent processing, following the same pattern as CronScheduler: implements ChannelAdapter, produces InboundMessage, routes responses to a configured output channel.
Config Schema Additions
Add to src/config/schema.ts:
const webhookSchema = z.object({
name: z.string().min(1, 'Webhook name is required'),
/** Optional per-webhook HMAC secret for payload signature verification. */
secret: z.string().optional(),
/** Message template — use {{body}} for raw body, {{json.fieldName}} for JSON fields. */
message: z.string().default('{{body}}'),
output: z.object({
channel: z.string().min(1),
peer: z.string().min(1),
}),
enabled: z.boolean().default(true),
});
Extend automationSchema:
const automationSchema = z.object({
cron: z.array(cronJobSchema).default([]),
webhooks: z.array(webhookSchema).default([]),
}).default({});
Add type export:
export type WebhookConfig = z.infer<typeof webhookSchema>;
Files to Create
| File | Purpose |
|---|---|
| src/automation/webhooks.ts | WebhookHandler class implementing ChannelAdapter |
| src/automation/webhooks.test.ts | Unit tests |
Files to Modify
| File | Change |
|---|---|
| src/config/schema.ts | Add webhookSchema, extend automationSchema, add type export |
| src/automation/index.ts | Export WebhookHandler |
| src/gateway/server.ts | Route POST /webhooks/:name to the WebhookHandler before static/404 |
| src/daemon/index.ts | Instantiate WebhookHandler, register with ChannelRegistry, wire to gateway |
Key Class: WebhookHandler
// src/automation/webhooks.ts
import { createHmac, timingSafeEqual } from 'crypto';
import type { IncomingMessage, ServerResponse } from 'http';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import type { WebhookConfig } from '../config/schema.js';
/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
export class WebhookHandler implements ChannelAdapter {
readonly name = 'webhook';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private webhooks: Map<string, WebhookConfig> = new Map();
constructor(
private readonly webhookConfigs: WebhookConfig[],
private readonly channelLookup: ChannelLookup,
) {
for (const wh of webhookConfigs) {
this.webhooks.set(wh.name, wh);
}
}
// ChannelAdapter interface methods...
get status(): ChannelStatus { return this._status; }
async connect(): Promise<void> { this._status = 'connected'; }
async disconnect(): Promise<void> { this._status = 'disconnected'; }
onMessage(handler: (msg: InboundMessage) => void): void { this.messageHandler = handler; }
async send(peerId: string, message: OutboundMessage): Promise<void> {
// Route to output channel (same pattern as CronScheduler.send)
const webhook = this.webhooks.get(peerId);
if (!webhook) return;
const outputAdapter = this.channelLookup.get(webhook.output.channel);
if (!outputAdapter) return;
await outputAdapter.send(webhook.output.peer, message);
}
/**
* Handle incoming HTTP request. Called by the gateway HTTP handler.
* Returns true if the request was handled (webhook found), false otherwise.
*/
async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise<boolean> {
const webhook = this.webhooks.get(webhookName);
if (!webhook || !webhook.enabled) {
return false; // Not found, let gateway return 404
}
// Read body
const body = await readBody(req);
// Verify HMAC signature if secret configured
if (webhook.secret) {
const signature = req.headers['x-webhook-signature'] as string | undefined;
if (!verifyHmac(body, webhook.secret, signature)) {
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid signature' }));
return true;
}
}
// Build message text from template
const messageText = renderTemplate(webhook.message, body);
// Produce InboundMessage
const msg: InboundMessage = {
id: `webhook-${webhookName}-${Date.now()}`,
channel: 'webhook',
senderId: webhookName,
senderName: `webhook:${webhookName}`,
text: messageText,
timestamp: Date.now(),
metadata: { webhookName, contentType: req.headers['content-type'] },
};
this.messageHandler?.(msg);
res.writeHead(202, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ accepted: true, id: msg.id }));
return true;
}
getWebhookNames(): string[] {
return Array.from(this.webhooks.keys());
}
}
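The class above calls readBody(), verifyHmac() and renderTemplate() without defining them. Below is a minimal sketch of readBody(), assuming a 1 MiB cap (an arbitrary, hypothetical limit) so an oversized POST cannot exhaust memory. IncomingMessage is an async iterable of Buffer chunks, so it can be passed in directly; the caller would catch the thrown error and answer 413.

```typescript
/**
 * Collect a request body into a UTF-8 string, rejecting oversized payloads.
 * Accepts any async iterable of chunks, which includes http.IncomingMessage.
 */
export async function readBody(
  req: AsyncIterable<Buffer | string>,
  maxBytes = 1024 * 1024, // hypothetical 1 MiB cap
): Promise<string> {
  const chunks: Buffer[] = [];
  let total = 0;
  for await (const chunk of req) {
    const buf = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
    total += buf.length;
    if (total > maxBytes) {
      // Leaving the for-await loop early also tears down the underlying stream
      throw new Error(`Request body exceeds ${maxBytes} bytes`);
    }
    chunks.push(buf);
  }
  return Buffer.concat(chunks).toString('utf8');
}
```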
Integration in Gateway HTTP Handler
In GatewayServer.handleHttpRequest(), add route matching before the static file serving:
// Check for webhook routes: POST /webhooks/:name
if (req.method === 'POST' && req.url) {
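// Anchored so only /webhooks/<name> (optional trailing slash or query string) matches, not /webhooks/<name>/extra
const match = req.url.match(/^\/webhooks\/([a-zA-Z0-9_-]+)\/?(?:\?.*)?$/);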
if (match && this.webhookHandler) {
const handled = await this.webhookHandler.handleRequest(match[1], req, res);
if (handled) return;
}
}
The webhookHandler reference is passed into GatewayServerConfig as an optional field, similar to how channelRegistry is passed.
Integration in Daemon Startup
In src/daemon/index.ts, after the cron scheduler block (~line 812-817):
// Register webhook handler (if any webhooks configured)
let webhookHandler: WebhookHandler | undefined;
if (config.automation.webhooks.length > 0) {
webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry);
channelRegistry.register(webhookHandler);
console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);
}
Then pass webhookHandler to the GatewayServer constructor.
HMAC Verification
Standard approach: X-Webhook-Signature: sha256=<hex> header, verified via crypto.createHmac('sha256', secret).update(body).digest('hex') with timing-safe comparison.
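Gateway token auth is a possible alternative, but webhook routes are matched before the gateway's HTTP auth middleware, so that middleware does not apply to them automatically. A webhook configured without a secret therefore accepts unauthenticated requests.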
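A sketch of verifyHmac() with the argument order used in handleRequest() above, assuming the sha256=<hex> header format described here. signBody() is a hypothetical helper showing what a sender computes. Both functions accept a Buffer as well as a string, because hashing the raw bytes avoids mismatches when a body is not valid UTF-8.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

const PREFIX = 'sha256=';

/** Hypothetical helper: the header value a sender puts in X-Webhook-Signature. */
export function signBody(body: string | Buffer, secret: string): string {
  return PREFIX + createHmac('sha256', secret).update(body).digest('hex');
}

/**
 * Verify an `X-Webhook-Signature: sha256=<hex>` header against the request body.
 * Returns false for a missing, malformed, or mismatched signature.
 */
export function verifyHmac(body: string | Buffer, secret: string, signature: string | undefined): boolean {
  if (!signature || !signature.startsWith(PREFIX)) return false;
  const expected = Buffer.from(signBody(body, secret), 'utf8');
  const received = Buffer.from(signature, 'utf8');
  // timingSafeEqual throws if lengths differ. Every valid signature has the same
  // length, so rejecting a different length early reveals nothing about the secret.
  if (received.length !== expected.length) return false;
  return timingSafeEqual(expected, received);
}
```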
Template Rendering
Simple template engine for the message field:
- {{body}}: raw request body string
- {{json.fieldName}}: extracted JSON field (dot notation for nested fields; numeric segments index arrays, as in {{json.alerts.0.annotations.summary}})
- Default: {{body}} if no template is specified
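One way renderTemplate() could implement these rules. It is a sketch with two assumptions the plan leaves open: a path that does not resolve (or a body that is not JSON) renders as an empty string, and object or array values are rendered with JSON.stringify. Unrecognized placeholders such as {{foo}} are left untouched.

```typescript
/**
 * Render a webhook message template.
 *   {{body}}        -> raw request body
 *   {{json.a.b.0}}  -> value at that path in the parsed JSON body
 */
export function renderTemplate(template: string, body: string): string {
  let parsed: unknown;
  let parseAttempted = false;
  // Parse lazily so templates that only use {{body}} never touch JSON.parse
  const getJson = (): unknown => {
    if (!parseAttempted) {
      parseAttempted = true;
      try { parsed = JSON.parse(body); } catch { parsed = undefined; }
    }
    return parsed;
  };

  return template.replace(/\{\{\s*(body|json\.[^}\s]+)\s*\}\}/g, (_match: string, key: string) => {
    if (key === 'body') return body;
    let value: unknown = getJson();
    for (const segment of key.slice('json.'.length).split('.')) {
      if (value === null || typeof value !== 'object') return '';
      // Works for arrays too: arr['0'] is the first element
      value = (value as Record<string, unknown>)[segment];
    }
    if (value === undefined || value === null) return '';
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  });
}
```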
Test Plan
src/automation/webhooks.test.ts:
- Implements ChannelAdapter interface (name, status)
- handleRequest() produces InboundMessage with correct fields
- HMAC verification rejects invalid signatures
- HMAC verification passes valid signatures
- Returns false for unknown webhook names
- Returns false for disabled webhooks
- send() routes to output channel (same as cron test pattern)
- Template rendering with {{body}} and {{json.field}}
- getWebhookNames() lists all webhook names
Config Example
automation:
  webhooks:
    - name: github-push
      # Note: GitHub sends its signature in X-Hub-Signature-256, not X-Webhook-Signature
      secret: "whsec_..."
      message: "GitHub push to {{json.repository.full_name}}: {{json.head_commit.message}}"
      output:
        channel: telegram
        peer: "123456"
    - name: alertmanager
      message: "Alert: {{json.alerts.0.annotations.summary}}"
      output:
        channel: telegram
        peer: "123456"
2. Dockerfile
Complexity: Simple
Overview
Multi-stage Docker build for the Flynn daemon. No TypeScript source code changes required — this is purely build/deployment infrastructure.
Files to Create
| File | Purpose |
|---|---|
| Dockerfile | Multi-stage build |
| .dockerignore | Exclude dev artifacts |
| docker-compose.yml | Example deployment with volumes |
Dockerfile Design
# ── Stage 1: Build ──────────────────────────────────────────────
FROM node:22-alpine AS builder
# Install pnpm
RUN corepack enable && corepack prepare pnpm@latest --activate
WORKDIR /app
# better-sqlite3 needs build tools for native compilation. Its native binding is
# fetched or compiled during `pnpm install`, so the tools must be present first.
RUN apk add --no-cache python3 make g++
# Install dependencies first (layer caching)
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile
# Copy source and build
COPY tsconfig.json ./
COPY src/ src/
RUN pnpm build
# ── Stage 2: Runtime ────────────────────────────────────────────
FROM node:22-alpine AS runtime
RUN corepack enable && corepack prepare pnpm@latest --activate
WORKDIR /app
# Copy package files and install production deps only
COPY package.json pnpm-lock.yaml ./
RUN apk add --no-cache python3 make g++ && \
pnpm install --frozen-lockfile --prod && \
apk del python3 make g++
# Copy compiled output from builder
COPY --from=builder /app/dist dist/
# Copy gateway UI assets (COPY fails if the source path is missing, so drop this line if there are none)
COPY --from=builder /app/src/gateway/ui dist/gateway/ui/
# Copy prompt templates and bundled skills
COPY prompts/ prompts/
COPY skills/ skills/
# Create data directories
RUN mkdir -p /data/memory /data/sessions /config
# Runtime config
ENV NODE_ENV=production
ENV FLYNN_CONFIG=/config/config.yaml
ENV FLYNN_DATA_DIR=/data
EXPOSE 18800
# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD wget -qO- http://localhost:18800/ || exit 1
ENTRYPOINT ["node", "dist/cli/index.js"]
CMD ["start"]
Key Considerations
better-sqlite3 native deps: This is the trickiest part. better-sqlite3 requires native compilation. Options:
1. Install build tools (python3, make, g++) in the runtime stage, compile, then remove them. This is the safest approach and keeps the image functional.
2. Use the --build-from-source flag and carry the built .node binary from the builder stage. This requires matching the Node.js and Alpine versions exactly between stages (which we do, since both use node:22-alpine).
3. Alternatively, use better-sqlite3's prebuilt binaries if they ship for Alpine/musl; check at build time.
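Recommended approach: a variant of Option 2. Because both stages use the identical base image, copy the already-compiled node_modules from the builder stage instead of running a second install. Prune dev dependencies in the builder first, otherwise they are copied into the runtime image as well:
# In the builder stage, after pnpm build:
RUN pnpm prune --prod
# In the runtime stage, instead of a separate pnpm install:
COPY --from=builder /app/node_modules node_modules/
This removes the build-tool install/uninstall step from the runtime stage entirely, and the better-sqlite3 binary that runs is exactly the one compiled in the builder. Note that pnpm 10 and later skip dependency install scripts by default, so better-sqlite3 must be explicitly allowed to build (for example via pnpm.onlyBuiltDependencies in package.json), or its native binding is never built.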
.dockerignore
node_modules/
dist/
.git/
.gitignore
*.log
.env
.env.*
.worktrees/
docs/
src/**/*.test.ts
vitest.config.*
eslint.config.*
*.md
!README.md
docker-compose.yml
services:
  flynn:
    build: .
    image: flynn:latest
    container_name: flynn
    restart: unless-stopped
    ports:
      - "18800:18800"
    volumes:
      - ./config.yaml:/config/config.yaml:ro
      - flynn-data:/data
      - flynn-memory:/data/memory
    environment:
      - FLYNN_CONFIG=/config/config.yaml
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Add other provider keys as needed
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:18800/"]
      interval: 30s
      timeout: 5s
      retries: 3

volumes:
  flynn-data:
  flynn-memory:
Daemon Changes for Docker
The daemon currently defaults data dir to ~/.local/share/flynn. For Docker, we need to respect an environment variable override. Check if src/daemon/index.ts already handles this.
Looking at line 525: const dataDir = resolve(homedir(), '.local/share/flynn');
Modification needed in src/daemon/index.ts:
// Support FLYNN_DATA_DIR env var for Docker deployments
const dataDir = process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn');
Similarly, config loading in src/config/loader.ts should respect FLYNN_CONFIG env var if not already.
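A sketch of both lookups as small pure functions (resolveDataDir and resolveConfigPath are hypothetical names). The environment is passed in so the logic is testable, and an empty or whitespace-only variable falls back to the default, which the `??` form shown above would not do. The default config path is left as a parameter because the plan does not state it.

```typescript
import { homedir } from 'node:os';
import { resolve } from 'node:path';

type Env = Record<string, string | undefined>;

/** Data directory: FLYNN_DATA_DIR if set and non-empty, else ~/.local/share/flynn. */
export function resolveDataDir(env: Env = process.env): string {
  const override = env.FLYNN_DATA_DIR?.trim();
  return override ? resolve(override) : resolve(homedir(), '.local/share/flynn');
}

/** Config file: FLYNN_CONFIG if set and non-empty, else the caller's default path. */
export function resolveConfigPath(defaultPath: string, env: Env = process.env): string {
  const override = env.FLYNN_CONFIG?.trim();
  return resolve(override || defaultPath);
}
```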
Files to Modify
| File | Change |
|---|---|
| src/daemon/index.ts | Support FLYNN_DATA_DIR environment variable |
| src/config/loader.ts | Support FLYNN_CONFIG environment variable (verify) |
Test Plan
No unit tests needed (Docker build is tested by building). Integration test:
- docker build -t flynn:test .
- docker run --rm flynn:test --help (verify CLI works)
- docker compose up -d with test config (verify startup)
3. Heartbeat
Complexity: Medium
Overview
Periodic self-check system that validates daemon health and optionally notifies a configured channel on failure. Fits naturally as an extension to the src/automation/ module, alongside cron.
Config Schema Additions
Add to src/config/schema.ts:
const heartbeatCheckSchema = z.enum([
'gateway', // Is the HTTP/WS server responsive?
'model', // Can we reach the default model provider?
'channels', // Are all channel adapters connected?
'memory', // Is the memory store readable/writable?
'disk', // Is disk space above threshold?
]);
const heartbeatSchema = z.object({
enabled: z.boolean().default(false),
/** Check interval (e.g. '60s', '5m', '1h'). Default: '5m'. */
interval: z.string().default('5m'),
/** Which checks to run. Default: all. */
checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']),
/** Optional notification on failure. */
notify: z.object({
channel: z.string().min(1),
peer: z.string().min(1),
}).optional(),
/** Number of consecutive failures before notifying. Default: 2. */
failure_threshold: z.number().min(1).max(10).default(2),
/** Disk space warning threshold in MB. Default: 100. */
disk_threshold_mb: z.number().min(10).default(100),
}).default({});
Extend automationSchema:
const automationSchema = z.object({
cron: z.array(cronJobSchema).default([]),
webhooks: z.array(webhookSchema).default([]),
heartbeat: heartbeatSchema,
}).default({});
Add type export:
export type HeartbeatConfig = z.infer<typeof heartbeatSchema>;
Files to Create
| File | Purpose |
|---|---|
| src/automation/heartbeat.ts | HeartbeatMonitor class |
| src/automation/heartbeat.test.ts | Unit tests |
Files to Modify
| File | Change |
|---|---|
| src/config/schema.ts | Add heartbeatSchema, extend automationSchema, add type export |
| src/automation/index.ts | Export HeartbeatMonitor |
| src/daemon/index.ts | Instantiate HeartbeatMonitor, register shutdown handler |
Key Class: HeartbeatMonitor
// src/automation/heartbeat.ts
import { statfsSync } from 'fs';
import type { HeartbeatConfig } from '../config/schema.js';
import type { ChannelStatus, OutboundMessage } from '../channels/types.js';
import type { ModelRouter } from '../models/router.js';
export interface HeartbeatCheckResult {
name: string;
healthy: boolean;
message: string;
durationMs: number;
}
export interface HeartbeatResult {
timestamp: number;
healthy: boolean;
checks: HeartbeatCheckResult[];
}
interface HeartbeatDeps {
config: HeartbeatConfig;
/** Get the gateway HTTP server for health probing. */
getGatewayPort?: () => number;
/** Model router for provider health check. */
modelRouter?: ModelRouter;
/** Channel registry for adapter status checks. */
getChannels?: () => Array<{ name: string; status: ChannelStatus }>;
/** Memory store dir for read/write check. */
memoryDir?: string;
/** Data dir for disk space check. */
dataDir?: string;
/** Send notification to a channel. */
notify?: (channel: string, peer: string, message: OutboundMessage) => Promise<void>;
}
export class HeartbeatMonitor {
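private timer?: ReturnType<typeof setInterval>;
private startupTimer?: ReturnType<typeof setTimeout>;
private consecutiveFailures = 0;
private lastResult?: HeartbeatResult;
constructor(private readonly deps: HeartbeatDeps) {}
start(): void {
if (!this.deps.config.enabled) return;
const intervalMs = parseInterval(this.deps.config.interval);
// runChecks() can reject (e.g. if sending a notification fails); catch it so a timer tick never leaves an unhandled rejection
const tick = (): void => {
this.runChecks().catch(error => console.error('Heartbeat run failed:', error));
};
this.timer = setInterval(tick, intervalMs);
// Run first check after a short delay (let services finish starting)
this.startupTimer = setTimeout(tick, 5000);
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = undefined;
}
// Also cancel the delayed first run, in case stop() is called within the first 5 seconds
if (this.startupTimer) {
clearTimeout(this.startupTimer);
this.startupTimer = undefined;
}
}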
/** Run all configured checks and optionally notify on failure. */
async runChecks(): Promise<HeartbeatResult> {
const checks: HeartbeatCheckResult[] = [];
const enabledChecks = this.deps.config.checks;
for (const checkName of enabledChecks) {
const start = Date.now();
try {
const result = await this.runSingleCheck(checkName);
checks.push({ ...result, durationMs: Date.now() - start });
} catch (error) {
checks.push({
name: checkName,
healthy: false,
message: error instanceof Error ? error.message : String(error),
durationMs: Date.now() - start,
});
}
}
const healthy = checks.every(c => c.healthy);
const result: HeartbeatResult = { timestamp: Date.now(), healthy, checks };
this.lastResult = result;
if (!healthy) {
this.consecutiveFailures++;
if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
await this.sendNotification(result);
}
} else {
// Reset on recovery
if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
await this.sendRecoveryNotification(result);
}
this.consecutiveFailures = 0;
}
return result;
}
getLastResult(): HeartbeatResult | undefined {
return this.lastResult;
}
private async runSingleCheck(name: string): Promise<Omit<HeartbeatCheckResult, 'durationMs'>> {
switch (name) {
case 'gateway': return this.checkGateway();
case 'model': return this.checkModel();
case 'channels': return this.checkChannels();
case 'memory': return this.checkMemory();
case 'disk': return this.checkDisk();
default: return { name, healthy: false, message: `Unknown check: ${name}` };
}
}
// Individual check implementations...
}
Individual Check Implementations
- Gateway check: HTTP GET to http://localhost:<port>/, expecting a non-error response.
- Model check: Call modelRouter.healthCheck() or attempt a minimal completion with max_tokens=1. Note: ModelRouter doesn't currently have a health check method, so we'd add a lightweight one that just verifies the API key/connection. Alternatively, use a simpler approach: verify the model client can be instantiated without errors, or do a models.list() API call for providers that support it.
- Channels check: Iterate registered channels, verify all have status === 'connected'.
- Memory check: Attempt store.read('global') and verify no exceptions.
- Disk check: Use statfsSync() on the data directory, calculate free space in MB, compare to threshold.
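Sketches of the two synchronous checks, written as standalone functions (hypothetical names; in the plan they would be private methods of HeartbeatMonitor returning the same name/healthy/message shape). checkDisk() relies on fs.statfsSync(), available since Node.js 18.15, and uses bavail (blocks available to non-root users) rather than bfree. If the directory does not exist, statfsSync() throws, which runChecks() already records as a failed check.

```typescript
import { statfsSync } from 'node:fs';

interface CheckOutcome {
  name: string;
  healthy: boolean;
  message: string;
}

/** Disk check: free space available to unprivileged users, compared with a MB threshold. */
export function checkDisk(dataDir: string, thresholdMb: number): CheckOutcome {
  const stats = statfsSync(dataDir);
  // bavail = free blocks available to non-root users; bsize = block size in bytes
  const freeMb = Math.floor((stats.bavail * stats.bsize) / (1024 * 1024));
  const healthy = freeMb >= thresholdMb;
  return {
    name: 'disk',
    healthy,
    message: healthy ? `${freeMb} MB free` : `Only ${freeMb} MB free (threshold: ${thresholdMb} MB)`,
  };
}

/** Channels check: every registered adapter must report status 'connected'. */
export function checkChannels(channels: Array<{ name: string; status: string }>): CheckOutcome {
  const down = channels.filter((c) => c.status !== 'connected');
  if (down.length === 0) {
    return { name: 'channels', healthy: true, message: `${channels.length} channel(s) connected` };
  }
  return {
    name: 'channels',
    healthy: false,
    message: `Not connected: ${down.map((c) => `${c.name} (${c.status})`).join(', ')}`,
  };
}
```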
Notification Format
⚠️ Flynn Heartbeat Failure
Failed checks:
- gateway: Connection refused on port 18800
- model: API key invalid (401)
Consecutive failures: 3 (threshold: 2)
And on recovery:
✅ Flynn Heartbeat Recovery
All checks passing after 3 consecutive failures.
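A sketch of how sendNotification() and sendRecoveryNotification() might build this text (the formatting helper names are hypothetical, and the result types are redeclared locally so the block stands alone). It returns a plain string because the OutboundMessage shape is not shown here; wrapping it is left to the caller. The recovery text needs the failure count, so it must be formatted before consecutiveFailures is reset, which the existing order in runChecks() already does.

```typescript
interface HeartbeatCheckResult { name: string; healthy: boolean; message: string; durationMs: number; }
interface HeartbeatResult { timestamp: number; healthy: boolean; checks: HeartbeatCheckResult[]; }

/** Render the failure notification, listing only the checks that failed. */
export function formatFailureNotification(
  result: HeartbeatResult,
  consecutiveFailures: number,
  threshold: number,
): string {
  const failed = result.checks.filter((c) => !c.healthy);
  return [
    '⚠️ Flynn Heartbeat Failure',
    'Failed checks:',
    ...failed.map((c) => `- ${c.name}: ${c.message}`),
    `Consecutive failures: ${consecutiveFailures} (threshold: ${threshold})`,
  ].join('\n');
}

/** Render the recovery notification; call before resetting the failure counter. */
export function formatRecoveryNotification(previousFailures: number): string {
  return `✅ Flynn Heartbeat Recovery\nAll checks passing after ${previousFailures} consecutive failures.`;
}
```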
Integration in Daemon Startup
In src/daemon/index.ts, after all services are started (after gateway.start()):
// Initialize heartbeat monitor (if enabled)
let heartbeatMonitor: HeartbeatMonitor | undefined;
if (config.automation.heartbeat?.enabled) {
heartbeatMonitor = new HeartbeatMonitor({
config: config.automation.heartbeat,
getGatewayPort: () => config.server.port,
modelRouter,
getChannels: () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })),
memoryDir: memoryDir,
dataDir,
notify: config.automation.heartbeat.notify
? async (channel, peer, message) => {
const adapter = channelRegistry.get(channel);
if (adapter) await adapter.send(peer, message);
}
: undefined,
});
heartbeatMonitor.start();
console.log(`Heartbeat monitor started (interval: ${config.automation.heartbeat.interval})`);
lifecycle.onShutdown(async () => {
heartbeatMonitor!.stop();
console.log('Heartbeat monitor stopped');
});
}
Interval Parsing
Reuse or create a simple parser (similar to parseDuration in src/session/index.ts):
function parseInterval(str: string): number {
const match = str.match(/^(\d+)(s|m|h)$/);
if (!match) return 300_000; // default 5 minutes
const [, value, unit] = match;
const multipliers: Record<string, number> = { s: 1000, m: 60_000, h: 3_600_000 };
const ms = parseInt(value, 10) * multipliers[unit];
// Treat '0s' and similar as invalid so setInterval never runs in a tight loop
return ms > 0 ? ms : 300_000;
}
Exposing via Gateway (Optional Enhancement)
Add a system.heartbeat handler to the gateway WS handlers that returns heartbeatMonitor.getLastResult(). This lets the web UI or CLI display health status. This is a nice-to-have and can be done as a follow-up.
Test Plan
src/automation/heartbeat.test.ts:
- start() does nothing when enabled: false
- runChecks() runs all configured checks
- runChecks() returns healthy=true when all checks pass
- runChecks() returns healthy=false when any check fails
- Notification sent after failure_threshold consecutive failures
- Recovery notification sent when checks pass after failures
- stop() clears the timer
- getLastResult() returns most recent result
- Individual check tests:
  - checkChannels() detects disconnected adapters
  - checkDisk() detects low disk space
  - checkMemory() handles missing memory dir
- parseInterval() correctly parses '60s', '5m', '1h'
Config Example
automation:
  heartbeat:
    enabled: true
    interval: "5m"
    checks: [gateway, model, channels, memory, disk]
    notify:
      channel: telegram
      peer: "123456"
    failure_threshold: 2
    disk_threshold_mb: 100
4. Vector Memory Search
Complexity: Complex
Overview
Add embedding-based semantic search to the existing MemoryStore, enabling hybrid search (vector similarity + keyword). Provider-agnostic embedding generation with graceful fallback to keyword-only search when embeddings are unavailable.
Architecture Decision: Storage Backend
Options considered:
1. SQLite with manual vector math: Store embeddings as BLOBs in the existing better-sqlite3 database, compute cosine similarity in JS. Simple, no new deps.
2. SQLite with sqlite-vec extension: Native vector similarity in SQL. Requires native extension compilation, complicates Docker build.
3. Separate HNSW index (hnswlib-node): Purpose-built ANN index. Fast for large collections but adds a native dep.
4. In-memory float arrays: Store embeddings as JSON files alongside markdown, load into memory, compute similarity in JS.
Recommendation: Option 1 (SQLite with manual vector math). Rationale:
- better-sqlite3 is already a dependency
- Memory collections are small enough (hundreds to low thousands of chunks) that brute-force cosine similarity in JS is fast enough (<10ms for 1000 vectors)
- No new native dependencies
- Embeddings stored in existing data directory pattern
- Clean upgrade path to sqlite-vec if performance becomes an issue
Config Schema Additions
Extend memorySchema in src/config/schema.ts:
const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']);
const embeddingSchema = z.object({
enabled: z.boolean().default(false),
provider: embeddingProviderSchema.default('openai'),
model: z.string().default('text-embedding-3-small'),
/** Provider endpoint (required for ollama/llamacpp). */
endpoint: z.string().optional(),
/** API key (required for openai/gemini). */
api_key: z.string().optional(),
/** Embedding vector dimensions. Auto-detected from model if not set. */
dimensions: z.number().optional(),
/** Max tokens per chunk for embedding. Default: 512. */
chunk_size: z.number().min(64).max(8192).default(512),
/** Overlap between chunks in tokens. Default: 50. */
chunk_overlap: z.number().min(0).max(1024).default(50),
/** Number of top results to return from vector search. Default: 5. */
top_k: z.number().min(1).max(50).default(5),
/** Weight for vector results vs keyword results in hybrid scoring (0.0 = keyword only, 1.0 = vector only). Default: 0.7. */
hybrid_weight: z.number().min(0).max(1).default(0.7),
}).default({});
const memorySchema = z.object({
enabled: z.boolean().default(true),
dir: z.string().optional(),
auto_extract: z.boolean().default(true),
max_context_tokens: z.number().min(100).max(10000).default(2000),
embedding: embeddingSchema,
}).default({});
Add type exports:
export type EmbeddingConfig = z.infer<typeof embeddingSchema>;
Files to Create
| File | Purpose |
|---|---|
| src/memory/embeddings.ts | EmbeddingProvider interface + provider implementations |
| src/memory/embeddings.test.ts | Unit tests for embedding providers |
| src/memory/vector-store.ts | VectorStore class: SQLite-backed vector storage |
| src/memory/vector-store.test.ts | Unit tests for vector store |
| src/memory/chunker.ts | Text chunking utility |
| src/memory/chunker.test.ts | Unit tests for chunker |
| src/memory/hybrid-search.ts | HybridSearch class: combines vector + keyword |
| src/memory/hybrid-search.test.ts | Unit tests for hybrid search |
Files to Modify
| File | Change |
|---|---|
| src/config/schema.ts | Add embeddingSchema, extend memorySchema, add type export |
| src/memory/store.ts | Add searchHybrid() method, accept optional VectorStore in constructor |
| src/memory/index.ts | Export new modules |
| src/tools/builtin/memory-search.ts | Use searchHybrid() when available, fall back to search() |
| src/daemon/index.ts | Initialize EmbeddingProvider, VectorStore, wire into MemoryStore |
Key Interfaces and Classes
EmbeddingProvider
// src/memory/embeddings.ts
export interface EmbeddingProvider {
/** Generate embeddings for one or more text chunks. */
embed(texts: string[]): Promise<number[][]>;
/** Embedding vector dimensions. */
readonly dimensions: number;
}
export class OpenAIEmbeddingProvider implements EmbeddingProvider {
// Uses openai SDK (already a dep) with embeddings.create()
readonly dimensions: number;
constructor(config: { apiKey?: string; model: string; dimensions?: number }) { ... }
async embed(texts: string[]): Promise<number[][]> { ... }
}
export class GeminiEmbeddingProvider implements EmbeddingProvider {
// Uses @google/generative-ai SDK (already a dep) with embedContent()
readonly dimensions: number;
constructor(config: { apiKey?: string; model: string }) { ... }
async embed(texts: string[]): Promise<number[][]> { ... }
}
export class OllamaEmbeddingProvider implements EmbeddingProvider {
// Uses ollama SDK (already a dep) with embeddings()
readonly dimensions: number;
constructor(config: { host?: string; model: string }) { ... }
async embed(texts: string[]): Promise<number[][]> { ... }
}
export class LlamaCppEmbeddingProvider implements EmbeddingProvider {
// HTTP POST to /embedding endpoint
readonly dimensions: number;
constructor(config: { endpoint: string; model?: string }) { ... }
async embed(texts: string[]): Promise<number[][]> { ... }
}
export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { ... }
Text Chunker
// src/memory/chunker.ts
export interface Chunk {
/** The text content of this chunk. */
text: string;
/** Source namespace. */
namespace: string;
/** Starting line number in the source file (1-based). */
startLine: number;
/** Ending line number in the source file (1-based). */
endLine: number;
}
/**
* Split memory content into overlapping chunks suitable for embedding.
*
* Strategy: split on paragraph boundaries (double newline), then merge
* small paragraphs to reach target chunk size, with configurable overlap.
*/
export function chunkText(
content: string,
namespace: string,
options: { chunkSize: number; chunkOverlap: number },
): Chunk[] { ... }
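One possible implementation of chunkText() following the strategy in its doc comment. It assumes whitespace-separated word counts as a stand-in for tokens (the plan does not name a tokenizer), and it does not split a single paragraph longer than chunkSize; such a paragraph becomes its own chunk. Overlap is implemented by carrying whole trailing paragraphs, up to chunkOverlap words, into the next chunk.

```typescript
export interface Chunk { text: string; namespace: string; startLine: number; endLine: number; }

interface Paragraph { text: string; startLine: number; endLine: number; words: number; }

// Assumption: word count approximates token count; swap in a real tokenizer if needed
const countWords = (s: string): number => s.split(/\s+/).filter(Boolean).length;

/** Split on blank lines, recording 1-based start/end lines for each paragraph. */
function splitParagraphs(content: string): Paragraph[] {
  const lines = content.split('\n');
  const paras: Paragraph[] = [];
  let buf: string[] = [];
  let startLine = 1;
  const flush = (endLine: number): void => {
    if (buf.length === 0) return;
    const text = buf.join('\n');
    paras.push({ text, startLine, endLine, words: countWords(text) });
    buf = [];
  };
  lines.forEach((line, i) => {
    if (line.trim() === '') {
      flush(i); // the paragraph ended on the previous line (1-based line i)
    } else {
      if (buf.length === 0) startLine = i + 1;
      buf.push(line);
    }
  });
  flush(lines.length);
  return paras;
}

export function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] {
  const chunks: Chunk[] = [];
  let current: Paragraph[] = [];
  let words = 0;
  const emit = (): void => {
    chunks.push({
      text: current.map((p) => p.text).join('\n\n'),
      namespace,
      startLine: current[0].startLine,
      endLine: current[current.length - 1].endLine,
    });
  };
  for (const p of splitParagraphs(content)) {
    if (current.length > 0 && words + p.words > options.chunkSize) {
      emit();
      // Carry trailing paragraphs (up to chunkOverlap words) into the next chunk
      const carried: Paragraph[] = [];
      let carriedWords = 0;
      for (let i = current.length - 1; i >= 0; i--) {
        if (carriedWords + current[i].words > options.chunkOverlap) break;
        carried.unshift(current[i]);
        carriedWords += current[i].words;
      }
      current = carried;
      words = carriedWords;
    }
    current.push(p);
    words += p.words;
  }
  if (current.length > 0) emit();
  return chunks;
}
```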
VectorStore
// src/memory/vector-store.ts
import Database from 'better-sqlite3';
export interface VectorSearchResult {
namespace: string;
chunkText: string;
startLine: number;
endLine: number;
similarity: number;
}
export class VectorStore {
private db: Database.Database;
constructor(dbPath: string) {
this.db = new Database(dbPath);
this.initSchema();
}
private initSchema(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS embeddings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace TEXT NOT NULL,
chunk_text TEXT NOT NULL,
start_line INTEGER NOT NULL,
end_line INTEGER NOT NULL,
embedding BLOB NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
content_hash TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_embeddings_namespace ON embeddings(namespace);
CREATE INDEX IF NOT EXISTS idx_embeddings_hash ON embeddings(content_hash);
`);
}
/** Store embedding vectors for a set of chunks. */
upsertChunks(chunks: Array<{ namespace: string; text: string; startLine: number; endLine: number; embedding: number[]; contentHash: string }>): void { ... }
/** Delete all embeddings for a namespace (for re-indexing). */
deleteNamespace(namespace: string): void { ... }
/** Check if a namespace+hash combination already exists (skip re-embedding). */
hasContentHash(namespace: string, hash: string): boolean { ... }
/**
* Search for similar vectors using cosine similarity.
* Computed in JS — fine for <10k vectors.
*/
search(queryEmbedding: number[], topK: number): VectorSearchResult[] {
const allRows = this.db.prepare('SELECT * FROM embeddings').all();
// Compute cosine similarity for each, sort, take topK
...
}
close(): void {
this.db.close();
}
}
The embedding is stored as a BLOB (Float32Array buffer). Cosine similarity:
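function cosineSimilarity(a: ArrayLike<number>, b: ArrayLike<number>): number {
// ArrayLike so stored Float32Array embeddings can be compared without conversion
if (a.length !== b.length) throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
let dotProduct = 0;
let normA = 0;
let normB = 0;
for (let i = 0; i < a.length; i++) {
dotProduct += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
// Guard against zero vectors, which would otherwise produce NaN and break sorting
if (normA === 0 || normB === 0) return 0;
return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}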
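The body of VectorStore.search() is elided above. A sketch of the brute-force path it describes, assuming better-sqlite3 returns BLOB columns as Buffers: toBlob() and fromBlob() (hypothetical helper names) convert between number[] and the Float32 BLOB, and topK() scores every row and keeps the best K. fromBlob() copies the bytes because a Buffer may start at an offset that is not 4-byte aligned, which a Float32Array view requires.

```typescript
/** Encode an embedding as a Buffer of float32 values for a BLOB column. */
export function toBlob(embedding: number[]): Buffer {
  const floats = Float32Array.from(embedding);
  return Buffer.from(floats.buffer, floats.byteOffset, floats.byteLength);
}

/** Decode a BLOB into a Float32Array, copying first to guarantee alignment. */
export function fromBlob(blob: Buffer): Float32Array {
  const copy = new Uint8Array(blob); // fresh ArrayBuffer starting at offset 0
  return new Float32Array(copy.buffer, 0, copy.byteLength / 4);
}

function cosine(a: ArrayLike<number>, b: ArrayLike<number>): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return na === 0 || nb === 0 ? 0 : dot / (Math.sqrt(na) * Math.sqrt(nb));
}

interface EmbeddingRow { id: number; embedding: Buffer; }

/** Brute-force top-K: score every row, sort by similarity descending, keep the best K. */
export function topK(query: number[], rows: EmbeddingRow[], k: number): Array<{ id: number; similarity: number }> {
  return rows
    .map((r) => ({ id: r.id, similarity: cosine(query, fromBlob(r.embedding)) }))
    .sort((x, y) => y.similarity - x.similarity)
    .slice(0, k);
}
```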
HybridSearch
// src/memory/hybrid-search.ts
import type { MemoryStore, SearchResult } from './store.js';
import type { VectorSearchResult, VectorStore } from './vector-store.js';
import type { EmbeddingProvider } from './embeddings.js';
export interface HybridSearchResult {
namespace: string;
content: string;
context: string;
line: number;
score: number;
source: 'keyword' | 'vector' | 'both';
}
export class HybridSearch {
constructor(
private readonly memoryStore: MemoryStore,
private readonly vectorStore: VectorStore,
private readonly embeddingProvider: EmbeddingProvider,
private readonly hybridWeight: number = 0.7,
) {}
/**
* Search using both keyword and vector similarity, merge and rank results.
*
* Scoring: final_score = (hybrid_weight * vector_score) + ((1 - hybrid_weight) * keyword_score)
* Keyword score: 1.0 for exact match (normalized by result count)
* Vector score: cosine similarity, which lies in [-1, 1]; clamp to [0, 1] before weighting
*/
async search(query: string, topK: number = 5): Promise<HybridSearchResult[]> {
// Run both searches in parallel
const [keywordResults, vectorResults] = await Promise.all([
Promise.resolve(this.memoryStore.search(query)),
this.vectorSearch(query, topK * 2), // fetch more for merge
]);
return this.mergeResults(keywordResults, vectorResults, topK);
}
private async vectorSearch(query: string, topK: number): Promise<VectorSearchResult[]> {
const [queryEmbedding] = await this.embeddingProvider.embed([query]);
return this.vectorStore.search(queryEmbedding, topK);
}
private mergeResults(
keyword: SearchResult[],
vector: VectorSearchResult[],
topK: number,
): HybridSearchResult[] {
// Deduplicate by namespace+line, compute hybrid score, sort, take topK
...
}
}
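A sketch of mergeResults() (elided above) implementing the stated formula. Assumptions, since the real SearchResult shape is not shown: keyword hits carry the namespace/line/content/context fields that HybridSearchResult mirrors, every keyword hit scores 1.0, and a keyword hit is deduplicated against a vector chunk when it falls within that chunk's line range. Cosine similarity is clamped to [0, 1] before weighting.

```typescript
interface KeywordHit { namespace: string; line: number; content: string; context: string; }
interface VectorHit { namespace: string; chunkText: string; startLine: number; endLine: number; similarity: number; }
interface HybridHit {
  namespace: string; content: string; context: string; line: number;
  score: number; source: 'keyword' | 'vector' | 'both';
}

export function mergeResults(keyword: KeywordHit[], vector: VectorHit[], topK: number, hybridWeight = 0.7): HybridHit[] {
  // Vector hits come first, so merged[i] corresponds to vector[i]
  const merged: HybridHit[] = vector.map((v) => ({
    namespace: v.namespace,
    content: v.chunkText,
    context: v.chunkText,
    line: v.startLine,
    score: hybridWeight * Math.max(0, Math.min(1, v.similarity)),
    source: 'vector' as const,
  }));
  const keywordScore = (1 - hybridWeight) * 1.0; // assumption: every keyword hit scores 1.0
  for (const k of keyword) {
    const idx = vector.findIndex(
      (v) => v.namespace === k.namespace && k.line >= v.startLine && k.line <= v.endLine,
    );
    if (idx >= 0) {
      // First keyword hit inside a chunk upgrades it; later hits in that chunk are already counted
      if (merged[idx].source === 'vector') {
        merged[idx] = { ...merged[idx], line: k.line, score: merged[idx].score + keywordScore, source: 'both' };
      }
      continue;
    }
    merged.push({ namespace: k.namespace, content: k.content, context: k.context, line: k.line, score: keywordScore, source: 'keyword' });
  }
  return merged.sort((a, b) => b.score - a.score).slice(0, topK);
}
```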
Indexing Pipeline
When does indexing happen? Two approaches:
- On write: When MemoryStore.write() is called, also chunk + embed + store. This keeps the index always up to date but adds latency to writes.
- Background indexer: A separate timer that scans for unindexed content (via content hash comparison) and indexes it periodically.
Recommendation: Background indexer with write-triggered hint. On MemoryStore.write(), mark the namespace as "dirty". The background indexer (runs every 30s or so) processes only dirty namespaces. This keeps writes fast while ensuring the index stays reasonably fresh.
// Addition to MemoryStore:
private dirtyNamespaces: Set<string> = new Set();
write(namespace: string, content: string, mode: 'append' | 'replace'): void {
// ... existing write logic ...
this.dirtyNamespaces.add(namespace);
}
getDirtyNamespaces(): string[] {
const ns = Array.from(this.dirtyNamespaces);
this.dirtyNamespaces.clear();
return ns;
}
Indexer runs in daemon:
// In daemon startup, after memory store init.
// Requires: import { createHash } from 'crypto';
if (memoryStore && config.memory.embedding.enabled) {
const embeddingProvider = createEmbeddingProvider(config.memory.embedding);
const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db'));
const hybridSearch = new HybridSearch(memoryStore, vectorStore, embeddingProvider, config.memory.embedding.hybrid_weight);
// Background indexer
let indexing = false; // skip a tick while the previous run is still embedding
const indexInterval = setInterval(async () => {
if (indexing) return;
indexing = true;
try {
const dirty = memoryStore.getDirtyNamespaces();
for (const ns of dirty) {
try {
const content = memoryStore.read(ns);
const contentHash = createHash('md5').update(content).digest('hex');
if (vectorStore.hasContentHash(ns, contentHash)) continue;
const chunks = chunkText(content, ns, {
chunkSize: config.memory.embedding.chunk_size,
chunkOverlap: config.memory.embedding.chunk_overlap,
});
const embeddings = chunks.length > 0
? await embeddingProvider.embed(chunks.map(c => c.text))
: [];
// Replace the namespace's stale chunks only after embedding succeeded
vectorStore.deleteNamespace(ns);
vectorStore.upsertChunks(chunks.map((c, i) => ({
namespace: c.namespace,
text: c.text,
startLine: c.startLine,
endLine: c.endLine,
embedding: embeddings[i],
contentHash,
})));
} catch (error) {
// The namespace was already removed from the dirty set, so it is retried on its next write
console.warn(`Vector indexing failed for namespace "${ns}":`, error);
}
}
} finally {
indexing = false;
}
}, 30_000);
lifecycle.onShutdown(async () => {
clearInterval(indexInterval);
vectorStore.close();
});
}
Integration with memory.search Tool
Modify src/tools/builtin/memory-search.ts to accept an optional HybridSearch instance:
export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool {
  return {
    name: 'memory.search',
    description: 'Search across all memory files for a keyword or phrase. Uses semantic search when available, with keyword fallback.',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description: 'The search query (supports semantic meaning when embeddings are enabled)',
        },
      },
      required: ['query'],
    },
    execute: async (rawArgs: unknown): Promise<ToolResult> => {
      const args = rawArgs as { query: string };
      let fellBack = false;
      if (hybridSearch) {
        try {
          const results = await hybridSearch.search(args.query);
          // Format and return hybrid results...
        } catch (error) {
          // Provider unreachable or vector DB failed: log a warning, then fall
          // through to the keyword path below.
          fellBack = true;
        }
      }
      // Existing keyword-only path (unchanged). It sits outside the try/catch so a
      // keyword-search error is not "handled" by re-running the same failing call.
      const results = store.search(args.query);
      // ...existing formatting (add a fallback note when fellBack is true)...
    },
  };
}
Also update createMemoryTools() in src/tools/builtin/index.ts to accept the optional HybridSearch:
export function createMemoryTools(store: MemoryStore, hybridSearch?: HybridSearch): Tool[] {
return [
createMemoryReadTool(store),
createMemoryWriteTool(store),
createMemorySearchTool(store, hybridSearch),
];
}
Graceful Fallback Chain
- If embedding.enabled: false (default): pure keyword search; no embedding dependencies are loaded
- If embedding.enabled: true but the provider is unreachable: log a warning and fall back to keyword search
- If embedding succeeds but the vector DB is corrupted: catch the error and fall back to keyword search
- If both embedding and vector search succeed: hybrid search with the configurable weight (hybrid_weight)
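The chain above reduces to a small wrapper. It tries hybrid search only when a hybrid searcher was constructed, and it degrades to keyword search on any error. Below is a minimal sketch. The KeywordSearcher and HybridSearcher interfaces and the searchWithFallback helper are hypothetical and not part of the plan. The mode and fallbackReason fields let the tool mention the fallback in its output.

```typescript
// Hypothetical interfaces; the real MemoryStore.search and HybridSearch.search
// signatures in this plan may differ.
interface KeywordSearcher<K> { search(query: string): K[] }
interface HybridSearcher<H> { search(query: string): Promise<H[]> }

type SearchOutcome<K, H> =
  | { mode: 'hybrid'; results: H[] }
  | { mode: 'keyword'; results: K[]; fallbackReason?: string };

async function searchWithFallback<K, H>(
  query: string,
  keyword: KeywordSearcher<K>,
  hybrid?: HybridSearcher<H>,
): Promise<SearchOutcome<K, H>> {
  // embedding.enabled: false -> no hybrid searcher exists, so use keywords directly.
  if (!hybrid) return { mode: 'keyword', results: keyword.search(query) };
  try {
    return { mode: 'hybrid', results: await hybrid.search(query) };
  } catch (err) {
    // Provider unreachable or vector DB corrupted: warn and degrade to keyword search.
    const reason = err instanceof Error ? err.message : String(err);
    console.warn(`memory.search: falling back to keyword search (${reason})`);
    return { mode: 'keyword', results: keyword.search(query), fallbackReason: reason };
  }
}
```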
Test Plan
src/memory/chunker.test.ts:
- Splits text into chunks of configured size
- Respects paragraph boundaries
- Generates correct overlap between chunks
- Handles empty content
- Preserves line number tracking
- Single paragraph smaller than chunk size returns one chunk
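Below is a minimal sketch of a chunker that satisfies these tests. It assumes chunk_size and chunk_overlap are measured in characters; the plan does not say whether they count characters or tokens. It also takes overlap in whole lines. The Chunk and ChunkOptions types are hypothetical, shaped to match the chunkText(content, ns, { chunkSize, chunkOverlap }) call in the indexer.

```typescript
// Hypothetical types matching how chunkText is called in the indexer.
interface Chunk { namespace: string; text: string; startLine: number; endLine: number }
interface ChunkOptions { chunkSize: number; chunkOverlap: number } // characters (assumption)

function chunkText(content: string, namespace: string, opts: ChunkOptions): Chunk[] {
  const lines = content.split('\n');
  // Text of the 1-based, inclusive line range [start, end], blank lines included.
  const textOf = (start: number, end: number) => lines.slice(start - 1, end).join('\n');

  // 1. Paragraphs are runs of non-blank lines, stored as [startLine, endLine].
  const paragraphs: [number, number][] = [];
  let paraStart = 0; // 0 = not inside a paragraph
  lines.forEach((line, i) => {
    if (line.trim() === '') {
      if (paraStart) paragraphs.push([paraStart, i]); // paragraph ended on line i
      paraStart = 0;
    } else if (!paraStart) {
      paraStart = i + 1;
    }
  });
  if (paraStart) paragraphs.push([paraStart, lines.length]);

  // 2. Keep a paragraph whole when it fits; otherwise fall back to its individual lines.
  //    (A single line longer than chunkSize still becomes its own oversized chunk.)
  const blocks: [number, number][] = [];
  for (const [s, e] of paragraphs) {
    if (textOf(s, e).length <= opts.chunkSize) blocks.push([s, e]);
    else for (let n = s; n <= e; n++) blocks.push([n, n]);
  }

  // 3. Greedily pack blocks into chunks of at most chunkSize characters.
  const chunks: Chunk[] = [];
  let start = 0; // first line of the open chunk (0 = none open)
  let end = 0;   // last line of the open chunk
  const emit = () => chunks.push({ namespace, text: textOf(start, end), startLine: start, endLine: end });
  for (const [s, e] of blocks) {
    if (start && textOf(start, e).length <= opts.chunkSize) {
      end = e; // block still fits: extend the open chunk
      continue;
    }
    let next = s;
    if (start) {
      emit();
      // Overlap: reuse trailing lines of the emitted chunk (up to chunkOverlap chars),
      // but never the whole chunk.
      let o = end + 1;
      while (o - 1 > start && textOf(o - 1, end).length <= opts.chunkOverlap) o--;
      while (o <= end && lines[o - 1].trim() === '') o++; // don't open on a blank line
      if (o <= end && textOf(o, e).length <= opts.chunkSize) next = o;
    }
    start = next;
    end = e;
  }
  if (start) emit();
  return chunks;
}
```

Packing paragraphs first keeps related lines together. Only a paragraph too large for one chunk is split line by line. Taking overlap in whole lines keeps startLine and endLine exact. The cost is that the actual overlap can be shorter than chunk_overlap.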
src/memory/embeddings.test.ts:
- createEmbeddingProvider() creates the correct provider for each type
- OpenAI provider calls the correct API endpoint (mocked)
- Ollama provider calls the correct endpoint (mocked)
- Provider returns correct dimensionality
- Handles API errors gracefully
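The mocked-endpoint tests are easier to write if the provider takes its fetch function as a parameter. The sketch below shows the OpenAI provider under that assumption. The EmbeddingProvider shape, the dimensions field and the FetchLike type are hypothetical. The endpoint itself is OpenAI's public embeddings API: POST https://api.openai.com/v1/embeddings with { model, input }, which returns a data array of { index, embedding }. The default argument relies on the global fetch available in Node 18+.

```typescript
// Hypothetical provider shape, inferred from embed([query]) in HybridSearch.
interface EmbeddingProvider {
  readonly dimensions: number;
  embed(texts: string[]): Promise<number[][]>;
}

// Minimal fetch signature so tests can inject a mock instead of hitting the network.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

class OpenAIEmbeddingProvider implements EmbeddingProvider {
  readonly dimensions: number;
  private readonly apiKey: string;
  private readonly model: string;
  private readonly fetchFn: FetchLike;

  // text-embedding-3-small returns 1536-dimensional vectors by default.
  constructor(apiKey: string, model = 'text-embedding-3-small', dimensions = 1536, fetchFn: FetchLike = fetch) {
    this.apiKey = apiKey;
    this.model = model;
    this.dimensions = dimensions;
    this.fetchFn = fetchFn;
  }

  async embed(texts: string[]): Promise<number[][]> {
    if (texts.length === 0) return [];
    const res = await this.fetchFn('https://api.openai.com/v1/embeddings', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.apiKey}` },
      body: JSON.stringify({ model: this.model, input: texts }),
    });
    if (!res.ok) throw new Error(`OpenAI embeddings request failed: HTTP ${res.status}`);
    const payload = (await res.json()) as { data: { index: number; embedding: number[] }[] };
    // Each item carries the index of its input; sort so vectors line up with `texts`.
    return [...payload.data].sort((a, b) => a.index - b.index).map(d => d.embedding);
  }
}
```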
src/memory/vector-store.test.ts:
- Creates SQLite database with correct schema
- upsertChunks() stores and retrieves embeddings
- search() returns results sorted by similarity
- deleteNamespace() removes all embeddings for a namespace
- hasContentHash() detects existing content
- close() cleans up database connection
- Cosine similarity computation is correct
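Below is a sketch of the similarity core these tests exercise. It assumes VectorStore keeps each embedding as a float32 BLOB and scores candidates with a brute-force scan, with no approximate-nearest-neighbour index. The helper names (cosineSimilarity, toBlob, fromBlob, searchTopK) and the StoredChunk type are hypothetical.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). Zero vectors get similarity 0.
function cosineSimilarity(a: ArrayLike<number>, b: ArrayLike<number>): number {
  if (a.length !== b.length) throw new Error(`dimension mismatch: ${a.length} vs ${b.length}`);
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Assumption: embeddings are stored as float32 BLOBs (4 bytes per dimension).
function toBlob(vector: number[]): Uint8Array {
  return new Uint8Array(new Float32Array(vector).buffer);
}

function fromBlob(blob: Uint8Array): Float32Array {
  // Copy into a fresh buffer first: a driver-returned Buffer may be a view into a
  // shared pool whose byteOffset is not 4-byte aligned.
  return new Float32Array(new Uint8Array(blob).buffer);
}

interface StoredChunk { namespace: string; startLine: number; embedding: Float32Array }

// Brute-force nearest neighbours: score every stored chunk, keep the best topK.
function searchTopK(query: ArrayLike<number>, rows: StoredChunk[], topK: number) {
  return rows
    .map(row => ({ namespace: row.namespace, startLine: row.startLine, similarity: cosineSimilarity(query, row.embedding) }))
    .sort((x, y) => y.similarity - x.similarity)
    .slice(0, topK);
}
```

A full scan costs one dot product per stored chunk per query. That is likely adequate for a personal memory store, but worth revisiting if the corpus grows large.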
src/memory/hybrid-search.test.ts:
- Returns keyword results when vector search unavailable
- Merges keyword and vector results with correct weighting
- Deduplicates results from both sources
- Respects topK limit
- Handles empty results from either source
- source field correctly indicates 'keyword', 'vector', or 'both'
Config Example
memory:
  enabled: true
  auto_extract: true
  max_context_tokens: 2000
  embedding:
    enabled: true
    provider: openai
    model: text-embedding-3-small
    api_key: "${OPENAI_API_KEY}"
    chunk_size: 512
    chunk_overlap: 50
    top_k: 5
    hybrid_weight: 0.7
Implementation Order
Recommended sequence (dependencies and risk):
| Order | Feature | Rationale |
|---|---|---|
| 1 | Dockerfile | No code changes, unblocks deployment testing for all other features |
| 2 | Inbound Webhooks | Clean pattern (mirrors cron), moderate scope, high utility |
| 3 | Heartbeat | Depends on running services for meaningful checks, benefits from Docker for testing |
| 4 | Vector Memory Search | Most complex, most files, most risk — do last when other features are stable |
Estimated Effort
| Feature | New Files | Modified Files | Estimated Lines | Complexity |
|---|---|---|---|---|
| Inbound Webhooks | 2 | 4 | ~300 | Medium |
| Dockerfile | 3 | 1-2 | ~100 | Simple |
| Heartbeat | 2 | 3 | ~350 | Medium |
| Vector Memory Search | 8 | 5 | ~800 | Complex |
| Total | 15 | 13-14 | ~1550 | |
Cross-Cutting Concerns
Config Schema Validation
All new schema additions follow the existing pattern:
- Use .default({}) for optional sections so the config is valid without them
- Export z.infer<> type aliases
- Existing tests in src/config/schema.test.ts should still pass (no breaking changes)
Daemon Startup Order
New services slot into the existing startup sequence:
- Memory store (existing)
- Vector store + embedding provider (new, after memory store)
- Session store (existing)
- Tool registry (existing)
- Memory tools with hybrid search (modified)
- Gateway server (existing)
- Channel registry (existing)
- Webhook handler (new, registered with channel registry)
- Cron scheduler (existing)
- Start all channels (existing)
- Start gateway (existing)
- Heartbeat monitor (new, after all services started)
Shutdown Order (LIFO)
Shutdown hooks run in reverse registration order (LIFO). With the new services added, the sequence is:
- Heartbeat monitor stop
- Gateway stop (existing)
- Channel adapters stop (existing, includes webhook handler)
- Vector store close
- Process manager stop (existing)
- Session store close (existing)
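LIFO shutdown follows from running hooks in reverse registration order. Below is a minimal sketch of that behaviour. The plan only shows lifecycle.onShutdown(async () => ...), so this Lifecycle class, its optional hook names and the failure list returned by shutdown() are hypothetical. A failing hook is logged and does not stop the hooks registered before it.

```typescript
// Hypothetical stand-in for the daemon's lifecycle object.
type ShutdownHook = () => void | Promise<void>;

class Lifecycle {
  private readonly hooks: { name: string; hook: ShutdownHook }[] = [];

  onShutdown(hook: ShutdownHook, name = `hook#${this.hooks.length + 1}`): void {
    this.hooks.push({ name, hook });
  }

  // Run hooks last-registered-first; a failing hook is recorded and the rest still run.
  async shutdown(): Promise<string[]> {
    const failures: string[] = [];
    for (const { name, hook } of [...this.hooks].reverse()) {
      try {
        await hook();
      } catch (err) {
        failures.push(name);
        console.error(`shutdown hook ${name} failed:`, err);
      }
    }
    this.hooks.length = 0;
    return failures;
  }
}
```

Only registration order matters. Each new service's stop hook should therefore be registered right after that service starts, as the vector store's is in the indexer block.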