fix(tests): resolve 4 post-phase test failures

- platformClients.integration: iOS/Android push tests lacked
  setStatus() call before listNodes(), so platform filter excluded
  nodes. Added publishHeartbeat() to set platform on connection state.
- server.test: agent.send now emits run_state events before done (Phase
  1). Added sendAndWaitForDone() helper and updated test to find done
  event rather than assuming index 0.
- handlers.test: updated agent.send/cancel assertions to use find()
  and pass send arg to agent.cancel, consistent with run_state events.
- httpBody: req.destroy() closed socket before 413 response could be
  sent. Removed socket destruction from body reader; 413 responses now
  send Connection: close so Node closes the connection cleanly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
William Valentin
2026-02-25 11:55:14 -08:00
parent 787dd61a6d
commit b39010d602
9 changed files with 62 additions and 28 deletions
+1 -1
View File
@@ -136,7 +136,7 @@ export class WebhookHandler implements ChannelAdapter {
} catch (err) { } catch (err) {
if (err instanceof RequestBodyTooLargeError) { if (err instanceof RequestBodyTooLargeError) {
auditLogger?.webhookDenied(webhookName, `Payload too large (>${this.maxRequestBodyBytes} bytes)`); auditLogger?.webhookDenied(webhookName, `Payload too large (>${this.maxRequestBodyBytes} bytes)`);
res.writeHead(413, { 'Content-Type': 'application/json' }); res.writeHead(413, { 'Content-Type': 'application/json', 'Connection': 'close' });
res.end(JSON.stringify({ error: 'Payload too large' })); res.end(JSON.stringify({ error: 'Payload too large' }));
return false; return false;
} }
@@ -396,6 +396,7 @@ describe('platform clients integration', () => {
try { try {
await client.register(); await client.register();
await client.publishHeartbeat();
const push = await client.registerPushToken({ const push = await client.registerPushToken({
token: 'd'.repeat(64), token: 'd'.repeat(64),
topic: 'dev.flynn.ios', topic: 'dev.flynn.ios',
@@ -424,6 +425,7 @@ describe('platform clients integration', () => {
try { try {
await client.register(); await client.register();
await client.publishHeartbeat();
const push = await client.registerPushToken('e'.repeat(64)); const push = await client.registerPushToken('e'.repeat(64));
expect(push.updated).toBe(true); expect(push.updated).toBe(true);
+6 -2
View File
@@ -212,7 +212,8 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
command: parsedCommand, command: parsedCommand,
}); });
const isCommand = Boolean(commandInput && deps.commandRegistry?.isCommand(commandInput)); const commandRegistry = deps.commandRegistry;
const isCommand = Boolean(commandInput && commandRegistry?.isCommand(commandInput));
if (!isCommand) { if (!isCommand) {
activeRequestIds.set(connectionId, request.id); activeRequestIds.set(connectionId, request.id);
auditLogger?.runState?.({ auditLogger?.runState?.({
@@ -231,8 +232,11 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
} }
if (isCommand) { if (isCommand) {
if (!commandRegistry) {
throw new Error('Command registry is not available');
}
const sessionId = deps.sessionBridge.getSessionId(connectionId); const sessionId = deps.sessionBridge.getSessionId(connectionId);
const commandResult = await deps.commandRegistry.execute(commandInput, { const commandResult = await commandRegistry.execute(commandInput, {
channel: 'ws', channel: 'ws',
senderId: connectionId, senderId: connectionId,
sessionId: sessionId ?? `ws:${connectionId}`, sessionId: sessionId ?? `ws:${connectionId}`,
+23 -13
View File
@@ -1146,8 +1146,11 @@ describe('agent handlers', () => {
await handlers['agent.send'](req, send); await handlers['agent.send'](req, send);
expect(mockAgent.process).toHaveBeenCalledWith('hello', undefined); expect(mockAgent.process).toHaveBeenCalledWith('hello', undefined);
expect(sent).toHaveLength(1); const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done') as GatewayEvent | undefined;
const doneEvent = sent[0] as GatewayEvent; expect(doneEvent).toBeTruthy();
if (!doneEvent) {
throw new Error('done event not emitted');
}
expect(doneEvent.event).toBe('done'); expect(doneEvent.event).toBe('done');
expect(getPath(doneEvent.data, 'content')).toBe('response text'); expect(getPath(doneEvent.data, 'content')).toBe('response text');
}); });
@@ -1171,7 +1174,11 @@ describe('agent handlers', () => {
{ mimeType: 'image/png', data: 'iVBOR...', url: undefined, filename: 'screenshot.png' }, { mimeType: 'image/png', data: 'iVBOR...', url: undefined, filename: 'screenshot.png' },
{ mimeType: 'application/pdf', data: undefined, url: 'https://example.com/doc.pdf', filename: undefined }, { mimeType: 'application/pdf', data: undefined, url: 'https://example.com/doc.pdf', filename: undefined },
]); ]);
const doneEvent = sent[0] as GatewayEvent; const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done') as GatewayEvent | undefined;
expect(doneEvent).toBeTruthy();
if (!doneEvent) {
throw new Error('done event not emitted');
}
expect(doneEvent.event).toBe('done'); expect(doneEvent.event).toBe('done');
}); });
@@ -1187,7 +1194,8 @@ describe('agent handlers', () => {
await handlers['agent.send'](req, send); await handlers['agent.send'](req, send);
expect(mockAgent.process).toHaveBeenCalledWith('hi', []); expect(mockAgent.process).toHaveBeenCalledWith('hi', []);
expect(sent).toHaveLength(1); const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done');
expect(doneEvent).toBeTruthy();
}); });
it('agent.send accepts attachment-only requests', async () => { it('agent.send accepts attachment-only requests', async () => {
@@ -1207,8 +1215,8 @@ describe('agent handlers', () => {
expect(mockAgent.process).toHaveBeenCalledWith('', [ expect(mockAgent.process).toHaveBeenCalledWith('', [
{ mimeType: 'image/png', data: 'iVBOR...', url: undefined, filename: undefined }, { mimeType: 'image/png', data: 'iVBOR...', url: undefined, filename: undefined },
]); ]);
expect(sent).toHaveLength(1); const doneEvent = sent.find((msg) => (msg as GatewayEvent).event === 'done');
expect((sent[0] as GatewayEvent).event).toBe('done'); expect(doneEvent).toBeTruthy();
}); });
it('agent.send requires message or attachments', async () => { it('agent.send requires message or attachments', async () => {
@@ -1247,10 +1255,8 @@ describe('agent handlers', () => {
await Promise.all([p1, p2]); await Promise.all([p1, p2]);
// Both should have completed — no AgentBusy error // Both should have completed — no AgentBusy error
expect(sent1).toHaveLength(1); expect(sent1.find((msg) => (msg as GatewayEvent).event === 'done')).toBeTruthy();
expect((sent1[0] as GatewayEvent).event).toBe('done'); expect(sent2.find((msg) => (msg as GatewayEvent).event === 'done')).toBeTruthy();
expect(sent2).toHaveLength(1);
expect((sent2[0] as GatewayEvent).event).toBe('done');
expect(mockAgent.process).toHaveBeenCalledTimes(2); expect(mockAgent.process).toHaveBeenCalledTimes(2);
}); });
@@ -1262,7 +1268,11 @@ describe('agent handlers', () => {
await handlers['agent.send'](req, send); await handlers['agent.send'](req, send);
const errorEvent = sent[0] as GatewayEvent; const errorEvent = sent.find((msg) => (msg as GatewayEvent).event === 'error') as GatewayEvent | undefined;
expect(errorEvent).toBeTruthy();
if (!errorEvent) {
throw new Error('error event not emitted');
}
expect(errorEvent.event).toBe('error'); expect(errorEvent.event).toBe('error');
expect(getPath(errorEvent.data, 'message')).toBe('model failed'); expect(getPath(errorEvent.data, 'message')).toBe('model failed');
}); });
@@ -1291,7 +1301,7 @@ describe('agent handlers', () => {
it('agent.cancel returns cancelled state', async () => { it('agent.cancel returns cancelled state', async () => {
mockBridge.cancel.mockReturnValue(true); mockBridge.cancel.mockReturnValue(true);
const req: GatewayRequest = { id: 7, method: 'agent.cancel', params: { connectionId: 'conn-1' } }; const req: GatewayRequest = { id: 7, method: 'agent.cancel', params: { connectionId: 'conn-1' } };
const result = await handlers['agent.cancel'](req) as GatewayResponse; const result = await handlers['agent.cancel'](req, vi.fn()) as GatewayResponse;
expect(getPath(result.result, 'cancelled')).toBe(true); expect(getPath(result.result, 'cancelled')).toBe(true);
expect(getPath(result.result, 'message')).toContain('Cancellation requested'); expect(getPath(result.result, 'message')).toContain('Cancellation requested');
@@ -1301,7 +1311,7 @@ describe('agent handlers', () => {
it('agent.cancel returns not-cancelled when no active operation exists', async () => { it('agent.cancel returns not-cancelled when no active operation exists', async () => {
mockBridge.cancel.mockReturnValue(false); mockBridge.cancel.mockReturnValue(false);
const req: GatewayRequest = { id: 8, method: 'agent.cancel', params: { connectionId: 'conn-1' } }; const req: GatewayRequest = { id: 8, method: 'agent.cancel', params: { connectionId: 'conn-1' } };
const result = await handlers['agent.cancel'](req) as GatewayResponse; const result = await handlers['agent.cancel'](req, vi.fn()) as GatewayResponse;
expect(getPath(result.result, 'cancelled')).toBe(false); expect(getPath(result.result, 'cancelled')).toBe(false);
expect(getPath(result.result, 'message')).toContain('No active operation'); expect(getPath(result.result, 'message')).toContain('No active operation');
+22 -3
View File
@@ -102,6 +102,24 @@ function sendAndReceiveAll(ws: WebSocket, msg: object, count: number): Promise<A
}); });
} }
/** Send a request and collect messages until a terminal event ('done' or 'error') is received. */
function sendAndWaitForDone(ws: WebSocket, msg: object): Promise<Array<GatewayResponse | GatewayError | GatewayEvent>> {
return new Promise((resolve) => {
const messages: Array<GatewayResponse | GatewayError | GatewayEvent> = [];
const handler = (data: Buffer) => {
const parsed = JSON.parse(data.toString()) as GatewayResponse | GatewayError | GatewayEvent;
messages.push(parsed);
const event = parsed as GatewayEvent;
if (event.event === 'done' || event.event === 'error') {
ws.off('message', handler);
resolve(messages);
}
};
ws.on('message', handler);
ws.send(JSON.stringify(msg));
});
}
beforeAll(async () => { beforeAll(async () => {
LISTEN_ALLOWED = await canListenOnLocalhost(); LISTEN_ALLOWED = await canListenOnLocalhost();
}); });
@@ -231,9 +249,10 @@ describe('GatewayServer integration', () => {
} }
const ws = await createClient(); const ws = await createClient();
try { try {
// agent.send streams events — we expect a 'done' event // agent.send streams events (run_state + done) — wait for terminal done event
const messages = await sendAndReceiveAll(ws, { id: 4, method: 'agent.send', params: { message: 'hi' } }, 1); const messages = await sendAndWaitForDone(ws, { id: 4, method: 'agent.send', params: { message: 'hi' } });
const doneEvent = messages[0] as GatewayEvent; const doneEvent = messages.find((m) => (m as GatewayEvent).event === 'done') as GatewayEvent;
expect(doneEvent).toBeTruthy();
expect(doneEvent.id).toBe(4); expect(doneEvent.id).toBe(4);
expect(doneEvent.event).toBe('done'); expect(doneEvent.event).toBe('done');
expect((doneEvent.data as { content?: string }).content).toBe('Hello from Flynn!'); expect((doneEvent.data as { content?: string }).content).toBe('Hello from Flynn!');
+3 -3
View File
@@ -823,7 +823,7 @@ export class GatewayServer {
rawBody = await this.readRequestBody(req); rawBody = await this.readRequestBody(req);
} catch (err) { } catch (err) {
if (err instanceof RequestBodyTooLargeError) { if (err instanceof RequestBodyTooLargeError) {
res.writeHead(413, { 'Content-Type': 'application/json' }); res.writeHead(413, { 'Content-Type': 'application/json', 'Connection': 'close' });
res.end(JSON.stringify({ error: 'Payload too large' })); res.end(JSON.stringify({ error: 'Payload too large' }));
return true; return true;
} }
@@ -887,7 +887,7 @@ export class GatewayServer {
rawBody = await this.readRequestBody(req); rawBody = await this.readRequestBody(req);
} catch (err) { } catch (err) {
if (err instanceof RequestBodyTooLargeError) { if (err instanceof RequestBodyTooLargeError) {
res.writeHead(413, { 'Content-Type': 'application/json' }); res.writeHead(413, { 'Content-Type': 'application/json', 'Connection': 'close' });
res.end(JSON.stringify({ error: 'Payload too large' })); res.end(JSON.stringify({ error: 'Payload too large' }));
return true; return true;
} }
@@ -974,7 +974,7 @@ export class GatewayServer {
res.end(JSON.stringify({ ok: true })); res.end(JSON.stringify({ ok: true }));
} catch (err) { } catch (err) {
if (err instanceof RequestBodyTooLargeError) { if (err instanceof RequestBodyTooLargeError) {
res.writeHead(413, { 'Content-Type': 'application/json' }); res.writeHead(413, { 'Content-Type': 'application/json', 'Connection': 'close' });
res.end(JSON.stringify({ error: 'Payload too large' })); res.end(JSON.stringify({ error: 'Payload too large' }));
} else { } else {
console.error('Gmail push handler error:', err instanceof Error ? err.message : err); console.error('Gmail push handler error:', err instanceof Error ? err.message : err);
+4 -1
View File
@@ -248,8 +248,11 @@ describe('ChatPage wiring', () => {
await Promise.resolve(); await Promise.resolve();
const statusLine = Array.from(root.querySelectorAll('div')) const statusLine = Array.from(root.querySelectorAll('div'))
.find((el: any) => String(el.textContent ?? '').startsWith('Run status:')); .find((el: any) => String(el.textContent ?? '').startsWith('Run status:')) as any;
expect(statusLine).toBeTruthy(); expect(statusLine).toBeTruthy();
if (!statusLine) {
throw new Error('Run status line not found');
}
expect(statusLine.classList.contains('hidden')).toBe(false); expect(statusLine.classList.contains('hidden')).toBe(false);
resolveResult?.({ content: 'ok' }); resolveResult?.({ content: 'ok' });
+1 -2
View File
@@ -29,7 +29,7 @@ describe('readRequestBody', () => {
expect(req.destroyed).toBe(false); expect(req.destroyed).toBe(false);
}); });
it('rejects oversized body and destroys request', async () => { it('rejects oversized body with RequestBodyTooLargeError', async () => {
const req = new MockRequest(); const req = new MockRequest();
const bodyPromise = readRequestBody(asIncoming(req), { maxBytes: 5 }); const bodyPromise = readRequestBody(asIncoming(req), { maxBytes: 5 });
@@ -37,6 +37,5 @@ describe('readRequestBody', () => {
req.emit('data', Buffer.from('6')); req.emit('data', Buffer.from('6'));
await expect(bodyPromise).rejects.toBeInstanceOf(RequestBodyTooLargeError); await expect(bodyPromise).rejects.toBeInstanceOf(RequestBodyTooLargeError);
expect(req.destroyed).toBe(true);
}); });
}); });
-3
View File
@@ -40,9 +40,6 @@ export function readRequestBody(req: IncomingMessage, opts: ReadRequestBodyOptio
const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
totalBytes += buf.length; totalBytes += buf.length;
if (totalBytes > opts.maxBytes) { if (totalBytes > opts.maxBytes) {
if (typeof req.destroy === 'function') {
req.destroy();
}
fail(new RequestBodyTooLargeError(opts.maxBytes, totalBytes)); fail(new RequestBodyTooLargeError(opts.maxBytes, totalBytes));
return; return;
} }