From ca463d5ca2e3cddc7ab6bc8fd702db2715fc06d7 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Sun, 22 Feb 2026 20:54:37 -0800 Subject: [PATCH] feat(gateway): add observability sources, series, and service log RPCs --- src/gateway/handlers/handlers.test.ts | 120 +++ src/gateway/handlers/observability.test.ts | 256 ++++++ src/gateway/handlers/observability.ts | 932 +++++++++++++++++++++ src/gateway/handlers/system.ts | 77 ++ src/gateway/server.ts | 18 + 5 files changed, 1403 insertions(+) create mode 100644 src/gateway/handlers/observability.test.ts create mode 100644 src/gateway/handlers/observability.ts diff --git a/src/gateway/handlers/handlers.test.ts b/src/gateway/handlers/handlers.test.ts index 0ce92ac..8763c7a 100644 --- a/src/gateway/handlers/handlers.test.ts +++ b/src/gateway/handlers/handlers.test.ts @@ -12,6 +12,7 @@ import { createConfigHandlers, redactConfig } from './config.js'; import { createPairingHandlers } from './pairing.js'; import type { LocalBackendStatus, LocalBackendControlResult } from './localBackends.js'; import type { DockerDependencyStatus, DockerDependencyControlResult } from './dockerDependencies.js'; +import type { ObservabilitySource, ObservabilitySeriesSnapshot, ServiceLogSnapshot } from './observability.js'; import { PairingManager } from '../../channels/pairing.js'; import { LaneQueue } from '../lane-queue.js'; import { CanvasStore } from '../canvas-store.js'; @@ -375,6 +376,125 @@ describe('system handlers', () => { expect(getPath(result.result, 'action')).toBe('restart'); }); + it('system.observabilitySources returns empty list when callback is not provided', async () => { + const req: GatewayRequest = { id: 48, method: 'system.observabilitySources' }; + const result = await handlers['system.observabilitySources'](req) as GatewayResponse; + expect(getPath(result.result, 'sources')).toEqual([]); + }); + + it('system.observabilitySources returns source list from callback', async () => { + const getObservabilitySources = vi.fn(async (): Promise => ([ + { + id: 'systemd:flynn', + name: 'Flynn daemon', + kind: 'systemd_system', + runtime: 'systemd_system', + status: 'running', + graphCapable: true, + logCapable: true, + }, + ])); + + const handlers = createSystemHandlers({ + ...deps, + getObservabilitySources, + }); + + const req: GatewayRequest = { id: 49, method: 'system.observabilitySources' }; + const result = await handlers['system.observabilitySources'](req) as GatewayResponse; + expect(getObservabilitySources).toHaveBeenCalledTimes(1); + expect(getPath(result.result, 'sources', '0', 'id')).toBe('systemd:flynn'); + }); + + it('system.observabilitySeries validates sourceIds parameter', async () => { + const handlers = createSystemHandlers({ + ...deps, + getObservabilitySeries: vi.fn(), + }); + + const result = await handlers['system.observabilitySeries']({ + id: 50, + method: 'system.observabilitySeries', + params: { sourceIds: 'not-an-array' as unknown as string[] }, + }) as GatewayError; + + expect(result.error.code).toBe(ErrorCode.InvalidRequest); + }); + + it('system.observabilitySeries forwards query to callback', async () => { + const snapshot: ObservabilitySeriesSnapshot = { + generatedAt: 123, + windowMinutes: 60, + bucketSeconds: 30, + series: [ + { + sourceId: 'systemd:flynn', + points: [{ ts: 100, stateCode: 3, healthCode: 2, errorCount: 0, restartCount: 1 }], + }, + ], + }; + + const getObservabilitySeries = vi.fn(async () => snapshot); + const handlers = createSystemHandlers({ + ...deps, + getObservabilitySeries, + }); + + const req: GatewayRequest = { + id: 51, + method: 'system.observabilitySeries', + params: { windowMinutes: 120, bucketSeconds: 60, sourceIds: ['systemd:flynn'] }, + }; + const result = await handlers['system.observabilitySeries'](req) as GatewayResponse; + expect(getObservabilitySeries).toHaveBeenCalledWith({ + windowMinutes: 120, + bucketSeconds: 60, + sourceIds: ['systemd:flynn'], + }); + expect(getPath(result.result, 'series', '0', 'points', '0', 'restartCount')).toBe(1); + }); + + it('system.serviceLogs validates required sourceId', async () => { + const handlers = createSystemHandlers({ + ...deps, + getServiceLogs: vi.fn(), + }); + + const result = await handlers['system.serviceLogs']({ + id: 52, + method: 'system.serviceLogs', + params: { lines: 100 }, + }) as GatewayError; + expect(result.error.code).toBe(ErrorCode.InvalidRequest); + }); + + it('system.serviceLogs forwards request to callback', async () => { + const snapshot: ServiceLogSnapshot = { + sourceId: 'docker:whisper', + fetchedAt: 123, + redacted: false, + truncated: false, + lines: [{ ts: 100, level: 'warn', text: 'queue depth high' }], + }; + const getServiceLogs = vi.fn(async (): Promise => snapshot); + const handlers = createSystemHandlers({ + ...deps, + getServiceLogs, + }); + const req: GatewayRequest = { + id: 53, + method: 'system.serviceLogs', + params: { sourceId: 'docker:whisper', lines: 50, sinceSeconds: 600 }, + }; + const result = await handlers['system.serviceLogs'](req) as GatewayResponse; + expect(getServiceLogs).toHaveBeenCalledWith({ + sourceId: 'docker:whisper', + lines: 50, + sinceSeconds: 600, + }); + expect(getPath(result.result, 'lines', '0', 'text')).toBe('queue depth high'); + }); + it('system.presence returns empty result when getPresence is not provided', async () => { const req: GatewayRequest = { id: 4, method: 'system.presence' }; const result = await handlers['system.presence'](req) as GatewayResponse; diff --git a/src/gateway/handlers/observability.test.ts b/src/gateway/handlers/observability.test.ts new file mode 100644 index 0000000..c73f649 --- /dev/null +++ b/src/gateway/handlers/observability.test.ts @@ -0,0 +1,256 @@ +import { describe, it, expect } from 'vitest'; +import type { Config } from '../../config/index.js'; +import { ObservabilityCollector } from './observability.js'; + +function createConfig(): Config { + return { + models: { + default: { provider: 'ollama', model: 'llama3.2' }, + fast: { provider: 'openai', model: 'gpt-4o-mini' }, + complex: { provider: 'anthropic', model: 'claude-3-7-sonnet' }, + }, + memory: { + embedding: { + provider: 'openai', + model: 'text-embedding-3-small', + }, + }, + audio: { + enabled: true, + provider: { + type: 'custom', + endpoint: 'http://localhost:18801/v1/audio/transcriptions', + }, + }, + web_search: { + provider: 'brave', + api_key: 'test-brave-key', + endpoint: undefined, + max_results: 5, + }, + } as unknown as Config; +} + +describe('ObservabilityCollector', () => { + it('collects sources and tracks restart counters in sampled series', async () => { + let now = 1_700_000_000_000; + let flynnPid = 100; + + const runner = async (command: string, args: string[]) => { + const key = `${command} ${args.join(' ')}`; + + if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') { + return { + stdout: `LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=${flynnPid}\nResult=success\n`, + stderr: '', + }; + } + + if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { + stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nUnitFileState=enabled\nDescription=Ollama\nExecMainPID=211\nResult=success\n', + stderr: '', + }; + } + + if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { + stdout: 'LoadState=loaded\nActiveState=inactive\nSubState=dead\nUnitFileState=enabled\nDescription=llama.cpp\nExecMainPID=0\nResult=success\n', + stderr: '', + }; + } + + if (key === 'docker compose -f docker-compose.yml config --profiles') { + return { stdout: 'voice\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice config --services') { + return { stdout: 'flynn\nwhisper-server\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice ps --all --format json') { + return { stdout: '[]', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice ps whisper-server --format json') { + return { + stdout: JSON.stringify([ + { + Name: 'flynn-whisper-server-1', + Service: 'whisper-server', + State: 'running', + Health: 'healthy', + Status: 'Up 2 minutes (healthy)', + }, + ]), + stderr: '', + }; + } + + throw new Error(`Unexpected command: ${key}`); + }; + + const collector = new ObservabilityCollector({ + config: createConfig(), + runner, + now: () => now, + samplingIntervalMs: 300_000, + }); + + await collector.forceSample(); + now += 30_000; + flynnPid = 101; + await collector.forceSample(); + + const sources = await collector.listSources(); + expect(sources.map((entry) => entry.id)).toEqual(expect.arrayContaining([ + 'systemd:flynn', + 'systemd-user:ollama', + 'systemd-user:llamacpp', + 'docker:whisper', + ])); + + const series = await collector.getSeries({ windowMinutes: 60, bucketSeconds: 30 }); + const flynnSeries = series.series.find((entry) => entry.sourceId === 'systemd:flynn'); + expect(flynnSeries).toBeTruthy(); + expect(flynnSeries?.points.length).toBeGreaterThanOrEqual(2); + expect(flynnSeries?.points[flynnSeries.points.length - 1]?.restartCount).toBe(1); + }); + + it('returns redacted journal logs for systemd sources', async () => { + let now = 1_700_000_000_000; + + const runner = async (command: string, args: string[]) => { + const key = `${command} ${args.join(' ')}`; + + if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') { + return { + stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=777\nResult=success\n', + stderr: '', + }; + } + + if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=Ollama\nExecMainPID=0\nResult=not-found\n', stderr: '' }; + } + + if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=llama.cpp\nExecMainPID=0\nResult=not-found\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml config --profiles') { + return { stdout: '', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml config --services') { + return { stdout: 'flynn\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml ps --all --format json') { + return { stdout: '[]', stderr: '' }; + } + + if (key === 'journalctl -u flynn.service --since 600 seconds ago --no-pager --output short-iso-precise -n 200') { + return { + stdout: '2026-02-23 10:10:10.000000+0000 host flynn[777]: Authorization: Bearer super-secret-token-value\n', + stderr: '', + }; + } + + throw new Error(`Unexpected command: ${key}`); + }; + + const collector = new ObservabilityCollector({ + config: createConfig(), + runner, + now: () => now, + }); + + await collector.forceSample(); + const logs = await collector.getServiceLogs({ sourceId: 'systemd:flynn', sinceSeconds: 600 }); + + expect(logs.sourceId).toBe('systemd:flynn'); + expect(logs.redacted).toBe(true); + expect(logs.lines).toHaveLength(1); + expect(logs.lines[0]?.text).toContain('[REDACTED_BEARER]'); + expect(logs.lines[0]?.level).toBe('info'); + }); + + it('parses docker logs with levels and timestamps', async () => { + let now = 1_700_000_000_000; + + const runner = async (command: string, args: string[]) => { + const key = `${command} ${args.join(' ')}`; + + if (key === 'systemctl show flynn.service --property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result --no-pager') { + return { + stdout: 'LoadState=loaded\nActiveState=active\nSubState=running\nDescription=Flynn daemon\nExecMainPID=777\nResult=success\n', + stderr: '', + }; + } + + if (key === 'systemctl --user show ollama.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=Ollama\nExecMainPID=0\nResult=not-found\n', stderr: '' }; + } + + if (key === 'systemctl --user show llama-server.service --property=LoadState,ActiveState,SubState,UnitFileState,Description,ExecMainPID,Result --no-pager') { + return { stdout: 'LoadState=not-found\nActiveState=inactive\nSubState=dead\nDescription=llama.cpp\nExecMainPID=0\nResult=not-found\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml config --profiles') { + return { stdout: 'voice\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice config --services') { + return { stdout: 'flynn\nwhisper-server\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice ps --all --format json') { + return { stdout: '[]', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice ps whisper-server --format json') { + return { + stdout: JSON.stringify([ + { + Name: 'flynn-whisper-server-1', + Service: 'whisper-server', + State: 'running', + Health: 'healthy', + Status: 'Up 2 minutes (healthy)', + }, + ]), + stderr: '', + }; + } + + if (key === 'docker compose -f docker-compose.yml config --profiles') { + return { stdout: 'voice\n', stderr: '' }; + } + + if (key === 'docker compose -f docker-compose.yml --profile voice logs --timestamps --tail 120 --since 300s whisper-server') { + return { + stdout: 'whisper-server | 2026-02-23T10:10:10Z warning: queue depth high\n', + stderr: '', + }; + } + + throw new Error(`Unexpected command: ${key}`); + }; + + const collector = new ObservabilityCollector({ + config: createConfig(), + runner, + now: () => now, + }); + + await collector.forceSample(); + const logs = await collector.getServiceLogs({ sourceId: 'docker:whisper', lines: 120, sinceSeconds: 300 }); + + expect(logs.sourceId).toBe('docker:whisper'); + expect(logs.lines).toHaveLength(1); + expect(logs.lines[0]?.level).toBe('warn'); + expect(typeof logs.lines[0]?.ts).toBe('number'); + expect(logs.lines[0]?.text).toContain('queue depth high'); + }); +}); diff --git a/src/gateway/handlers/observability.ts b/src/gateway/handlers/observability.ts new file mode 100644 index 0000000..6696ff3 --- /dev/null +++ b/src/gateway/handlers/observability.ts @@ -0,0 +1,932 @@ +import { execFile as execFileCb } from 'node:child_process'; +import { promisify } from 'node:util'; +import { redactForAudit } from '../../audit/redact.js'; +import type { Config } from '../../config/index.js'; +import { listDockerDependencyStatuses } from './dockerDependencies.js'; +import { listLocalBackendStatuses } from './localBackends.js'; + +const execFile = promisify(execFileCb); +const COMPOSE_FILE = 'docker-compose.yml'; +const DEFAULT_FLYNN_UNIT = 'flynn.service'; +const DEFAULT_WINDOW_MINUTES = 60; +const MAX_WINDOW_MINUTES = 240; +const DEFAULT_BUCKET_SECONDS = 30; +const ALLOWED_BUCKET_SECONDS = [15, 30, 60] as const; +const DEFAULT_LOG_LINES = 200; +const MAX_LOG_LINES = 1000; +const DEFAULT_LOG_SINCE_SECONDS = 900; +const MAX_LOG_SINCE_SECONDS = 86_400; +const DEFAULT_SAMPLE_INTERVAL_MS = 30_000; +const DEFAULT_MAX_SAMPLES = 720; +const DEFAULT_TIMEOUT_MS = 10_000; +const LARGE_TIMEOUT_MS = 15_000; +const LARGE_BUFFER_BYTES = 4 * 1024 * 1024; + +const STATE_UNAVAILABLE = 0; +const STATE_STOPPED = 1; +const STATE_DEGRADED = 2; +const STATE_RUNNING = 3; + +const HEALTH_UNKNOWN = 0; +const HEALTH_DEGRADED = 1; +const HEALTH_HEALTHY = 2; + +export type ObservabilitySourceKind = 'docker_dependency' | 'systemd_user' | 'systemd_system'; +export type ObservabilityRuntime = 'docker_compose' | 'systemd_user' | 'systemd_system'; +export type ObservabilitySourceStatus = 'running' | 'degraded' | 'stopped' | 'unavailable' | 'unknown'; + +export interface ObservabilitySource { + id: string; + name: string; + kind: ObservabilitySourceKind; + runtime: ObservabilityRuntime; + status: ObservabilitySourceStatus; + graphCapable: boolean; + logCapable: boolean; + metadata?: { + unit?: string; + service?: string; + state?: string; + health?: string; + statusText?: string; + containerName?: string | null; + }; +} + +export interface ObservabilitySeriesPoint { + ts: number; + stateCode: number; + healthCode: number; + errorCount: number; + restartCount: number; +} + +export interface ObservabilitySeriesEntry { + sourceId: string; + points: ObservabilitySeriesPoint[]; +} + +export interface ObservabilitySeriesSnapshot { + generatedAt: number; + windowMinutes: number; + bucketSeconds: number; + series: ObservabilitySeriesEntry[]; +} + +export interface ServiceLogEntry { + ts?: number; + level?: 'info' | 'warn' | 'error'; + text: string; +} + +export interface ServiceLogSnapshot { + sourceId: string; + fetchedAt: number; + redacted: boolean; + lines: ServiceLogEntry[]; + truncated: boolean; +} + +export interface ServiceLogQuery { + sourceId: string; + lines?: number; + sinceSeconds?: number; +} + +export interface ObservabilitySeriesQuery { + windowMinutes?: number; + bucketSeconds?: number; + sourceIds?: string[]; +} + +type ExecResult = { stdout: string; stderr: string }; +type CommandRunner = ( + command: string, + args: string[], + opts?: { timeoutMs?: number; maxBufferBytes?: number }, +) => Promise; + +interface SourceSnapshot { + source: ObservabilitySource; + stateCode: number; + healthCode: number; + hasError: boolean; + fingerprint: string | null; +} + +interface SourceCounter { + errorCount: number; + restartCount: number; + lastStateCode: number; + lastFingerprint: string | null; + hasPrevious: boolean; +} + +interface SampleRecord { + ts: number; + stateCode: number; + healthCode: number; + errorCount: number; + restartCount: number; +} + +function defaultRunner( + command: string, + args: string[], + opts?: { timeoutMs?: number; maxBufferBytes?: number }, +): Promise { + return execFile(command, args, { + timeout: opts?.timeoutMs ?? DEFAULT_TIMEOUT_MS, + maxBuffer: opts?.maxBufferBytes ?? LARGE_BUFFER_BYTES, + }) as Promise; +} + +function normalizeError(error: unknown): string { + if (error && typeof error === 'object') { + const maybe = error as { stderr?: string; stdout?: string; message?: string }; + const stderr = maybe.stderr?.trim(); + if (stderr) {return stderr;} + const stdout = maybe.stdout?.trim(); + if (stdout) {return stdout;} + if (typeof maybe.message === 'string' && maybe.message.trim().length > 0) { + return maybe.message.trim(); + } + } + if (error instanceof Error && error.message.trim().length > 0) { + return error.message.trim(); + } + return String(error); +} + +function parseKeyValueOutput(output: string): Record { + const result: Record = {}; + for (const line of output.split('\n')) { + if (!line.trim()) {continue;} + const idx = line.indexOf('='); + if (idx <= 0) {continue;} + const key = line.slice(0, idx).trim(); + const value = line.slice(idx + 1).trim(); + result[key] = value; + } + return result; +} + +function parseInteger(input: string | undefined): number | null { + if (!input) {return null;} + const parsed = Number(input); + if (!Number.isFinite(parsed)) {return null;} + if (parsed <= 0) {return null;} + return Math.floor(parsed); +} + +function mapSystemdStatus(activeState: string, error?: string): { + status: ObservabilitySourceStatus; + stateCode: number; + healthCode: number; +} { + if (error) { + return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN }; + } + + const state = activeState.trim().toLowerCase(); + if (state === 'active') { + return { status: 'running', stateCode: STATE_RUNNING, healthCode: HEALTH_HEALTHY }; + } + if (state === 'inactive' || state === 'dead') { + return { status: 'stopped', stateCode: STATE_STOPPED, healthCode: HEALTH_UNKNOWN }; + } + if (state === 'failed') { + return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED }; + } + if (state === 'activating' || state === 'deactivating' || state === 'reloading') { + return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED }; + } + if (state === 'not-found') { + return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN }; + } + return { status: 'unknown', stateCode: STATE_DEGRADED, healthCode: HEALTH_UNKNOWN }; +} + +function mapDockerStatus(stateRaw: string, healthRaw: string, error?: string): { + status: ObservabilitySourceStatus; + stateCode: number; + healthCode: number; +} { + if (error) { + return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN }; + } + + const state = stateRaw.trim().toLowerCase(); + const health = healthRaw.trim().toLowerCase(); + + if (state === 'running') { + if (health === 'healthy' || health === 'none' || health === 'unknown') { + return { + status: 'running', + stateCode: STATE_RUNNING, + healthCode: health === 'healthy' ? HEALTH_HEALTHY : HEALTH_UNKNOWN, + }; + } + return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED }; + } + + if (state === 'stopped' || state === 'not-created' || state === 'created') { + return { status: 'stopped', stateCode: STATE_STOPPED, healthCode: HEALTH_UNKNOWN }; + } + + if (state === 'unavailable' || state === 'not-found') { + return { status: 'unavailable', stateCode: STATE_UNAVAILABLE, healthCode: HEALTH_UNKNOWN }; + } + + if (state === 'restarting' || state === 'paused') { + return { status: 'degraded', stateCode: STATE_DEGRADED, healthCode: HEALTH_DEGRADED }; + } + + return { status: 'unknown', stateCode: STATE_DEGRADED, healthCode: HEALTH_UNKNOWN }; +} + +function classifyLogLevel(text: string): 'info' | 'warn' | 'error' { + const lower = text.toLowerCase(); + if (lower.includes('error') || lower.includes('failed') || lower.includes('fatal') || lower.includes('panic')) { + return 'error'; + } + if (lower.includes('warn') || lower.includes('warning') || lower.includes('degraded')) { + return 'warn'; + } + return 'info'; +} + +function normalizeOffset(ts: string): string { + return ts.replace(/([+-]\d{2})(\d{2})$/, '$1:$2'); +} + +function parseTimestamp(raw: string | undefined): number | undefined { + if (!raw) {return undefined;} + const candidates = [ + raw, + normalizeOffset(raw), + raw.includes(' ') ? raw.replace(' ', 'T') : raw, + normalizeOffset(raw.includes(' ') ? raw.replace(' ', 'T') : raw), + ]; + + for (const candidate of candidates) { + const parsed = Date.parse(candidate); + if (!Number.isNaN(parsed)) { + return parsed; + } + } + return undefined; +} + +function sanitizeWindowMinutes(value: number | undefined): number { + const parsed = Math.floor(Number(value ?? DEFAULT_WINDOW_MINUTES)); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_WINDOW_MINUTES; + } + return Math.min(MAX_WINDOW_MINUTES, parsed); +} + +function sanitizeBucketSeconds(value: number | undefined): number { + const parsed = Math.floor(Number(value ?? DEFAULT_BUCKET_SECONDS)); + if ((ALLOWED_BUCKET_SECONDS as readonly number[]).includes(parsed)) { + return parsed; + } + return DEFAULT_BUCKET_SECONDS; +} + +function sanitizeLogLines(value: number | undefined): number { + const parsed = Math.floor(Number(value ?? DEFAULT_LOG_LINES)); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_LOG_LINES; + } + return Math.min(MAX_LOG_LINES, parsed); +} + +function sanitizeSinceSeconds(value: number | undefined): number { + const parsed = Math.floor(Number(value ?? DEFAULT_LOG_SINCE_SECONDS)); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_LOG_SINCE_SECONDS; + } + return Math.min(MAX_LOG_SINCE_SECONDS, parsed); +} + +function normalizeSourceId(value: string): string { + return value.trim(); +} + +function redactLogText(text: string): { text: string; redacted: boolean } { + const result = redactForAudit(text); + return { + text: typeof result.value === 'string' ? result.value : String(result.value), + redacted: result.redactions > 0, + }; +} + +function splitDockerLogLine(line: string): { ts?: number; level: 'info' | 'warn' | 'error'; text: string; redacted: boolean } { + let content = line.trim(); + const prefixed = content.match(/^([A-Za-z0-9_.-]+)\s+\|\s+(.*)$/); + if (prefixed) { + content = prefixed[2]?.trim() ?? content; + } + + const timestamped = content.match(/^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d(?:\.\d+)?(?:Z|[+-]\d\d:?\d\d))\s+(.*)$/); + let ts: number | undefined; + if (timestamped) { + ts = parseTimestamp(timestamped[1]); + content = timestamped[2] ?? content; + } + + const redacted = redactLogText(content); + return { + ts, + level: classifyLogLevel(redacted.text), + text: redacted.text, + redacted: redacted.redacted, + }; +} + +function splitJournalLine(line: string): { ts?: number; level: 'info' | 'warn' | 'error'; text: string; redacted: boolean } { + let content = line.trim(); + let ts: number | undefined; + + const timestamped = content.match(/^([0-9T:\-+. ]{19,})(\s+.*)$/); + if (timestamped) { + const parsedTs = parseTimestamp(timestamped[1]?.trim()); + if (parsedTs !== undefined) { + ts = parsedTs; + content = timestamped[2]?.trim() ?? content; + } + } + + const redacted = redactLogText(content); + return { + ts, + level: classifyLogLevel(redacted.text), + text: redacted.text, + redacted: redacted.redacted, + }; +} + +interface SystemdStatus { + unit: string; + name: string; + loadState: string; + activeState: string; + subState: string; + statusText: string; + pid: number | null; + result: string; + error?: string; +} + +async function fetchSystemdUnitStatus( + runner: CommandRunner, + opts: { unit: string; name: string; user: boolean }, +): Promise { + const args = [ + ...(opts.user ? ['--user'] : []), + 'show', + opts.unit, + '--property=LoadState,ActiveState,SubState,Description,ExecMainPID,Result', + '--no-pager', + ]; + + try { + const response = await runner('systemctl', args, { + timeoutMs: DEFAULT_TIMEOUT_MS, + maxBufferBytes: 1024 * 1024, + }); + const parsed = parseKeyValueOutput(response.stdout); + const loadState = parsed.LoadState || 'unknown'; + const activeState = parsed.ActiveState || 'unknown'; + const subState = parsed.SubState || 'unknown'; + const description = parsed.Description || opts.name; + const pid = parseInteger(parsed.ExecMainPID); + const result = parsed.Result || 'unknown'; + return { + unit: opts.unit, + name: description, + loadState, + activeState, + subState, + statusText: activeState === subState ? activeState : `${activeState}/${subState}`, + pid, + result, + }; + } catch (error) { + const detail = normalizeError(error); + return { + unit: opts.unit, + name: opts.name, + loadState: 'unknown', + activeState: detail.toLowerCase().includes('not found') ? 'not-found' : 'unknown', + subState: 'unknown', + statusText: 'unavailable', + pid: null, + result: 'unknown', + error: detail, + }; + } +} + +async function discoverComposeProfileArgs(runner: CommandRunner): Promise { + try { + const response = await runner('docker', ['compose', '-f', COMPOSE_FILE, 'config', '--profiles'], { + timeoutMs: DEFAULT_TIMEOUT_MS, + maxBufferBytes: LARGE_BUFFER_BYTES, + }); + const profiles = response.stdout + .split('\n') + .map((line) => line.trim()) + .filter((line) => line.length > 0); + + const unique = new Set(); + for (const profile of profiles) { + unique.add(profile); + } + const args: string[] = []; + for (const profile of unique) { + args.push('--profile', profile); + } + return args; + } catch { + return []; + } +} + +export interface ObservabilityCollectorOptions { + config: Config; + flynnSystemdUnit?: string; + samplingIntervalMs?: number; + maxSamplesPerSource?: number; + now?: () => number; + runner?: CommandRunner; +} + +/** + * Collects bounded service status samples and log snapshots for the web dashboard. + */ +export class ObservabilityCollector { + private readonly config: Config; + private readonly flynnSystemdUnit: string; + private readonly samplingIntervalMs: number; + private readonly maxSamplesPerSource: number; + private readonly now: () => number; + private readonly runner: CommandRunner; + + private sampleTimer: NodeJS.Timeout | null = null; + private inFlightSample: Promise | null = null; + private readonly sourceHistory = new Map(); + private readonly sourceMeta = new Map(); + private readonly sourceCounters = new Map(); + + constructor(options: ObservabilityCollectorOptions) { + this.config = options.config; + this.flynnSystemdUnit = options.flynnSystemdUnit ?? DEFAULT_FLYNN_UNIT; + this.samplingIntervalMs = Math.max(5_000, Math.floor(options.samplingIntervalMs ?? DEFAULT_SAMPLE_INTERVAL_MS)); + this.maxSamplesPerSource = Math.max(60, Math.floor(options.maxSamplesPerSource ?? DEFAULT_MAX_SAMPLES)); + this.now = options.now ?? (() => Date.now()); + this.runner = options.runner ?? defaultRunner; + } + + start(): void { + if (this.sampleTimer) { + return; + } + void this.forceSample(); + this.sampleTimer = setInterval(() => { + void this.forceSample(); + }, this.samplingIntervalMs); + this.sampleTimer.unref?.(); + } + + stop(): void { + if (!this.sampleTimer) { + return; + } + clearInterval(this.sampleTimer); + this.sampleTimer = null; + } + + async listSources(): Promise { + await this.ensureSampled(); + return Array.from(this.sourceMeta.values()) + .sort((a, b) => a.name.localeCompare(b.name)); + } + + async getSeries(query?: ObservabilitySeriesQuery): Promise { + await this.ensureSampled(); + + const now = this.now(); + const windowMinutes = sanitizeWindowMinutes(query?.windowMinutes); + const bucketSeconds = sanitizeBucketSeconds(query?.bucketSeconds); + const bucketMs = bucketSeconds * 1000; + const lowerBound = now - (windowMinutes * 60_000); + + const sourceFilter = query?.sourceIds && query.sourceIds.length > 0 + ? new Set(query.sourceIds.map((id) => normalizeSourceId(id)).filter((id) => id.length > 0)) + : null; + + const series: ObservabilitySeriesEntry[] = []; + for (const [sourceId, history] of this.sourceHistory.entries()) { + if (sourceFilter && !sourceFilter.has(sourceId)) { + continue; + } + + const bucketMap = new Map(); + for (const sample of history) { + if (sample.ts < lowerBound) { + continue; + } + const bucketTs = Math.floor(sample.ts / bucketMs) * bucketMs; + bucketMap.set(bucketTs, sample); + } + + const points = Array.from(bucketMap.entries()) + .sort((a, b) => a[0] - b[0]) + .map(([bucketTs, sample]) => ({ + ts: bucketTs, + stateCode: sample.stateCode, + healthCode: sample.healthCode, + errorCount: sample.errorCount, + restartCount: sample.restartCount, + })); + + series.push({ sourceId, points }); + } + + series.sort((a, b) => { + const left = this.sourceMeta.get(a.sourceId)?.name ?? a.sourceId; + const right = this.sourceMeta.get(b.sourceId)?.name ?? b.sourceId; + return left.localeCompare(right); + }); + + return { + generatedAt: now, + windowMinutes, + bucketSeconds, + series, + }; + } + + async getServiceLogs(query: ServiceLogQuery): Promise { + await this.ensureSampled(); + + const sourceId = normalizeSourceId(query.sourceId); + if (!sourceId) { + throw new Error('sourceId is required'); + } + + const source = this.sourceMeta.get(sourceId); + if (!source || !source.logCapable) { + throw new Error(`Log source not found or unavailable: ${sourceId}`); + } + + const lines = sanitizeLogLines(query.lines); + const sinceSeconds = sanitizeSinceSeconds(query.sinceSeconds); + + if (source.runtime === 'docker_compose') { + return this.fetchDockerLogs(source, lines, sinceSeconds); + } + if (source.runtime === 'systemd_user' || source.runtime === 'systemd_system') { + return this.fetchJournalLogs(source, lines, sinceSeconds); + } + + throw new Error(`Unsupported log runtime for source: ${sourceId}`); + } + + async forceSample(): Promise { + if (this.inFlightSample) { + await this.inFlightSample; + return; + } + + this.inFlightSample = this.collectSample() + .catch(() => { + // Keep sampling resilient; errors are reflected as unavailable source snapshots. + }) + .finally(() => { + this.inFlightSample = null; + }); + + await this.inFlightSample; + } + + private async ensureSampled(): Promise { + if (this.sourceMeta.size > 0) { + return; + } + await this.forceSample(); + } + + private async collectSample(): Promise { + const sampleTime = this.now(); + + const [flynnResult, localBackendsResult, dockerDependenciesResult] = await Promise.allSettled([ + fetchSystemdUnitStatus(this.runner, { + unit: this.flynnSystemdUnit, + name: 'Flynn daemon', + user: false, + }), + listLocalBackendStatuses(this.config, async (args: string[]) => { + return this.runner('systemctl', args, { + timeoutMs: DEFAULT_TIMEOUT_MS, + maxBufferBytes: 1024 * 1024, + }); + }), + listDockerDependencyStatuses(this.config, async (args: string[]) => { + return this.runner('docker', ['compose', '-f', COMPOSE_FILE, ...args], { + timeoutMs: DEFAULT_TIMEOUT_MS, + maxBufferBytes: LARGE_BUFFER_BYTES, + }); + }), + ]); + + const flynnStatus = flynnResult.status === 'fulfilled' + ? flynnResult.value + : { + unit: this.flynnSystemdUnit, + name: 'Flynn daemon', + loadState: 'unknown', + activeState: 'unknown', + subState: 'unknown', + statusText: 'unavailable', + pid: null, + result: 'unknown', + error: normalizeError(flynnResult.reason), + }; + const localBackends = localBackendsResult.status === 'fulfilled' + ? localBackendsResult.value + : []; + const dockerDependencies = dockerDependenciesResult.status === 'fulfilled' + ? dockerDependenciesResult.value + : []; + + const snapshots: SourceSnapshot[] = []; + + const flynnMapped = mapSystemdStatus(flynnStatus.activeState, flynnStatus.error); + snapshots.push({ + source: { + id: 'systemd:flynn', + name: 'Flynn daemon', + kind: 'systemd_system', + runtime: 'systemd_system', + status: flynnMapped.status, + graphCapable: true, + logCapable: true, + metadata: { + unit: this.flynnSystemdUnit, + state: flynnStatus.activeState, + statusText: flynnStatus.statusText, + }, + }, + stateCode: flynnMapped.stateCode, + healthCode: flynnMapped.healthCode, + hasError: Boolean(flynnStatus.error), + fingerprint: flynnStatus.pid ? `pid:${flynnStatus.pid}` : null, + }); + + for (const backend of localBackends) { + const mapped = mapSystemdStatus(String(backend.activeState ?? ''), backend.error); + snapshots.push({ + source: { + id: `systemd-user:${backend.id}`, + name: backend.name, + kind: 'systemd_user', + runtime: 'systemd_user', + status: mapped.status, + graphCapable: true, + logCapable: backend.loadState !== 'not-found', + metadata: { + unit: backend.unit, + state: backend.activeState, + statusText: backend.statusText, + }, + }, + stateCode: mapped.stateCode, + healthCode: mapped.healthCode, + hasError: Boolean(backend.error) || backend.activeState === 'failed', + fingerprint: backend.pid ? `pid:${backend.pid}` : null, + }); + } + + for (const dependency of dockerDependencies) { + const mapped = mapDockerStatus(dependency.state, dependency.health, dependency.error); + snapshots.push({ + source: { + id: `docker:${dependency.id}`, + name: dependency.name, + kind: 'docker_dependency', + runtime: 'docker_compose', + status: mapped.status, + graphCapable: true, + logCapable: dependency.id !== 'compose', + metadata: { + service: dependency.service, + state: dependency.state, + health: dependency.health, + statusText: dependency.statusText, + containerName: dependency.containerName, + }, + }, + stateCode: mapped.stateCode, + healthCode: mapped.healthCode, + hasError: Boolean(dependency.error) || mapped.status === 'degraded' || mapped.status === 'unavailable', + fingerprint: dependency.containerName ? `container:${dependency.containerName}` : null, + }); + } + + const seenSourceIds = new Set(); + for (const snapshot of snapshots) { + seenSourceIds.add(snapshot.source.id); + this.recordSourceSample(snapshot, sampleTime); + } + + // Keep stale entries visible as unavailable when they disappear. + for (const [sourceId, source] of this.sourceMeta.entries()) { + if (seenSourceIds.has(sourceId)) { + continue; + } + this.recordSourceSample({ + source: { + ...source, + status: 'unavailable', + }, + stateCode: STATE_UNAVAILABLE, + healthCode: HEALTH_UNKNOWN, + hasError: true, + fingerprint: null, + }, sampleTime); + } + } + + private recordSourceSample(snapshot: SourceSnapshot, timestamp: number): void { + this.sourceMeta.set(snapshot.source.id, snapshot.source); + + const counter = this.sourceCounters.get(snapshot.source.id) ?? { + errorCount: 0, + restartCount: 0, + lastStateCode: snapshot.stateCode, + lastFingerprint: snapshot.fingerprint, + hasPrevious: false, + } satisfies SourceCounter; + + if (snapshot.hasError) { + counter.errorCount += 1; + } + + if (counter.hasPrevious) { + const enteredRunning = counter.lastStateCode !== STATE_RUNNING && snapshot.stateCode === STATE_RUNNING; + const fingerprintChanged = ( + snapshot.stateCode === STATE_RUNNING + && Boolean(snapshot.fingerprint) + && Boolean(counter.lastFingerprint) + && snapshot.fingerprint !== counter.lastFingerprint + ); + if (enteredRunning || fingerprintChanged) { + counter.restartCount += 1; + } + } + + counter.lastStateCode = snapshot.stateCode; + counter.lastFingerprint = snapshot.fingerprint; + counter.hasPrevious = true; + this.sourceCounters.set(snapshot.source.id, counter); + + const records = this.sourceHistory.get(snapshot.source.id) ?? []; + records.push({ + ts: timestamp, + stateCode: snapshot.stateCode, + healthCode: snapshot.healthCode, + errorCount: counter.errorCount, + restartCount: counter.restartCount, + }); + + if (records.length > this.maxSamplesPerSource) { + records.splice(0, records.length - this.maxSamplesPerSource); + } + + this.sourceHistory.set(snapshot.source.id, records); + } + + private async fetchDockerLogs( + source: ObservabilitySource, + lines: number, + sinceSeconds: number, + ): Promise { + const service = source.metadata?.service; + if (!service) { + throw new Error(`Missing docker compose service metadata for source: ${source.id}`); + } + + const profileArgs = await discoverComposeProfileArgs(this.runner); + const response = await this.runner( + 'docker', + [ + 'compose', + '-f', + COMPOSE_FILE, + ...profileArgs, + 'logs', + '--timestamps', + '--tail', + String(lines), + '--since', + `${sinceSeconds}s`, + service, + ], + { timeoutMs: LARGE_TIMEOUT_MS, maxBufferBytes: LARGE_BUFFER_BYTES }, + ); + + const rawLines = response.stdout + .split('\n') + .map((line) => line.trimEnd()) + .filter((line) => line.trim().length > 0); + + let redacted = false; + const parsed: ServiceLogEntry[] = rawLines.map((line) => { + const item = splitDockerLogLine(line); + redacted = redacted || item.redacted; + return { + ts: item.ts, + level: item.level, + text: item.text, + }; + }); + + return { + sourceId: source.id, + fetchedAt: this.now(), + redacted, + lines: parsed, + truncated: parsed.length >= lines, + }; + } + + private async fetchJournalLogs( + source: ObservabilitySource, + lines: number, + sinceSeconds: number, + ): Promise { + const unit = source.metadata?.unit; + if (!unit) { + throw new Error(`Missing systemd unit metadata for source: ${source.id}`); + } + + const user = source.runtime === 'systemd_user'; + const response = await this.runner( + 'journalctl', + [ + ...(user ? ['--user'] : []), + '-u', + unit, + '--since', + `${sinceSeconds} seconds ago`, + '--no-pager', + '--output', + 'short-iso-precise', + '-n', + String(lines), + ], + { + timeoutMs: LARGE_TIMEOUT_MS, + maxBufferBytes: LARGE_BUFFER_BYTES, + }, + ); + + const rawLines = response.stdout + .split('\n') + .map((line) => line.trimEnd()) + .filter((line) => line.trim().length > 0); + + let redacted = false; + const parsed: ServiceLogEntry[] = rawLines.map((line) => { + const item = splitJournalLine(line); + redacted = redacted || item.redacted; + return { + ts: item.ts, + level: item.level, + text: item.text, + }; + }); + + return { + sourceId: source.id, + fetchedAt: this.now(), + redacted, + lines: parsed, + truncated: parsed.length >= lines, + }; + } +} + +export const observabilityDefaults = { + DEFAULT_WINDOW_MINUTES, + MAX_WINDOW_MINUTES, + DEFAULT_BUCKET_SECONDS, + ALLOWED_BUCKET_SECONDS: [...ALLOWED_BUCKET_SECONDS], + DEFAULT_LOG_LINES, + MAX_LOG_LINES, + DEFAULT_LOG_SINCE_SECONDS, + MAX_LOG_SINCE_SECONDS, +} as const; diff --git a/src/gateway/handlers/system.ts b/src/gateway/handlers/system.ts index 8486396..1085c40 100644 --- a/src/gateway/handlers/system.ts +++ b/src/gateway/handlers/system.ts @@ -10,6 +10,13 @@ import type { DockerDependencyControlResult, DockerDependencyStatus, } from './dockerDependencies.js'; +import type { + ObservabilitySource, + ObservabilitySeriesQuery, + ObservabilitySeriesSnapshot, + ServiceLogQuery, + ServiceLogSnapshot, +} from './observability.js'; /** Per-session token usage report returned by system.tokenUsage. */ export interface TokenUsageEntry { @@ -117,6 +124,12 @@ export interface SystemHandlerDeps { getDockerDependencies?: () => Promise | DockerDependencyStatus[]; /** Optional callback to control docker-compose dependencies. */ controlDockerDependency?: (dependency: string, action: string) => Promise; + /** Optional callback to retrieve observability-capable service sources. */ + getObservabilitySources?: () => Promise | ObservabilitySource[]; + /** Optional callback to retrieve sampled observability series for services. */ + getObservabilitySeries?: (opts?: ObservabilitySeriesQuery) => Promise | ObservabilitySeriesSnapshot; + /** Optional callback to retrieve recent logs for a service source. */ + getServiceLogs?: (opts: ServiceLogQuery) => Promise | ServiceLogSnapshot; } function normalizeErrorMessage(error: unknown): string { @@ -379,5 +392,69 @@ export function createSystemHandlers(deps: SystemHandlerDeps) { return makeError(request.id, ErrorCode.InternalError, `Docker dependency control failed: ${normalizeErrorMessage(error)}`); } }, + + 'system.observabilitySources': async (request: GatewayRequest): Promise => { + if (!deps.getObservabilitySources) { + return makeResponse(request.id, { sources: [] }); + } + try { + const sources = await deps.getObservabilitySources(); + return makeResponse(request.id, { sources }); + } catch (error) { + return makeError(request.id, ErrorCode.InternalError, `Failed to load observability sources: ${normalizeErrorMessage(error)}`); + } + }, + + 'system.observabilitySeries': async (request: GatewayRequest): Promise => { + if (!deps.getObservabilitySeries) { + return makeResponse(request.id, { + generatedAt: Date.now(), + windowMinutes: 60, + bucketSeconds: 30, + series: [], + }); + } + + const params = request.params as { windowMinutes?: number; bucketSeconds?: number; sourceIds?: unknown } | undefined; + if ( + params?.sourceIds !== undefined + && (!Array.isArray(params.sourceIds) || params.sourceIds.some((entry) => typeof entry !== 'string')) + ) { + return makeError(request.id, ErrorCode.InvalidRequest, 'sourceIds must be an array of strings'); + } + + try { + const snapshot = await deps.getObservabilitySeries({ + windowMinutes: params?.windowMinutes, + bucketSeconds: params?.bucketSeconds, + sourceIds: params?.sourceIds as string[] | undefined, + }); + return makeResponse(request.id, snapshot); + } catch (error) { + return makeError(request.id, ErrorCode.InternalError, `Failed to load observability series: ${normalizeErrorMessage(error)}`); + } + }, + + 'system.serviceLogs': async (request: GatewayRequest): Promise => { + if (!deps.getServiceLogs) { + return makeError(request.id, ErrorCode.InternalError, 'Service logs are not available in this environment'); + } + + const params = request.params as { sourceId?: string; lines?: number; sinceSeconds?: number } | undefined; + if (!params?.sourceId || typeof params.sourceId !== 'string' || params.sourceId.trim().length === 0) { + return makeError(request.id, ErrorCode.InvalidRequest, 'sourceId is required'); + } + + try { + const logs = await deps.getServiceLogs({ + sourceId: params.sourceId, + lines: params.lines, + sinceSeconds: params.sinceSeconds, + }); + return makeResponse(request.id, logs); + } catch (error) { + return makeError(request.id, ErrorCode.InternalError, `Failed to load service logs: ${normalizeErrorMessage(error)}`); + } + }, }; } diff --git a/src/gateway/server.ts b/src/gateway/server.ts index e3c61b6..9ae5b76 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -36,6 +36,7 @@ import { discoverServices } from './handlers/services.js'; import { createModelCatalogFetcher } from './modelCatalog.js'; import { listLocalBackendStatuses, controlLocalBackend } from './handlers/localBackends.js'; import { listDockerDependencyStatuses, controlDockerDependency } from './handlers/dockerDependencies.js'; +import { ObservabilityCollector } from './handlers/observability.js'; import type { TokenUsageEntry, ContextUsageEntry } from './handlers/system.js'; import type { NodeConnectionState } from './handlers/node.js'; import type { SessionManager } from '../session/manager.js'; @@ -172,6 +173,7 @@ export class GatewayServer { private canvasStore: CanvasStore; private metrics: MetricsCollector; private discoveryHandle: GatewayDiscoveryHandle | null = null; + private observabilityCollector: ObservabilityCollector | null = null; private connectionMap: Map = new Map(); private connectionRateMap: Map controlDockerDependency(runtimeConfig, dependency, action) : undefined, + getObservabilitySources: observabilityCollector + ? () => observabilityCollector.listSources() + : undefined, + getObservabilitySeries: observabilityCollector + ? (opts) => observabilityCollector.getSeries(opts) + : undefined, + getServiceLogs: observabilityCollector + ? (opts) => observabilityCollector.getServiceLogs(opts) + : undefined, getPresence: channelRegistry ? (opts) => channelRegistry.getPresence(opts) : undefined, @@ -547,6 +562,7 @@ export class GatewayServer { const { port } = this.config; return new Promise((resolve) => { + this.observabilityCollector?.start(); // Create HTTP server first — handles static file requests this.httpServer = createServer((req: IncomingMessage, res: ServerResponse) => { this.handleHttpRequest(req, res); @@ -584,6 +600,8 @@ export class GatewayServer { } async stop(): Promise { + this.observabilityCollector?.stop(); + if (this.discoveryHandle) { try { await this.discoveryHandle.stop();