feat(gateway): add optional bonjour/mdns discovery
This commit is contained in:
@@ -74,6 +74,34 @@ describe('configSchema — server', () => {
|
||||
expect(result.server.ws_rate_limit.max_violations).toBe(3);
|
||||
expect(result.server.ws_rate_limit.violation_window_ms).toBe(2000);
|
||||
});
|
||||
|
||||
it('defaults discovery settings', () => {
|
||||
const result = configSchema.parse(minimalConfig);
|
||||
expect(result.server.discovery.enabled).toBe(false);
|
||||
expect(result.server.discovery.service_name).toBe('flynn-gateway');
|
||||
expect(result.server.discovery.service_type).toBe('_flynn._tcp');
|
||||
expect(result.server.discovery.txt).toEqual({});
|
||||
});
|
||||
|
||||
it('accepts custom discovery settings', () => {
|
||||
const result = configSchema.parse({
|
||||
...minimalConfig,
|
||||
server: {
|
||||
discovery: {
|
||||
enabled: true,
|
||||
service_name: 'flynn-dev',
|
||||
service_type: '_custom._tcp',
|
||||
txt: {
|
||||
env: 'dev',
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.server.discovery.enabled).toBe(true);
|
||||
expect(result.server.discovery.service_name).toBe('flynn-dev');
|
||||
expect(result.server.discovery.service_type).toBe('_custom._tcp');
|
||||
expect(result.server.discovery.txt).toEqual({ env: 'dev' });
|
||||
});
|
||||
});
|
||||
|
||||
describe('configSchema — agent_configs', () => {
|
||||
|
||||
@@ -32,6 +32,17 @@ const wsRateLimitSchema = z.object({
|
||||
violation_window_ms: z.number().min(1000).max(60000).default(10000),
|
||||
}).default({});
|
||||
|
||||
const serverDiscoverySchema = z.object({
|
||||
/** Enable local-network service discovery (mDNS/Bonjour advertisement). */
|
||||
enabled: z.boolean().default(false),
|
||||
/** Service instance name advertised on LAN. */
|
||||
service_name: z.string().min(1).default('flynn-gateway'),
|
||||
/** mDNS service type. */
|
||||
service_type: z.string().min(1).default('_flynn._tcp'),
|
||||
/** Additional TXT metadata advertised with the service record. */
|
||||
txt: z.record(z.string(), z.string()).default({}),
|
||||
}).default({});
|
||||
|
||||
const serverSchema = z.object({
|
||||
tailscale: tailscaleSchema,
|
||||
localhost: z.boolean().default(true),
|
||||
@@ -48,6 +59,8 @@ const serverSchema = z.object({
|
||||
max_request_body_bytes: z.number().min(1024).max(10 * 1024 * 1024).default(1_048_576),
|
||||
/** Per-connection WebSocket ingress rate limit settings. */
|
||||
ws_rate_limit: wsRateLimitSchema,
|
||||
/** Optional Bonjour/mDNS advertisement settings. */
|
||||
discovery: serverDiscoverySchema,
|
||||
});
|
||||
|
||||
/** All supported model provider identifiers. Used by the config schema and TUI autocompletion. */
|
||||
|
||||
@@ -321,6 +321,12 @@ export function createGateway(deps: GatewayDeps): GatewayServer {
|
||||
maxViolations: config.server.ws_rate_limit.max_violations,
|
||||
violationWindowMs: config.server.ws_rate_limit.violation_window_ms,
|
||||
},
|
||||
discovery: {
|
||||
enabled: config.server.discovery.enabled,
|
||||
serviceName: config.server.discovery.service_name,
|
||||
serviceType: config.server.discovery.service_type,
|
||||
txtRecord: config.server.discovery.txt,
|
||||
},
|
||||
commandRegistry: deps.commandRegistry,
|
||||
intentRegistry: deps.intentRegistry,
|
||||
routingPolicy: deps.routingPolicy,
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
import { describe, it, expect, vi, beforeAll, beforeEach } from 'vitest';
|
||||
import { spawn } from 'child_process';
|
||||
import { EventEmitter } from 'events';
|
||||
import type { ChildProcess } from 'child_process';
|
||||
|
||||
vi.mock('child_process', () => ({
|
||||
spawn: vi.fn(),
|
||||
}));
|
||||
|
||||
class MockChildProcess extends EventEmitter {
|
||||
exitCode: number | null = null;
|
||||
killed = false;
|
||||
unref = vi.fn();
|
||||
kill = vi.fn((signal?: NodeJS.Signals) => {
|
||||
this.killed = true;
|
||||
this.emit('exit', signal === 'SIGKILL' ? 137 : 0, signal ?? null);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
const mockSpawn = vi.mocked(spawn);
|
||||
|
||||
describe('gateway discovery', () => {
|
||||
let startGatewayDiscovery: typeof import('./discovery.js').startGatewayDiscovery;
|
||||
|
||||
beforeAll(async () => {
|
||||
const mod = await import('./discovery.js');
|
||||
startGatewayDiscovery = mod.startGatewayDiscovery;
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('starts avahi publisher with txt records', async () => {
|
||||
const child = new MockChildProcess();
|
||||
mockSpawn.mockReturnValueOnce(child as unknown as ChildProcess);
|
||||
setTimeout(() => child.emit('spawn'), 0);
|
||||
|
||||
const handle = await startGatewayDiscovery({
|
||||
serviceName: 'flynn-gateway',
|
||||
serviceType: '_flynn._tcp',
|
||||
port: 18800,
|
||||
txtRecord: { instance: 'pid-123', version: '0.1.0' },
|
||||
});
|
||||
|
||||
expect(mockSpawn).toHaveBeenCalledWith('avahi-publish-service', [
|
||||
'flynn-gateway',
|
||||
'_flynn._tcp',
|
||||
'18800',
|
||||
'instance=pid-123',
|
||||
'version=0.1.0',
|
||||
], { stdio: 'ignore' });
|
||||
expect(child.unref).toHaveBeenCalledOnce();
|
||||
|
||||
await handle.stop();
|
||||
expect(child.kill).toHaveBeenCalledWith('SIGTERM');
|
||||
});
|
||||
|
||||
it('falls back to dns-sd when avahi is unavailable', async () => {
|
||||
const avahiChild = new MockChildProcess();
|
||||
const dnsChild = new MockChildProcess();
|
||||
mockSpawn.mockReturnValueOnce(avahiChild as unknown as ChildProcess);
|
||||
mockSpawn.mockReturnValueOnce(dnsChild as unknown as ChildProcess);
|
||||
|
||||
setTimeout(() => avahiChild.emit('error', new Error('ENOENT')), 0);
|
||||
setTimeout(() => dnsChild.emit('spawn'), 0);
|
||||
|
||||
const handle = await startGatewayDiscovery({
|
||||
serviceName: 'flynn-gateway',
|
||||
serviceType: '_flynn._tcp',
|
||||
port: 18800,
|
||||
});
|
||||
|
||||
expect(mockSpawn).toHaveBeenNthCalledWith(2, 'dns-sd', [
|
||||
'-R',
|
||||
'flynn-gateway',
|
||||
'_flynn._tcp',
|
||||
'local',
|
||||
'18800',
|
||||
], { stdio: 'ignore' });
|
||||
|
||||
await handle.stop();
|
||||
expect(dnsChild.kill).toHaveBeenCalledWith('SIGTERM');
|
||||
});
|
||||
|
||||
it('throws when no supported advertiser command is available', async () => {
|
||||
const avahiChild = new MockChildProcess();
|
||||
const dnsChild = new MockChildProcess();
|
||||
mockSpawn.mockReturnValueOnce(avahiChild as unknown as ChildProcess);
|
||||
mockSpawn.mockReturnValueOnce(dnsChild as unknown as ChildProcess);
|
||||
|
||||
setTimeout(() => avahiChild.emit('error', new Error('ENOENT')), 0);
|
||||
setTimeout(() => dnsChild.emit('error', new Error('ENOENT')), 0);
|
||||
|
||||
await expect(startGatewayDiscovery({
|
||||
serviceName: 'flynn-gateway',
|
||||
serviceType: '_flynn._tcp',
|
||||
port: 18800,
|
||||
})).rejects.toThrow(/Failed to start mDNS advertiser/);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,107 @@
|
||||
import { spawn, type ChildProcess } from 'child_process';
|
||||
|
||||
export interface GatewayDiscoveryConfig {
|
||||
serviceName: string;
|
||||
serviceType: string;
|
||||
port: number;
|
||||
txtRecord?: Record<string, string>;
|
||||
}
|
||||
|
||||
export interface GatewayDiscoveryHandle {
|
||||
stop(): Promise<void>;
|
||||
}
|
||||
|
||||
function toTxtArgs(txtRecord?: Record<string, string>): string[] {
|
||||
if (!txtRecord) {
|
||||
return [];
|
||||
}
|
||||
return Object.entries(txtRecord).map(([key, value]) => `${key}=${value}`);
|
||||
}
|
||||
|
||||
async function spawnAdvertiser(command: string, args: string[]): Promise<ChildProcess> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const child = spawn(command, args, { stdio: 'ignore' });
|
||||
|
||||
const onError = (error: Error): void => {
|
||||
cleanup();
|
||||
reject(error);
|
||||
};
|
||||
const onExit = (code: number | null, signal: NodeJS.Signals | null): void => {
|
||||
cleanup();
|
||||
reject(new Error(`${command} exited early (code=${code ?? 'null'}, signal=${signal ?? 'null'})`));
|
||||
};
|
||||
const onSpawn = (): void => {
|
||||
setTimeout(() => {
|
||||
cleanup();
|
||||
resolve(child);
|
||||
}, 100);
|
||||
};
|
||||
const cleanup = (): void => {
|
||||
child.off('error', onError);
|
||||
child.off('exit', onExit);
|
||||
child.off('spawn', onSpawn);
|
||||
};
|
||||
|
||||
child.once('error', onError);
|
||||
child.once('exit', onExit);
|
||||
child.once('spawn', onSpawn);
|
||||
});
|
||||
}
|
||||
|
||||
async function stopChild(child: ChildProcess): Promise<void> {
|
||||
if (child.exitCode !== null || child.killed) {
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const timeout = setTimeout(() => {
|
||||
child.kill('SIGKILL');
|
||||
resolve();
|
||||
}, 1000);
|
||||
|
||||
child.once('exit', () => {
|
||||
clearTimeout(timeout);
|
||||
resolve();
|
||||
});
|
||||
|
||||
child.kill('SIGTERM');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts LAN discovery using best-effort host tools.
|
||||
* Priority: avahi-publish-service (Linux) -> dns-sd (macOS).
|
||||
*/
|
||||
export async function startGatewayDiscovery(config: GatewayDiscoveryConfig): Promise<GatewayDiscoveryHandle> {
|
||||
const txtArgs = toTxtArgs(config.txtRecord);
|
||||
const attempts: Array<{ command: string; args: string[] }> = [
|
||||
{
|
||||
command: 'avahi-publish-service',
|
||||
args: [config.serviceName, config.serviceType, String(config.port), ...txtArgs],
|
||||
},
|
||||
{
|
||||
command: 'dns-sd',
|
||||
args: ['-R', config.serviceName, config.serviceType, 'local', String(config.port), ...txtArgs],
|
||||
},
|
||||
];
|
||||
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (const attempt of attempts) {
|
||||
try {
|
||||
const child = await spawnAdvertiser(attempt.command, attempt.args);
|
||||
child.unref();
|
||||
return {
|
||||
stop: async () => {
|
||||
await stopChild(child);
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
lastError = error instanceof Error ? error : new Error(String(error));
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`Failed to start mDNS advertiser (tried avahi-publish-service and dns-sd): ${lastError?.message ?? 'unknown error'}`,
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
import { describe, it, expect, beforeAll, beforeEach, vi } from 'vitest';
|
||||
|
||||
const startDiscoveryMock = vi.fn();
|
||||
const stopDiscoveryMock = vi.fn();
|
||||
|
||||
vi.mock('./discovery.js', () => ({
|
||||
startGatewayDiscovery: startDiscoveryMock,
|
||||
}));
|
||||
|
||||
describe('GatewayServer discovery lifecycle', () => {
|
||||
let GatewayServer: typeof import('./server.js').GatewayServer;
|
||||
let server: import('./server.js').GatewayServer;
|
||||
|
||||
const baseConfig = {
|
||||
port: 18910,
|
||||
host: '0.0.0.0',
|
||||
sessionManager: {
|
||||
getSession: () => ({ id: 's1', addMessage: () => {}, getHistory: () => [], clear: () => {}, setHistory: () => {}, replaceHistory: () => {} }),
|
||||
listSessions: () => [],
|
||||
} as unknown as import('./server.js').GatewayServerConfig['sessionManager'],
|
||||
modelClient: { chat: async () => ({ content: 'ok', usage: { inputTokens: 0, outputTokens: 0 }, stopReason: 'end_turn' }) },
|
||||
systemPrompt: 'test',
|
||||
toolRegistry: {
|
||||
list: () => [],
|
||||
get: () => undefined,
|
||||
} as unknown as import('./server.js').GatewayServerConfig['toolRegistry'],
|
||||
toolExecutor: { execute: async () => ({ success: true, output: '' }) } as unknown as import('./server.js').GatewayServerConfig['toolExecutor'],
|
||||
version: '0.1.0-test',
|
||||
} as const;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ GatewayServer } = await import('./server.js'));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
startDiscoveryMock.mockResolvedValue({
|
||||
stop: stopDiscoveryMock,
|
||||
});
|
||||
server = new GatewayServer({
|
||||
...baseConfig,
|
||||
discovery: {
|
||||
enabled: true,
|
||||
serviceName: 'flynn-test',
|
||||
serviceType: '_flynn._tcp',
|
||||
txtRecord: { env: 'test' },
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('starts and stops discovery when enabled', async () => {
|
||||
const typedServer = server as unknown as { startDiscovery: (host: string, port: number) => Promise<void> };
|
||||
await typedServer.startDiscovery('0.0.0.0', 18910);
|
||||
await server.stop();
|
||||
|
||||
expect(startDiscoveryMock).toHaveBeenCalledOnce();
|
||||
const [callConfig] = startDiscoveryMock.mock.calls[0] as [Record<string, unknown>];
|
||||
expect(callConfig.serviceName).toBe('flynn-test');
|
||||
expect(callConfig.serviceType).toBe('_flynn._tcp');
|
||||
expect(callConfig.port).toBe(18910);
|
||||
expect(callConfig.txtRecord).toMatchObject({
|
||||
env: 'test',
|
||||
version: '0.1.0-test',
|
||||
});
|
||||
expect(stopDiscoveryMock).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('does not advertise when host is loopback', async () => {
|
||||
const typedServer = server as unknown as { startDiscovery: (host: string, port: number) => Promise<void> };
|
||||
await typedServer.startDiscovery('127.0.0.1', 18911);
|
||||
await server.stop();
|
||||
|
||||
expect(startDiscoveryMock).not.toHaveBeenCalled();
|
||||
expect(stopDiscoveryMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
+50
-1
@@ -9,6 +9,7 @@ import { LaneQueue } from './lane-queue.js';
|
||||
import { MetricsCollector } from './metrics.js';
|
||||
import { authenticateRequest } from './auth.js';
|
||||
import type { AuthConfig } from './auth.js';
|
||||
import { startGatewayDiscovery, type GatewayDiscoveryHandle } from './discovery.js';
|
||||
import {
|
||||
parseMessage,
|
||||
makeError,
|
||||
@@ -86,6 +87,12 @@ export interface GatewayServerConfig {
|
||||
commandRegistry?: CommandRegistry;
|
||||
intentRegistry?: ComponentRegistry;
|
||||
routingPolicy?: RoutingPolicy;
|
||||
discovery?: {
|
||||
enabled: boolean;
|
||||
serviceName: string;
|
||||
serviceType: string;
|
||||
txtRecord?: Record<string, string>;
|
||||
};
|
||||
}
|
||||
|
||||
export class GatewayServer {
|
||||
@@ -103,6 +110,7 @@ export class GatewayServer {
|
||||
private sessionBridge: SessionBridge;
|
||||
private laneQueue: LaneQueue;
|
||||
private metrics: MetricsCollector;
|
||||
private discoveryHandle: GatewayDiscoveryHandle | null = null;
|
||||
private connectionMap: Map<WebSocket, string> = new Map();
|
||||
private connectionRateMap: Map<string, {
|
||||
tokens: number;
|
||||
@@ -272,12 +280,24 @@ export class GatewayServer {
|
||||
|
||||
this.httpServer.listen(port, host, () => {
|
||||
console.log(`Gateway server listening on ${host}:${port}`);
|
||||
resolve();
|
||||
void this.startDiscovery(host, port).finally(() => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (this.discoveryHandle) {
|
||||
try {
|
||||
await this.discoveryHandle.stop();
|
||||
} catch (err) {
|
||||
console.error('Failed to stop mDNS discovery:', err instanceof Error ? err.message : err);
|
||||
} finally {
|
||||
this.discoveryHandle = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Close all WebSocket connections first
|
||||
for (const [ws, connectionId] of this.connectionMap) {
|
||||
this.sessionBridge.disconnect(connectionId);
|
||||
@@ -543,6 +563,35 @@ export class GatewayServer {
|
||||
this.config.gmailHandler = handler;
|
||||
}
|
||||
|
||||
private async startDiscovery(host: string, port: number): Promise<void> {
|
||||
const discovery = this.config.discovery;
|
||||
if (!discovery?.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (host === '127.0.0.1' || host === '::1') {
|
||||
console.warn('mDNS discovery is enabled, but server.localhost=true restricts gateway to loopback; skipping advertisement');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const txtRecord: Record<string, string> = {
|
||||
instance: `pid-${process.pid}`,
|
||||
version: this.config.version ?? '0.1.0',
|
||||
...(discovery.txtRecord ?? {}),
|
||||
};
|
||||
this.discoveryHandle = await startGatewayDiscovery({
|
||||
serviceName: discovery.serviceName,
|
||||
serviceType: discovery.serviceType,
|
||||
port,
|
||||
txtRecord,
|
||||
});
|
||||
console.log(`mDNS discovery enabled: ${discovery.serviceName}.${discovery.serviceType}.local:${port}`);
|
||||
} catch (err) {
|
||||
console.warn(`mDNS discovery failed to start: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/** Read the full request body as a string. */
|
||||
private readRequestBody(req: IncomingMessage): Promise<string> {
|
||||
const maxBytes = this.config.maxRequestBodyBytes ?? GatewayServer.DEFAULT_MAX_REQUEST_BODY_BYTES;
|
||||
|
||||
Reference in New Issue
Block a user