feat(gateway): add observability sources, series, and service log RPCs

This commit is contained in:
William Valentin
2026-02-22 20:54:37 -08:00
parent cbc880c12a
commit ca463d5ca2
5 changed files with 1403 additions and 0 deletions
+120
View File
@@ -12,6 +12,7 @@ import { createConfigHandlers, redactConfig } from './config.js';
import { createPairingHandlers } from './pairing.js';
import type { LocalBackendStatus, LocalBackendControlResult } from './localBackends.js';
import type { DockerDependencyStatus, DockerDependencyControlResult } from './dockerDependencies.js';
import type { ObservabilitySource, ObservabilitySeriesSnapshot, ServiceLogSnapshot } from './observability.js';
import { PairingManager } from '../../channels/pairing.js';
import { LaneQueue } from '../lane-queue.js';
import { CanvasStore } from '../canvas-store.js';
@@ -375,6 +376,125 @@ describe('system handlers', () => {
expect(getPath(result.result, 'action')).toBe('restart');
});
it('system.observabilitySources returns empty list when callback is not provided', async () => {
const req: GatewayRequest = { id: 48, method: 'system.observabilitySources' };
const result = await handlers['system.observabilitySources'](req) as GatewayResponse;
expect(getPath(result.result, 'sources')).toEqual([]);
});
it('system.observabilitySources returns source list from callback', async () => {
const getObservabilitySources = vi.fn(async (): Promise<ObservabilitySource[]> => ([
{
id: 'systemd:flynn',
name: 'Flynn daemon',
kind: 'systemd_system',
runtime: 'systemd_system',
status: 'running',
graphCapable: true,
logCapable: true,
},
]));
const handlers = createSystemHandlers({
...deps,
getObservabilitySources,
});
const req: GatewayRequest = { id: 49, method: 'system.observabilitySources' };
const result = await handlers['system.observabilitySources'](req) as GatewayResponse;
expect(getObservabilitySources).toHaveBeenCalledTimes(1);
expect(getPath(result.result, 'sources', '0', 'id')).toBe('systemd:flynn');
});
it('system.observabilitySeries validates sourceIds parameter', async () => {
const handlers = createSystemHandlers({
...deps,
getObservabilitySeries: vi.fn(),
});
const result = await handlers['system.observabilitySeries']({
id: 50,
method: 'system.observabilitySeries',
params: { sourceIds: 'not-an-array' as unknown as string[] },
}) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
it('system.observabilitySeries forwards query to callback', async () => {
const snapshot: ObservabilitySeriesSnapshot = {
generatedAt: 123,
windowMinutes: 60,
bucketSeconds: 30,
series: [
{
sourceId: 'systemd:flynn',
points: [{ ts: 100, stateCode: 3, healthCode: 2, errorCount: 0, restartCount: 1 }],
},
],
};
const getObservabilitySeries = vi.fn(async () => snapshot);
const handlers = createSystemHandlers({
...deps,
getObservabilitySeries,
});
const req: GatewayRequest = {
id: 51,
method: 'system.observabilitySeries',
params: { windowMinutes: 120, bucketSeconds: 60, sourceIds: ['systemd:flynn'] },
};
const result = await handlers['system.observabilitySeries'](req) as GatewayResponse;
expect(getObservabilitySeries).toHaveBeenCalledWith({
windowMinutes: 120,
bucketSeconds: 60,
sourceIds: ['systemd:flynn'],
});
expect(getPath(result.result, 'series', '0', 'points', '0', 'restartCount')).toBe(1);
});
it('system.serviceLogs validates required sourceId', async () => {
const handlers = createSystemHandlers({
...deps,
getServiceLogs: vi.fn(),
});
const result = await handlers['system.serviceLogs']({
id: 52,
method: 'system.serviceLogs',
params: { lines: 100 },
}) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
it('system.serviceLogs forwards request to callback', async () => {
const snapshot: ServiceLogSnapshot = {
sourceId: 'docker:whisper',
fetchedAt: 123,
redacted: false,
truncated: false,
lines: [{ ts: 100, level: 'warn', text: 'queue depth high' }],
};
const getServiceLogs = vi.fn(async (): Promise<ServiceLogSnapshot> => snapshot);
const handlers = createSystemHandlers({
...deps,
getServiceLogs,
});
const req: GatewayRequest = {
id: 53,
method: 'system.serviceLogs',
params: { sourceId: 'docker:whisper', lines: 50, sinceSeconds: 600 },
};
const result = await handlers['system.serviceLogs'](req) as GatewayResponse;
expect(getServiceLogs).toHaveBeenCalledWith({
sourceId: 'docker:whisper',
lines: 50,
sinceSeconds: 600,
});
expect(getPath(result.result, 'lines', '0', 'text')).toBe('queue depth high');
});
it('system.presence returns empty result when getPresence is not provided', async () => {
const req: GatewayRequest = { id: 4, method: 'system.presence' };
const result = await handlers['system.presence'](req) as GatewayResponse;
+256
View File
@@ -0,0 +1,256 @@
import { describe, it, expect } from 'vitest';
import type { Config } from '../../config/index.js';
import { ObservabilityCollector } from './observability.js';
function createConfig(): Config {
return {
models: {
default: { provider: 'ollama', model: 'llama3.2' },
fast: { provider: 'openai', model: 'gpt-4o-mini' },
complex: { provider: 'anthropic', model: 'claude-3-7-sonnet' },
},
memory: {
embedding: {
provider: 'openai',
model: 'text-embedding-3-small',
},
},
audio: {
enabled: true,
provider: {
type: 'custom',
endpoint: 'http://localhost:18801/v1/audio/transcriptions',
},
},
web_search: {
provider: 'brave',
api_key: 'test-brave-key',
endpoint: undefined,
max_results: 5,
},
} as unknown as Config;
}
describe('ObservabilityCollector', () => {
it('collects sources and tracks restart counters in sampled series', async () => {
let now = 1_700_000_000_000;
let flynnPid = 100;
const runner = async (command: string, args: string[]) => {
const key = `${command} ${args.join(' ')}`;
if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') {
return {
stdout: `LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=${flynnPid}\nResult=success\n`,
stderr: '',
};
}
if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return {
stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nUnitFileState=enabled\nDescription=Ollama\nExecMainPID=211\nResult=success\n',
stderr: '',
};
}
if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return {
stdout: 'LoadState=loaded\nActiveState=inactive\nSubState=dead\nUnitFileState=enabled\nDescription=llama.cpp\nExecMainPID=0\nResult=success\n',
stderr: '',
};
}
if (key === 'docker compose -f docker-compose.yml config --profiles') {
return { stdout: 'voice\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice config --services') {
return { stdout: 'flynn\nwhisper-server\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice ps --all --format json') {
return { stdout: '[]', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice ps whisper-server --format json') {
return {
stdout: JSON.stringify([
{
Name: 'flynn-whisper-server-1',
Service: 'whisper-server',
State: 'running',
Health: 'healthy',
Status: 'Up 2 minutes (healthy)',
},
]),
stderr: '',
};
}
throw new Error(`Unexpected command: ${key}`);
};
const collector = new ObservabilityCollector({
config: createConfig(),
runner,
now: () => now,
samplingIntervalMs: 300_000,
});
await collector.forceSample();
now += 30_000;
flynnPid = 101;
await collector.forceSample();
const sources = await collector.listSources();
expect(sources.map((entry) => entry.id)).toEqual(expect.arrayContaining([
'systemd:flynn',
'systemd-user:ollama',
'systemd-user:llamacpp',
'docker:whisper',
]));
const series = await collector.getSeries({ windowMinutes: 60, bucketSeconds: 30 });
const flynnSeries = series.series.find((entry) => entry.sourceId === 'systemd:flynn');
expect(flynnSeries).toBeTruthy();
expect(flynnSeries?.points.length).toBeGreaterThanOrEqual(2);
expect(flynnSeries?.points[flynnSeries.points.length - 1]?.restartCount).toBe(1);
});
it('returns redacted journal logs for systemd sources', async () => {
let now = 1_700_000_000_000;
const runner = async (command: string, args: string[]) => {
const key = `${command} ${args.join(' ')}`;
if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') {
return {
stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=777\nResult=success\n',
stderr: '',
};
}
if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=Ollama\nExecMainPID=0\nResult=not-found\n', stderr: '' };
}
if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=llama.cpp\nExecMainPID=0\nResult=not-found\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml config --profiles') {
return { stdout: '', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml config --services') {
return { stdout: 'flynn\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml ps --all --format json') {
return { stdout: '[]', stderr: '' };
}
if (key === 'journalctl -u flynn.service --since 600 seconds ago --no-pager --output short-iso-precise -n 200') {
return {
stdout: '2026-02-23 10:10:10.000000+0000 host flynn[777]: Authorization: Bearer super-secret-token-value\n',
stderr: '',
};
}
throw new Error(`Unexpected command: ${key}`);
};
const collector = new ObservabilityCollector({
config: createConfig(),
runner,
now: () => now,
});
await collector.forceSample();
const logs = await collector.getServiceLogs({ sourceId: 'systemd:flynn', sinceSeconds: 600 });
expect(logs.sourceId).toBe('systemd:flynn');
expect(logs.redacted).toBe(true);
expect(logs.lines).toHaveLength(1);
expect(logs.lines[0]?.text).toContain('[REDACTED_BEARER]');
expect(logs.lines[0]?.level).toBe('info');
});
it('parses docker logs with levels and timestamps', async () => {
let now = 1_700_000_000_000;
const runner = async (command: string, args: string[]) => {
const key = `${command} ${args.join(' ')}`;
if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') {
return {
stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=777\nResult=success\n',
stderr: '',
};
}
if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=Ollama\nExecMainPID=0\nResult=not-found\n', stderr: '' };
}
if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') {
return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=llama.cpp\nExecMainPID=0\nResult=not-found\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml config --profiles') {
return { stdout: 'voice\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice config --services') {
return { stdout: 'flynn\nwhisper-server\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice ps --all --format json') {
return { stdout: '[]', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice ps whisper-server --format json') {
return {
stdout: JSON.stringify([
{
Name: 'flynn-whisper-server-1',
Service: 'whisper-server',
State: 'running',
Health: 'healthy',
Status: 'Up 2 minutes (healthy)',
},
]),
stderr: '',
};
}
if (key === 'docker compose -f docker-compose.yml config --profiles') {
return { stdout: 'voice\n', stderr: '' };
}
if (key === 'docker compose -f docker-compose.yml --profile voice logs --timestamps --tail 120 --since 300s whisper-server') {
return {
stdout: 'whisper-server | 2026-02-23T10:10:10Z warning: queue depth high\n',
stderr: '',
};
}
throw new Error(`Unexpected command: ${key}`);
};
const collector = new ObservabilityCollector({
config: createConfig(),
runner,
now: () => now,
});
await collector.forceSample();
const logs = await collector.getServiceLogs({ sourceId: 'docker:whisper', lines: 120, sinceSeconds: 300 });
expect(logs.sourceId).toBe('docker:whisper');
expect(logs.lines).toHaveLength(1);
expect(logs.lines[0]?.level).toBe('warn');
expect(typeof logs.lines[0]?.ts).toBe('number');
expect(logs.lines[0]?.text).toContain('queue depth high');
});
});
+932
View File
@@ -0,0 +1,932 @@
import { execFile as execFileCb } from 'node:child_process';
import { promisify } from 'node:util';
import { redactForAudit } from '../../audit/redact.js';
import type { Config } from '../../config/index.js';
import { listDockerDependencyStatuses } from './dockerDependencies.js';
import { listLocalBackendStatuses } from './localBackends.js';
const execFile = promisify(execFileCb);
const COMPOSE_FILE = 'docker-compose.yml';
const DEFAULT_FLYNN_UNIT = 'flynn.service';
const DEFAULT_WINDOW_MINUTES = 60;
const MAX_WINDOW_MINUTES = 240;
const DEFAULT_BUCKET_SECONDS = 30;
const ALLOWED_BUCKET_SECONDS = [15, 30, 60] as const;
const DEFAULT_LOG_LINES = 200;
const MAX_LOG_LINES = 1000;
const DEFAULT_LOG_SINCE_SECONDS = 900;
const MAX_LOG_SINCE_SECONDS = 86_400;
const DEFAULT_SAMPLE_INTERVAL_MS = 30_000;
const DEFAULT_MAX_SAMPLES = 720;
const DEFAULT_TIMEOUT_MS = 10_000;
const LARGE_TIMEOUT_MS = 15_000;
const LARGE_BUFFER_BYTES = 4 * 1024 * 1024;
const STATE_UNAVAILABLE = 0;
const STATE_STOPPED = 1;
const STATE_DEGRADED = 2;
const STATE_RUNNING = 3;
const HEALTH_UNKNOWN = 0;
const HEALTH_DEGRADED = 1;
const HEALTH_HEALTHY = 2;
export type ObservabilitySourceKind = 'docker_dependency' | 'systemd_user' | 'systemd_system';
export type ObservabilityRuntime = 'docker_compose' | 'systemd_user' | 'systemd_system';
export type ObservabilitySourceStatus = 'running' | 'degraded' | 'stopped' | 'unavailable' | 'unknown';
export interface ObservabilitySource {
id: string;
name: string;
kind: ObservabilitySourceKind;
runtime: ObservabilityRuntime;
status: ObservabilitySourceStatus;
graphCapable: boolean;
logCapable: boolean;
metadata?: {
unit?: string;
service?: string;
state?: string;
health?: string;
statusText?: string;
containerName?: string | null;
};
}
export interface ObservabilitySeriesPoint {
ts: number;
stateCode: number;
healthCode: number;
errorCount: number;
restartCount: number;
}
export interface ObservabilitySeriesEntry {
sourceId: string;
points: ObservabilitySeriesPoint[];
}
export interface ObservabilitySeriesSnapshot {
generatedAt: number;
windowMinutes: number;
bucketSeconds: number;
series: ObservabilitySeriesEntry[];
}
export interface ServiceLogEntry {
ts?: number;
level?: 'info' | 'warn' | 'error';
text: string;
}
export interface ServiceLogSnapshot {
sourceId: string;
fetchedAt: number;
redacted: boolean;
lines: ServiceLogEntry[];
truncated: boolean;
}
export interface ServiceLogQuery {
sourceId: string;
lines?: number;
sinceSeconds?: number;
}
export interface ObservabilitySeriesQuery {
windowMinutes?: number;
bucketSeconds?: number;
sourceIds?: string[];
}
type ExecResult = { stdout: string; stderr: string };
type CommandRunner = (
command: string,
args: string[],
opts?: { timeoutMs?: number; maxBufferBytes?: number },
) => Promise<ExecResult>;
interface SourceSnapshot {
source: ObservabilitySource;
stateCode: number;
healthCode: number;
hasError: boolean;
fingerprint: string | null;
}
interface SourceCounter {
errorCount: number;
restartCount: number;
lastStateCode: number;
lastFingerprint: string | null;
hasPrevious: boolean;
}
interface SampleRecord {
ts: number;
stateCode: number;
healthCode: number;
errorCount: number;
restartCount: number;
}
function defaultRunner(
command: string,
args: string[],
opts?: { timeoutMs?: number; maxBufferBytes?: number },
): Promise<ExecResult> {
return execFile(command, args, {
timeout: opts?.timeoutMs ?? DEFAULT_TIMEOUT_MS,
maxBuffer: opts?.maxBufferBytes ?? LARGE_BUFFER_BYTES,
}) as Promise<ExecResult>;
}
function normalizeError(error: unknown): string {
if (error && typeof error === 'object') {
const maybe = error as { stderr?: string; stdout?: string; message?: string };
const stderr = maybe.stderr?.trim();
if (stderr) {return stderr;}
const stdout = maybe.stdout?.trim();
if (stdout) {return stdout;}
if (typeof maybe.message === 'string' && maybe.message.trim().length > 0) {
return maybe.message.trim();
}
}
if (error instanceof Error && error.message.trim().length > 0) {
return error.message.trim();
}
return String(error);
}
function parseKeyValueOutput(output: string): Record<string, string> {
const result: Record<string, string> = {};
for (const line of output.split('\n')) {
if (!line.trim()) {continue;}
const idx = line.indexOf('=');
if (idx <= 0) {continue;}
const key = line.slice(0, idx).trim();
const value = line.slice(idx + 1).trim();
result[key] = value;
}
return result;
}
function parseInteger(input: string | undefined): number | null {
if (!input) {return null;}
const parsed = Number(input);
if (!Number.isFinite(parsed)) {return null;}
if (parsed <= 0) {return null;}
return Math.floor(parsed);
}
function mapSystemdStatus(activeState: string, error?: string): {
status: ObservabilitySourceStatus;
stateCode: number;
healthCode: number;
} {
if (error) {
return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN };
}
const state = activeState.trim().toLowerCase();
if (state === 'active') {
return { status: 'running', stateCode: STATE_RUNNING, healthCode: HEALTH_HEALTHY };
}
if (state === 'inactive' || state === 'dead') {
return { status: 'stopped', stateCode: STATE_STOPPED, healthCode: HEALTH_UNKNOWN };
}
if (state === 'failed') {
return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED };
}
if (state === 'activating' || state === 'deactivating' || state === 'reloading') {
return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED };
}
if (state === 'not-found') {
return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN };
}
return { status: 'unknown', stateCode: STATE_DEGRADED, healthCode: HEALTH_UNKNOWN };
}
function mapDockerStatus(stateRaw: string, healthRaw: string, error?: string): {
status: ObservabilitySourceStatus;
stateCode: number;
healthCode: number;
} {
if (error) {
return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN };
}
const state = stateRaw.trim().toLowerCase();
const health = healthRaw.trim().toLowerCase();
if (state === 'running') {
if (health === 'healthy' || health === 'none' || health === 'unknown') {
return {
status: 'running',
stateCode: STATE_RUNNING,
healthCode: health === 'healthy' ? HEALTH_HEALTHY : HEALTH_UNKNOWN,
};
}
return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED };
}
if (state === 'stopped' || state === 'not-created' || state === 'created') {
return { status: 'stopped', stateCode: STATE_STOPPED, healthCode: HEALTH_UNKNOWN };
}
if (state === 'unavailable' || state === 'not-found') {
return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN };
}
if (state === 'restarting' || state === 'paused') {
return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED };
}
return { status: 'unknown', stateCode: STATE_DEGRADED, healthCode: HEALTH_UNKNOWN };
}
function classifyLogLevel(text: string): 'info' | 'warn' | 'error' {
const lower = text.toLowerCase();
if (lower.includes('error') || lower.includes('failed') || lower.includes('fatal') || lower.includes('panic')) {
return 'error';
}
if (lower.includes('warn') || lower.includes('warning') || lower.includes('degraded')) {
return 'warn';
}
return 'info';
}
function normalizeOffset(ts: string): string {
return ts.replace(/([+-]\d{2})(\d{2})$/, '$1:$2');
}
function parseTimestamp(raw: string | undefined): number | undefined {
if (!raw) {return undefined;}
const candidates = [
raw,
normalizeOffset(raw),
raw.includes(' ') ? raw.replace(' ', 'T') : raw,
normalizeOffset(raw.includes(' ') ? raw.replace(' ', 'T') : raw),
];
for (const candidate of candidates) {
const parsed = Date.parse(candidate);
if (!Number.isNaN(parsed)) {
return parsed;
}
}
return undefined;
}
function sanitizeWindowMinutes(value: number | undefined): number {
const parsed = Math.floor(Number(value ?? DEFAULT_WINDOW_MINUTES));
if (!Number.isFinite(parsed) || parsed <= 0) {
return DEFAULT_WINDOW_MINUTES;
}
return Math.min(MAX_WINDOW_MINUTES, parsed);
}
function sanitizeBucketSeconds(value: number | undefined): number {
const parsed = Math.floor(Number(value ?? DEFAULT_BUCKET_SECONDS));
if ((ALLOWED_BUCKET_SECONDS as readonly number[]).includes(parsed)) {
return parsed;
}
return DEFAULT_BUCKET_SECONDS;
}
function sanitizeLogLines(value: number | undefined): number {
const parsed = Math.floor(Number(value ?? DEFAULT_LOG_LINES));
if (!Number.isFinite(parsed) || parsed <= 0) {
return DEFAULT_LOG_LINES;
}
return Math.min(MAX_LOG_LINES, parsed);
}
function sanitizeSinceSeconds(value: number | undefined): number {
const parsed = Math.floor(Number(value ?? DEFAULT_LOG_SINCE_SECONDS));
if (!Number.isFinite(parsed) || parsed <= 0) {
return DEFAULT_LOG_SINCE_SECONDS;
}
return Math.min(MAX_LOG_SINCE_SECONDS, parsed);
}
function normalizeSourceId(value: string): string {
return value.trim();
}
function redactLogText(text: string): { text: string; redacted: boolean } {
const result = redactForAudit(text);
return {
text: typeof result.value === 'string' ? result.value : String(result.value),
redacted: result.redactions > 0,
};
}
function splitDockerLogLine(line: string): { ts?: number; level: 'info' | 'warn' | 'error'; text: string; redacted: boolean } {
let content = line.trim();
const prefixed = content.match(/^([A-Za-z0-9_.-]+)\s+\|\s+(.*)$/);
if (prefixed) {
content = prefixed[2]?.trim() ?? content;
}
const timestamped = content.match(/^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d(?:\.\d+)?(?:Z|[+-]\d\d:?\d\d))\s+(.*)$/);
let ts: number | undefined;
if (timestamped) {
ts = parseTimestamp(timestamped[1]);
content = timestamped[2] ?? content;
}
const redacted = redactLogText(content);
return {
ts,
level: classifyLogLevel(redacted.text),
text: redacted.text,
redacted: redacted.redacted,
};
}
function splitJournalLine(line: string): { ts?: number; level: 'info' | 'warn' | 'error'; text: string; redacted: boolean } {
let content = line.trim();
let ts: number | undefined;
const timestamped = content.match(/^([0-9T:\-+. ]{19,})(\s+.*)$/);
if (timestamped) {
const parsedTs = parseTimestamp(timestamped[1]?.trim());
if (parsedTs !== undefined) {
ts = parsedTs;
content = timestamped[2]?.trim() ?? content;
}
}
const redacted = redactLogText(content);
return {
ts,
level: classifyLogLevel(redacted.text),
text: redacted.text,
redacted: redacted.redacted,
};
}
interface SystemdStatus {
unit: string;
name: string;
loadState: string;
activeState: string;
subState: string;
statusText: string;
pid: number | null;
result: string;
error?: string;
}
async function fetchSystemdUnitStatus(
runner: CommandRunner,
opts: { unit: string; name: string; user: boolean },
): Promise<SystemdStatus> {
const args = [
...(opts.user ? ['--user'] : []),
'show',
opts.unit,
'--property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result',
'--no-pager',
];
try {
const response = await runner('systemctl', args, {
timeoutMs: DEFAULT_TIMEOUT_MS,
maxBufferBytes: 1024 * 1024,
});
const parsed = parseKeyValueOutput(response.stdout);
const loadState = parsed.LoadState || 'unknown';
const activeState = parsed.ActiveState || 'unknown';
const subState = parsed.SubState || 'unknown';
const description = parsed.Description || opts.name;
const pid = parseInteger(parsed.ExecMainPID);
const result = parsed.Result || 'unknown';
return {
unit: opts.unit,
name: description,
loadState,
activeState,
subState,
statusText: activeState === subState ? activeState : `${activeState}/${subState}`,
pid,
result,
};
} catch (error) {
const detail = normalizeError(error);
return {
unit: opts.unit,
name: opts.name,
loadState: 'unknown',
activeState: detail.toLowerCase().includes('not found') ? 'not-found' : 'unknown',
subState: 'unknown',
statusText: 'unavailable',
pid: null,
result: 'unknown',
error: detail,
};
}
}
async function discoverComposeProfileArgs(runner: CommandRunner): Promise<string[]> {
try {
const response = await runner('docker', ['compose', '-f', COMPOSE_FILE, 'config', '--profiles'], {
timeoutMs: DEFAULT_TIMEOUT_MS,
maxBufferBytes: LARGE_BUFFER_BYTES,
});
const profiles = response.stdout
.split('\n')
.map((line) => line.trim())
.filter((line) => line.length > 0);
const unique = new Set<string>();
for (const profile of profiles) {
unique.add(profile);
}
const args: string[] = [];
for (const profile of unique) {
args.push('--profile', profile);
}
return args;
} catch {
return [];
}
}
export interface ObservabilityCollectorOptions {
config: Config;
flynnSystemdUnit?: string;
samplingIntervalMs?: number;
maxSamplesPerSource?: number;
now?: () => number;
runner?: CommandRunner;
}
/**
* Collects bounded service status samples and log snapshots for the web dashboard.
*/
export class ObservabilityCollector {
private readonly config: Config;
private readonly flynnSystemdUnit: string;
private readonly samplingIntervalMs: number;
private readonly maxSamplesPerSource: number;
private readonly now: () => number;
private readonly runner: CommandRunner;
private sampleTimer: NodeJS.Timeout | null = null;
private inFlightSample: Promise<void> | null = null;
private readonly sourceHistory = new Map<string, SampleRecord[]>();
private readonly sourceMeta = new Map<string, ObservabilitySource>();
private readonly sourceCounters = new Map<string, SourceCounter>();
constructor(options: ObservabilityCollectorOptions) {
this.config = options.config;
this.flynnSystemdUnit = options.flynnSystemdUnit ?? DEFAULT_FLYNN_UNIT;
this.samplingIntervalMs = Math.max(5_000, Math.floor(options.samplingIntervalMs ?? DEFAULT_SAMPLE_INTERVAL_MS));
this.maxSamplesPerSource = Math.max(60, Math.floor(options.maxSamplesPerSource ?? DEFAULT_MAX_SAMPLES));
this.now = options.now ?? (() => Date.now());
this.runner = options.runner ?? defaultRunner;
}
start(): void {
if (this.sampleTimer) {
return;
}
void this.forceSample();
this.sampleTimer = setInterval(() => {
void this.forceSample();
}, this.samplingIntervalMs);
this.sampleTimer.unref?.();
}
stop(): void {
if (!this.sampleTimer) {
return;
}
clearInterval(this.sampleTimer);
this.sampleTimer = null;
}
async listSources(): Promise<ObservabilitySource[]> {
await this.ensureSampled();
return Array.from(this.sourceMeta.values())
.sort((a, b) => a.name.localeCompare(b.name));
}
async getSeries(query?: ObservabilitySeriesQuery): Promise<ObservabilitySeriesSnapshot> {
await this.ensureSampled();
const now = this.now();
const windowMinutes = sanitizeWindowMinutes(query?.windowMinutes);
const bucketSeconds = sanitizeBucketSeconds(query?.bucketSeconds);
const bucketMs = bucketSeconds * 1000;
const lowerBound = now - (windowMinutes * 60_000);
const sourceFilter = query?.sourceIds && query.sourceIds.length > 0
? new Set(query.sourceIds.map((id) => normalizeSourceId(id)).filter((id) => id.length > 0))
: null;
const series: ObservabilitySeriesEntry[] = [];
for (const [sourceId, history] of this.sourceHistory.entries()) {
if (sourceFilter && !sourceFilter.has(sourceId)) {
continue;
}
const bucketMap = new Map<number, SampleRecord>();
for (const sample of history) {
if (sample.ts < lowerBound) {
continue;
}
const bucketTs = Math.floor(sample.ts / bucketMs) * bucketMs;
bucketMap.set(bucketTs, sample);
}
const points = Array.from(bucketMap.entries())
.sort((a, b) => a[0] - b[0])
.map(([bucketTs, sample]) => ({
ts: bucketTs,
stateCode: sample.stateCode,
healthCode: sample.healthCode,
errorCount: sample.errorCount,
restartCount: sample.restartCount,
}));
series.push({ sourceId, points });
}
series.sort((a, b) => {
const left = this.sourceMeta.get(a.sourceId)?.name ?? a.sourceId;
const right = this.sourceMeta.get(b.sourceId)?.name ?? b.sourceId;
return left.localeCompare(right);
});
return {
generatedAt: now,
windowMinutes,
bucketSeconds,
series,
};
}
async getServiceLogs(query: ServiceLogQuery): Promise<ServiceLogSnapshot> {
await this.ensureSampled();
const sourceId = normalizeSourceId(query.sourceId);
if (!sourceId) {
throw new Error('sourceId is required');
}
const source = this.sourceMeta.get(sourceId);
if (!source || !source.logCapable) {
throw new Error(`Log source not found or unavailable: ${sourceId}`);
}
const lines = sanitizeLogLines(query.lines);
const sinceSeconds = sanitizeSinceSeconds(query.sinceSeconds);
if (source.runtime === 'docker_compose') {
return this.fetchDockerLogs(source, lines, sinceSeconds);
}
if (source.runtime === 'systemd_user' || source.runtime === 'systemd_system') {
return this.fetchJournalLogs(source, lines, sinceSeconds);
}
throw new Error(`Unsupported log runtime for source: ${sourceId}`);
}
async forceSample(): Promise<void> {
if (this.inFlightSample) {
await this.inFlightSample;
return;
}
this.inFlightSample = this.collectSample()
.catch(() => {
// Keep sampling resilient; errors are reflected as unavailable source snapshots.
})
.finally(() => {
this.inFlightSample = null;
});
await this.inFlightSample;
}
private async ensureSampled(): Promise<void> {
if (this.sourceMeta.size > 0) {
return;
}
await this.forceSample();
}
private async collectSample(): Promise<void> {
const sampleTime = this.now();
const [flynnResult, localBackendsResult, dockerDependenciesResult] = await Promise.allSettled([
fetchSystemdUnitStatus(this.runner, {
unit: this.flynnSystemdUnit,
name: 'Flynn daemon',
user: false,
}),
listLocalBackendStatuses(this.config, async (args: string[]) => {
return this.runner('systemctl', args, {
timeoutMs: DEFAULT_TIMEOUT_MS,
maxBufferBytes: 1024 * 1024,
});
}),
listDockerDependencyStatuses(this.config, async (args: string[]) => {
return this.runner('docker', ['compose', '-f', COMPOSE_FILE, ...args], {
timeoutMs: DEFAULT_TIMEOUT_MS,
maxBufferBytes: LARGE_BUFFER_BYTES,
});
}),
]);
const flynnStatus = flynnResult.status === 'fulfilled'
? flynnResult.value
: {
unit: this.flynnSystemdUnit,
name: 'Flynn daemon',
loadState: 'unknown',
activeState: 'unknown',
subState: 'unknown',
statusText: 'unavailable',
pid: null,
result: 'unknown',
error: normalizeError(flynnResult.reason),
};
const localBackends = localBackendsResult.status === 'fulfilled'
? localBackendsResult.value
: [];
const dockerDependencies = dockerDependenciesResult.status === 'fulfilled'
? dockerDependenciesResult.value
: [];
const snapshots: SourceSnapshot[] = [];
const flynnMapped = mapSystemdStatus(flynnStatus.activeState, flynnStatus.error);
snapshots.push({
source: {
id: 'systemd:flynn',
name: 'Flynn daemon',
kind: 'systemd_system',
runtime: 'systemd_system',
status: flynnMapped.status,
graphCapable: true,
logCapable: true,
metadata: {
unit: this.flynnSystemdUnit,
state: flynnStatus.activeState,
statusText: flynnStatus.statusText,
},
},
stateCode: flynnMapped.stateCode,
healthCode: flynnMapped.healthCode,
hasError: Boolean(flynnStatus.error),
fingerprint: flynnStatus.pid ? `pid:${flynnStatus.pid}` : null,
});
for (const backend of localBackends) {
const mapped = mapSystemdStatus(String(backend.activeState ?? ''), backend.error);
snapshots.push({
source: {
id: `systemd-user:${backend.id}`,
name: backend.name,
kind: 'systemd_user',
runtime: 'systemd_user',
status: mapped.status,
graphCapable: true,
logCapable: backend.loadState !== 'not-found',
metadata: {
unit: backend.unit,
state: backend.activeState,
statusText: backend.statusText,
},
},
stateCode: mapped.stateCode,
healthCode: mapped.healthCode,
hasError: Boolean(backend.error) || backend.activeState === 'failed',
fingerprint: backend.pid ? `pid:${backend.pid}` : null,
});
}
for (const dependency of dockerDependencies) {
const mapped = mapDockerStatus(dependency.state, dependency.health, dependency.error);
snapshots.push({
source: {
id: `docker:${dependency.id}`,
name: dependency.name,
kind: 'docker_dependency',
runtime: 'docker_compose',
status: mapped.status,
graphCapable: true,
logCapable: dependency.id !== 'compose',
metadata: {
service: dependency.service,
state: dependency.state,
health: dependency.health,
statusText: dependency.statusText,
containerName: dependency.containerName,
},
},
stateCode: mapped.stateCode,
healthCode: mapped.healthCode,
hasError: Boolean(dependency.error) || mapped.status === 'degraded' || mapped.status === 'unavailable',
fingerprint: dependency.containerName ? `container:${dependency.containerName}` : null,
});
}
const seenSourceIds = new Set<string>();
for (const snapshot of snapshots) {
seenSourceIds.add(snapshot.source.id);
this.recordSourceSample(snapshot, sampleTime);
}
// Keep stale entries visible as unavailable when they disappear.
for (const [sourceId, source] of this.sourceMeta.entries()) {
if (seenSourceIds.has(sourceId)) {
continue;
}
this.recordSourceSample({
source: {
...source,
status: 'unavailable',
},
stateCode: STATE_UNAVAILABLE,
healthCode: HEALTH_UNKNOWN,
hasError: true,
fingerprint: null,
}, sampleTime);
}
}
private recordSourceSample(snapshot: SourceSnapshot, timestamp: number): void {
this.sourceMeta.set(snapshot.source.id, snapshot.source);
const counter = this.sourceCounters.get(snapshot.source.id) ?? {
errorCount: 0,
restartCount: 0,
lastStateCode: snapshot.stateCode,
lastFingerprint: snapshot.fingerprint,
hasPrevious: false,
} satisfies SourceCounter;
if (snapshot.hasError) {
counter.errorCount += 1;
}
if (counter.hasPrevious) {
const enteredRunning = counter.lastStateCode !== STATE_RUNNING && snapshot.stateCode === STATE_RUNNING;
const fingerprintChanged = (
snapshot.stateCode === STATE_RUNNING
&& Boolean(snapshot.fingerprint)
&& Boolean(counter.lastFingerprint)
&& snapshot.fingerprint !== counter.lastFingerprint
);
if (enteredRunning || fingerprintChanged) {
counter.restartCount += 1;
}
}
counter.lastStateCode = snapshot.stateCode;
counter.lastFingerprint = snapshot.fingerprint;
counter.hasPrevious = true;
this.sourceCounters.set(snapshot.source.id, counter);
const records = this.sourceHistory.get(snapshot.source.id) ?? [];
records.push({
ts: timestamp,
stateCode: snapshot.stateCode,
healthCode: snapshot.healthCode,
errorCount: counter.errorCount,
restartCount: counter.restartCount,
});
if (records.length > this.maxSamplesPerSource) {
records.splice(0, records.length - this.maxSamplesPerSource);
}
this.sourceHistory.set(snapshot.source.id, records);
}
private async fetchDockerLogs(
source: ObservabilitySource,
lines: number,
sinceSeconds: number,
): Promise<ServiceLogSnapshot> {
const service = source.metadata?.service;
if (!service) {
throw new Error(`Missing docker compose service metadata for source: ${source.id}`);
}
const profileArgs = await discoverComposeProfileArgs(this.runner);
const response = await this.runner(
'docker',
[
'compose',
'-f',
COMPOSE_FILE,
...profileArgs,
'logs',
'--timestamps',
'--tail',
String(lines),
'--since',
`${sinceSeconds}s`,
service,
],
{ timeoutMs: LARGE_TIMEOUT_MS, maxBufferBytes: LARGE_BUFFER_BYTES },
);
const rawLines = response.stdout
.split('\n')
.map((line) => line.trimEnd())
.filter((line) => line.trim().length > 0);
let redacted = false;
const parsed: ServiceLogEntry[] = rawLines.map((line) => {
const item = splitDockerLogLine(line);
redacted = redacted || item.redacted;
return {
ts: item.ts,
level: item.level,
text: item.text,
};
});
return {
sourceId: source.id,
fetchedAt: this.now(),
redacted,
lines: parsed,
truncated: parsed.length >= lines,
};
}
private async fetchJournalLogs(
source: ObservabilitySource,
lines: number,
sinceSeconds: number,
): Promise<ServiceLogSnapshot> {
const unit = source.metadata?.unit;
if (!unit) {
throw new Error(`Missing systemd unit metadata for source: ${source.id}`);
}
const user = source.runtime === 'systemd_user';
const response = await this.runner(
'journalctl',
[
...(user ? ['--user'] : []),
'-u',
unit,
'--since',
`${sinceSeconds} seconds ago`,
'--no-pager',
'--output',
'short-iso-precise',
'-n',
String(lines),
],
{
timeoutMs: LARGE_TIMEOUT_MS,
maxBufferBytes: LARGE_BUFFER_BYTES,
},
);
const rawLines = response.stdout
.split('\n')
.map((line) => line.trimEnd())
.filter((line) => line.trim().length > 0);
let redacted = false;
const parsed: ServiceLogEntry[] = rawLines.map((line) => {
const item = splitJournalLine(line);
redacted = redacted || item.redacted;
return {
ts: item.ts,
level: item.level,
text: item.text,
};
});
return {
sourceId: source.id,
fetchedAt: this.now(),
redacted,
lines: parsed,
truncated: parsed.length >= lines,
};
}
}
export const observabilityDefaults = {
DEFAULT_WINDOW_MINUTES,
MAX_WINDOW_MINUTES,
DEFAULT_BUCKET_SECONDS,
ALLOWED_BUCKET_SECONDS: [...ALLOWED_BUCKET_SECONDS],
DEFAULT_LOG_LINES,
MAX_LOG_LINES,
DEFAULT_LOG_SINCE_SECONDS,
MAX_LOG_SINCE_SECONDS,
} as const;
+77
View File
@@ -10,6 +10,13 @@ import type {
DockerDependencyControlResult,
DockerDependencyStatus,
} from './dockerDependencies.js';
import type {
ObservabilitySource,
ObservabilitySeriesQuery,
ObservabilitySeriesSnapshot,
ServiceLogQuery,
ServiceLogSnapshot,
} from './observability.js';
/** Per-session token usage report returned by system.tokenUsage. */
export interface TokenUsageEntry {
@@ -117,6 +124,12 @@ export interface SystemHandlerDeps {
getDockerDependencies?: () => Promise<DockerDependencyStatus[]> | DockerDependencyStatus[];
/** Optional callback to control docker-compose dependencies. */
controlDockerDependency?: (dependency: string, action: string) => Promise<DockerDependencyControlResult>;
/** Optional callback to retrieve observability-capable service sources. */
getObservabilitySources?: () => Promise<ObservabilitySource[]> | ObservabilitySource[];
/** Optional callback to retrieve sampled observability series for services. */
getObservabilitySeries?: (opts?: ObservabilitySeriesQuery) => Promise<ObservabilitySeriesSnapshot> | ObservabilitySeriesSnapshot;
/** Optional callback to retrieve recent logs for a service source. */
getServiceLogs?: (opts: ServiceLogQuery) => Promise<ServiceLogSnapshot> | ServiceLogSnapshot;
}
function normalizeErrorMessage(error: unknown): string {
@@ -379,5 +392,69 @@ export function createSystemHandlers(deps: SystemHandlerDeps) {
return makeError(request.id, ErrorCode.InternalError, `Docker dependency control failed: ${normalizeErrorMessage(error)}`);
}
},
'system.observabilitySources': async (request: GatewayRequest): Promise<OutboundMessage> => {
if (!deps.getObservabilitySources) {
return makeResponse(request.id, { sources: [] });
}
try {
const sources = await deps.getObservabilitySources();
return makeResponse(request.id, { sources });
} catch (error) {
return makeError(request.id, ErrorCode.InternalError, `Failed to load observability sources: ${normalizeErrorMessage(error)}`);
}
},
'system.observabilitySeries': async (request: GatewayRequest): Promise<OutboundMessage> => {
if (!deps.getObservabilitySeries) {
return makeResponse(request.id, {
generatedAt: Date.now(),
windowMinutes: 60,
bucketSeconds: 30,
series: [],
});
}
const params = request.params as { windowMinutes?: number; bucketSeconds?: number; sourceIds?: unknown } | undefined;
if (
params?.sourceIds !== undefined
&& (!Array.isArray(params.sourceIds) || params.sourceIds.some((entry) => typeof entry !== 'string'))
) {
return makeError(request.id, ErrorCode.InvalidRequest, 'sourceIds must be an array of strings');
}
try {
const snapshot = await deps.getObservabilitySeries({
windowMinutes: params?.windowMinutes,
bucketSeconds: params?.bucketSeconds,
sourceIds: params?.sourceIds as string[] | undefined,
});
return makeResponse(request.id, snapshot);
} catch (error) {
return makeError(request.id, ErrorCode.InternalError, `Failed to load observability series: ${normalizeErrorMessage(error)}`);
}
},
'system.serviceLogs': async (request: GatewayRequest): Promise<OutboundMessage> => {
if (!deps.getServiceLogs) {
return makeError(request.id, ErrorCode.InternalError, 'Service logs are not available in this environment');
}
const params = request.params as { sourceId?: string; lines?: number; sinceSeconds?: number } | undefined;
if (!params?.sourceId || typeof params.sourceId !== 'string' || params.sourceId.trim().length === 0) {
return makeError(request.id, ErrorCode.InvalidRequest, 'sourceId is required');
}
try {
const logs = await deps.getServiceLogs({
sourceId: params.sourceId,
lines: params.lines,
sinceSeconds: params.sinceSeconds,
});
return makeResponse(request.id, logs);
} catch (error) {
return makeError(request.id, ErrorCode.InternalError, `Failed to load service logs: ${normalizeErrorMessage(error)}`);
}
},
};
}
+18
View File
@@ -36,6 +36,7 @@ import { discoverServices } from './handlers/services.js';
import { createModelCatalogFetcher } from './modelCatalog.js';
import { listLocalBackendStatuses, controlLocalBackend } from './handlers/localBackends.js';
import { listDockerDependencyStatuses, controlDockerDependency } from './handlers/dockerDependencies.js';
import { ObservabilityCollector } from './handlers/observability.js';
import type { TokenUsageEntry, ContextUsageEntry } from './handlers/system.js';
import type { NodeConnectionState } from './handlers/node.js';
import type { SessionManager } from '../session/manager.js';
@@ -172,6 +173,7 @@ export class GatewayServer {
private canvasStore: CanvasStore;
private metrics: MetricsCollector;
private discoveryHandle: GatewayDiscoveryHandle | null = null;
private observabilityCollector: ObservabilityCollector | null = null;
private connectionMap: Map<WebSocket, string> = new Map();
private connectionRateMap: Map<string, {
tokens: number;
@@ -209,6 +211,10 @@ export class GatewayServer {
private registerHandlers(): void {
const channelRegistry = this.config.channelRegistry;
const runtimeConfig = this.config.config;
this.observabilityCollector = runtimeConfig
? new ObservabilityCollector({ config: runtimeConfig })
: null;
const observabilityCollector = this.observabilityCollector;
const modelCatalogFetcher = runtimeConfig ? createModelCatalogFetcher(runtimeConfig) : undefined;
const systemHandlers = createSystemHandlers({
startTime: this.startTime,
@@ -244,6 +250,15 @@ export class GatewayServer {
controlDockerDependency: runtimeConfig
? (dependency, action) => controlDockerDependency(runtimeConfig, dependency, action)
: undefined,
getObservabilitySources: observabilityCollector
? () => observabilityCollector.listSources()
: undefined,
getObservabilitySeries: observabilityCollector
? (opts) => observabilityCollector.getSeries(opts)
: undefined,
getServiceLogs: observabilityCollector
? (opts) => observabilityCollector.getServiceLogs(opts)
: undefined,
getPresence: channelRegistry
? (opts) => channelRegistry.getPresence(opts)
: undefined,
@@ -547,6 +562,7 @@ export class GatewayServer {
const { port } = this.config;
return new Promise((resolve) => {
this.observabilityCollector?.start();
// Create HTTP server first — handles static file requests
this.httpServer = createServer((req: IncomingMessage, res: ServerResponse) => {
this.handleHttpRequest(req, res);
@@ -584,6 +600,8 @@ export class GatewayServer {
}
async stop(): Promise<void> {
this.observabilityCollector?.stop();
if (this.discoveryHandle) {
try {
await this.discoveryHandle.stop();