diff --git a/README.md b/README.md index ac1a083..73cfc8f 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,17 @@ Sample Codex hook configuration lives in [hooks/codex/hooks.json](/home/will/lab The current Codex integration does not assume tool or subagent span hooks exist. If a newer Codex CLI exposes official tool/span hooks, they can be added separately without changing the run/session flow above. +## Gemini Hook + +The `hooks/gemini/` directory contains a TypeScript handler for Gemini CLI telemetry. The current integration maps Gemini hook events into agentmon's session/run/span model: + +- `onStart` maps to `session.start` and an initial `run.start` +- `onStop` maps to `run.end` and `session.end` +- `onToolCall` maps to `span.start` +- `onToolResult` maps to `span.end` + +Sample Gemini hook configuration lives in [hooks/gemini/hooks.json](/home/will/lab/agentmon/hooks/gemini/hooks.json). Install the handler from that directory so the `agentmon-gemini-handler` binary is available, then point Gemini CLI at the sample hook config and set `AGENTMON_INGEST_URL` to your ingest gateway. + ## Go SDK Emit events from Go applications: diff --git a/hooks/agentmon/handler.ts b/hooks/agentmon/handler.ts index 1b09d9c..5ce1243 100644 --- a/hooks/agentmon/handler.ts +++ b/hooks/agentmon/handler.ts @@ -1,66 +1,22 @@ import { randomUUID } from 'node:crypto'; import { hostname } from 'node:os'; +import { + Dict, + isRecord, + pickString, + pickNumber, + truncate, + buildEnvelope, + createTransport, +} from '../shared/lib'; -type Dict = Record; - -const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://192.168.122.1:8080'; +const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://localhost:8080'; const VM_NAME = process.env.AGENTMON_VM_NAME || hostname(); -const BATCH_SIZE = 10; -const FLUSH_MS = 2000; -const FETCH_TIMEOUT_MS = 500; -let buffer: Dict[] = []; -let flushTimer: ReturnType | null = null; -let isFlushing = false; +const { enqueue, flush } = createTransport(INGEST_URL); const activeRuns = new Map(); -function isRecord(value: unknown): value is Dict { - return value !== null && typeof value === 'object' && !Array.isArray(value); -} - -function pickString(...values: unknown[]): string | undefined { - for (const value of values) { - if (typeof value === 'string' && value.trim() !== '') { - return value; - } - } - return undefined; -} - -function pickNumber(...values: unknown[]): number | undefined { - for (const value of values) { - if (typeof value === 'number' && Number.isFinite(value)) { - return value; - } - } - return undefined; -} - -function truncate(value: unknown, limit: number): string | undefined { - if (value === undefined || value === null) { - return undefined; - } - - const text = typeof value === 'string' ? value : safeJSONStringify(value); - if (!text) { - return undefined; - } - - if (text.length <= limit) { - return text; - } - return text.slice(0, limit) + '...'; -} - -function safeJSONStringify(value: unknown): string { - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - function getEventName(input: Dict): string { const direct = pickString(input.name, input.event); if (direct) { @@ -97,119 +53,6 @@ function getSessionKey(input: Dict, context: Dict): string | undefined { ); } -function buildEnvelope( - type: string, - sessionKey?: string, - opts: { - runId?: string; - spanId?: string; - parentSpanId?: string; - attributes?: Dict; - payload?: Dict; - } = {}, -): Dict { - const correlation: Dict = {}; - if (sessionKey) { - correlation.session_id = sessionKey; - } - if (opts.runId) { - correlation.run_id = opts.runId; - } - if (opts.spanId) { - correlation.span_id = opts.spanId; - } - if (opts.parentSpanId) { - correlation.parent_span_id = opts.parentSpanId; - } - - const envelope: Dict = { - schema: { name: 'agentmon.event', version: 1 }, - event: { - id: randomUUID(), - type, - ts: new Date().toISOString(), - source: { - framework: 'openclaw', - client_id: VM_NAME, - host: VM_NAME, - }, - }, - }; - - if (Object.keys(correlation).length > 0) { - envelope.correlation = correlation; - } - if (opts.attributes && Object.keys(opts.attributes).length > 0) { - envelope.attributes = opts.attributes; - } - if (opts.payload && Object.keys(opts.payload).length > 0) { - envelope.payload = opts.payload; - } - - return envelope; -} - -function scheduleFlush() { - if (!flushTimer) { - flushTimer = setTimeout(() => { - void flush(); - }, FLUSH_MS); - } -} - -function enqueue(event: Dict) { - buffer.push(event); - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } -} - -async function postBatch(batch: Dict[]) { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); - - try { - await fetch(`${INGEST_URL}/v1/events`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batch), - signal: controller.signal, - }); - } finally { - clearTimeout(timeout); - } -} - -async function flush() { - if (flushTimer) { - clearTimeout(flushTimer); - flushTimer = null; - } - if (isFlushing || buffer.length === 0) { - return; - } - - isFlushing = true; - const batch = buffer.splice(0, BATCH_SIZE); - - try { - await postBatch(batch); - } catch { - console.debug(`[agentmon] failed to flush ${batch.length} events`); - } finally { - isFlushing = false; - if (buffer.length > 0) { - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } - } - } -} - function emitError(sessionKey: string | undefined, runId: string | undefined, spanId: string | undefined, errorValue: unknown) { if (errorValue === undefined || errorValue === null || errorValue === false) { return; @@ -219,7 +62,7 @@ function emitError(sessionKey: string | undefined, runId: string | undefined, sp const message = pickString(errorRecord.message, errorRecord.error, errorValue) || 'unknown'; const errType = pickString(errorRecord.type, errorRecord.code) || 'openclaw'; - enqueue(buildEnvelope('error', sessionKey, { + enqueue(buildEnvelope('openclaw', VM_NAME, 'error', sessionKey, { runId, spanId, payload: { @@ -265,12 +108,12 @@ const handler = async (rawEvent: unknown) => { try { if (eventName === 'command:new') { - enqueue(buildEnvelope('session.start', sessionKey)); + enqueue(buildEnvelope('openclaw', VM_NAME, 'session.start', sessionKey)); return; } if (eventName === 'command:stop') { - enqueue(buildEnvelope('session.end', sessionKey)); + enqueue(buildEnvelope('openclaw', VM_NAME, 'session.end', sessionKey)); if (sessionKey) { activeRuns.delete(sessionKey); } @@ -278,8 +121,8 @@ const handler = async (rawEvent: unknown) => { } if (eventName === 'command:reset') { - enqueue(buildEnvelope('session.end', sessionKey)); - enqueue(buildEnvelope('session.start', sessionKey)); + enqueue(buildEnvelope('openclaw', VM_NAME, 'session.end', sessionKey)); + enqueue(buildEnvelope('openclaw', VM_NAME, 'session.start', sessionKey)); if (sessionKey) { activeRuns.delete(sessionKey); } @@ -297,7 +140,7 @@ const handler = async (rawEvent: unknown) => { activeRuns.set(sessionKey, runId); } - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope('openclaw', VM_NAME, 'run.start', sessionKey, { runId, attributes: { agent_id: pickString(context.agentId as string | undefined), @@ -314,7 +157,7 @@ const handler = async (rawEvent: unknown) => { activeRuns.set(sessionKey, runId); } - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope('openclaw', VM_NAME, 'run.start', sessionKey, { runId, attributes: { channel: pickString(context.channelId, context.channel_id), @@ -334,7 +177,7 @@ const handler = async (rawEvent: unknown) => { const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; const success = context.success !== false && !context.error; - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope('openclaw', VM_NAME, 'run.end', sessionKey, { runId, attributes: { channel: pickString(context.channelId, context.channel_id), diff --git a/hooks/claude-code/handler.ts b/hooks/claude-code/handler.ts index 4343e76..4a4af0b 100644 --- a/hooks/claude-code/handler.ts +++ b/hooks/claude-code/handler.ts @@ -3,15 +3,22 @@ import { randomUUID } from 'node:crypto'; import { hostname, homedir } from 'node:os'; import { readFileSync, writeFileSync, mkdirSync, unlinkSync } from 'node:fs'; import { join } from 'node:path'; +import { + type Dict, + isRecord, + pickString, + pickNumber, + truncate, + buildEnvelope, + createTransport, + readStdin, +} from '../shared/lib'; const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://localhost:8080'; const FRAMEWORK = process.env.AGENTMON_FRAMEWORK || 'claude-code'; const HOST = process.env.AGENTMON_HOST || hostname(); -const BATCH_SIZE = 10; -const FLUSH_MS = 2000; -const FETCH_TIMEOUT_MS = 500; -interface Dict { [key: string]: any } +const { enqueue, flush } = createTransport(INGEST_URL); // ── Persisted state (survives between hook subprocess invocations) ────────── interface SessionState { @@ -49,60 +56,10 @@ function clearState(sessionKey: string) { } // ───────────────────────────────────────────────────────────────────────────── -let buffer: Dict[] = []; -let flushTimer: ReturnType | null = null; -let isFlushing = false; - const activeRuns = new Map(); const activeSpans = new Map(); const activeSubagents = new Map(); -function isRecord(value: unknown): value is Dict { - return value !== null && typeof value === 'object' && !Array.isArray(value); -} - -function pickString(...values: unknown[]): string | undefined { - for (const value of values) { - if (typeof value === 'string' && value.trim() !== '') { - return value; - } - } - return undefined; -} - -function pickNumber(...values: unknown[]): number | undefined { - for (const value of values) { - if (typeof value === 'number' && Number.isFinite(value)) { - return value; - } - } - return undefined; -} - -function truncate(value: unknown, limit: number): string | undefined { - if (value === undefined || value === null) { - return undefined; - } - - const text = typeof value === 'string' ? value : safeJSONStringify(value); - if (!text) { - return undefined; - } - - if (text.length <= limit) { - return text; - } - return text.slice(0, limit) + '...'; -} - -function safeJSONStringify(value: unknown): string { - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - function getSessionKey(input: Dict): string | undefined { return pickString( input.sessionId, @@ -113,11 +70,11 @@ function getSessionKey(input: Dict): string | undefined { } function getUsage(input: Dict): Dict | undefined { - const usage = isRecord(input.usage) ? input.usage : - isRecord(input.llm) ? input.llm : + const usage = isRecord(input.usage) ? input.usage : + isRecord(input.llm) ? input.llm : undefined; if (!usage) return undefined; - + const result: Dict = {}; if (usage.input_tokens !== undefined) result.input_tokens = usage.input_tokens; if (usage.output_tokens !== undefined) result.output_tokens = usage.output_tokens; @@ -126,7 +83,7 @@ function getUsage(input: Dict): Dict | undefined { if (usage.thinking_tokens !== undefined) result.thinking_tokens = usage.thinking_tokens; if (usage.total_tokens !== undefined) result.total_tokens = usage.total_tokens; if (usage.total_cost !== undefined) result.total_cost = usage.total_cost; - + return Object.keys(result).length > 0 ? result : undefined; } @@ -134,130 +91,17 @@ function getContextWindow(input: Dict): Dict | undefined { const stats = isRecord(input.context_stats) ? input.context_stats : isRecord(input.stats) ? input.stats : undefined; if (!stats) return undefined; - + const result: Dict = {}; if (stats.input_tokens !== undefined) result.input_tokens = stats.input_tokens; if (stats.output_tokens !== undefined) result.output_tokens = stats.output_tokens; if (stats.used_tokens !== undefined) result.used_tokens = stats.used_tokens; if (stats.max_tokens !== undefined) result.max_tokens = stats.max_tokens; if (stats.tokens_remaining !== undefined) result.tokens_remaining = stats.tokens_remaining; - + return Object.keys(result).length > 0 ? result : undefined; } -function buildEnvelope( - type: string, - sessionKey?: string, - opts: { - runId?: string; - spanId?: string; - parentSpanId?: string; - attributes?: Dict; - payload?: Dict; - } = {}, -): Dict { - const correlation: Dict = {}; - if (sessionKey) { - correlation.session_id = sessionKey; - } - if (opts.runId) { - correlation.run_id = opts.runId; - } - if (opts.spanId) { - correlation.span_id = opts.spanId; - } - if (opts.parentSpanId) { - correlation.parent_span_id = opts.parentSpanId; - } - - const envelope: Dict = { - schema: { name: 'agentmon.event', version: 1 }, - event: { - id: randomUUID(), - type, - ts: new Date().toISOString(), - source: { - framework: FRAMEWORK, - client_id: HOST, - host: HOST, - }, - }, - }; - - if (Object.keys(correlation).length > 0) { - envelope.correlation = correlation; - } - if (opts.attributes && Object.keys(opts.attributes).length > 0) { - envelope.attributes = opts.attributes; - } - if (opts.payload && Object.keys(opts.payload).length > 0) { - envelope.payload = opts.payload; - } - - return envelope; -} - -function scheduleFlush() { - if (!flushTimer) { - flushTimer = setTimeout(() => { - void flush(); - }, FLUSH_MS); - } -} - -function enqueue(event: Dict) { - buffer.push(event); - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } -} - -async function postBatch(batch: Dict[]) { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); - - try { - await fetch(`${INGEST_URL}/v1/events`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batch), - signal: controller.signal, - }); - } finally { - clearTimeout(timeout); - } -} - -async function flush() { - if (flushTimer) { - clearTimeout(flushTimer); - flushTimer = null; - } - if (isFlushing || buffer.length === 0) { - return; - } - - isFlushing = true; - const batch = buffer.splice(0, BATCH_SIZE); - - try { - await postBatch(batch); - } catch { - console.debug(`[agentmon] failed to flush ${batch.length} events`); - } finally { - isFlushing = false; - if (buffer.length > 0) { - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } - } - } -} - function emitError(sessionKey: string | undefined, runId: string | undefined, spanId: string | undefined, errorValue: unknown) { if (errorValue === undefined || errorValue === null || errorValue === false) { return; @@ -267,7 +111,7 @@ function emitError(sessionKey: string | undefined, runId: string | undefined, sp const message = pickString(errorRecord.message, errorRecord.error, errorValue) || 'unknown'; const errType = pickString(errorRecord.type, errorRecord.code) || FRAMEWORK; - enqueue(buildEnvelope('error', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'error', sessionKey, { runId, spanId, payload: { @@ -287,11 +131,11 @@ async function handleSessionStart(input: Dict) { const contextWindow = getContextWindow(input); - enqueue(buildEnvelope('session.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.start', sessionKey, { attributes: contextWindow ? { context_window: contextWindow } : undefined, })); - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId, attributes: { trigger: pickString(input.trigger_type, input.trigger), @@ -313,7 +157,7 @@ async function handleSessionEnd(input: Dict) { const duration = pickNumber(input.duration_ms, input.elapsed_ms); if (runId) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -325,7 +169,7 @@ async function handleSessionEnd(input: Dict) { })); } - enqueue(buildEnvelope('session.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.end', sessionKey, { payload: { ...(usage && { usage }), ...(contextWindow && { context_window: contextWindow }), @@ -346,7 +190,7 @@ async function handlePromptSubmit(input: Dict) { const prompt = pickString(input.prompt, input.text, input.message); if (runId && prompt) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -361,7 +205,7 @@ async function handlePromptSubmit(input: Dict) { saveState(sessionKey, { runId: newRunId, spans: {} }); } - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId: newRunId, attributes: { type: 'user_prompt', @@ -392,7 +236,7 @@ async function handleToolStart(input: Dict) { if (sessionKey) saveState(sessionKey, state); } - enqueue(buildEnvelope('span.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.start', sessionKey, { runId, spanId, attributes: { @@ -423,7 +267,7 @@ async function handleToolEnd(input: Dict) { if (sessionKey) saveState(sessionKey, state); } - enqueue(buildEnvelope('span.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.end', sessionKey, { runId, spanId, attributes: { @@ -462,7 +306,7 @@ async function handleSubagentStart(input: Dict) { saveState(sessionKey, state); } - enqueue(buildEnvelope('span.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.start', sessionKey, { runId, spanId, attributes: { @@ -493,7 +337,7 @@ async function handleSubagentStop(input: Dict) { if (sessionKey) saveState(sessionKey, state); } - enqueue(buildEnvelope('span.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.end', sessionKey, { runId, spanId, attributes: { @@ -528,7 +372,7 @@ async function handleCompactStart(input: Dict) { saveState(sessionKey, state); } - enqueue(buildEnvelope('span.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.start', sessionKey, { runId, spanId, attributes: { @@ -554,7 +398,7 @@ async function handleCompactEnd(input: Dict) { if (sessionKey) saveState(sessionKey, state); } - enqueue(buildEnvelope('span.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.end', sessionKey, { runId, spanId, attributes: { @@ -585,7 +429,7 @@ async function handleNotification(input: Dict) { const runId = sessionKey ? (activeRuns.get(sessionKey) || state.runId) : undefined; if (runId) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -660,14 +504,4 @@ const handler = async () => { } }; -async function readStdin(): Promise { - return new Promise((resolve) => { - let data = ''; - process.stdin.on('data', (chunk) => data += chunk); - process.stdin.on('end', () => resolve(data)); - process.stdin.on('error', () => resolve('')); - setTimeout(() => resolve(data), 100); - }); -} - handler(); diff --git a/hooks/codex/handler.ts b/hooks/codex/handler.ts index 7827696..8c07b41 100644 --- a/hooks/codex/handler.ts +++ b/hooks/codex/handler.ts @@ -1,68 +1,25 @@ #!/usr/bin/env node import { randomUUID } from 'node:crypto'; import { hostname } from 'node:os'; +import { + type Dict, + isRecord, + pickString, + pickNumber, + truncate, + buildEnvelope, + createTransport, + readStdin, +} from '../shared/lib'; const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://localhost:8080'; const FRAMEWORK = process.env.AGENTMON_FRAMEWORK || 'codex'; const HOST = process.env.AGENTMON_HOST || hostname(); -const BATCH_SIZE = 10; -const FLUSH_MS = 2000; -const FETCH_TIMEOUT_MS = 500; -interface Dict { [key: string]: any } - -let buffer: Dict[] = []; -let flushTimer: ReturnType | null = null; -let isFlushing = false; +const { enqueue, flush } = createTransport(INGEST_URL); const activeRuns = new Map(); -function isRecord(value: unknown): value is Dict { - return value !== null && typeof value === 'object' && !Array.isArray(value); -} - -function pickString(...values: unknown[]): string | undefined { - for (const value of values) { - if (typeof value === 'string' && value.trim() !== '') { - return value; - } - } - return undefined; -} - -function pickNumber(...values: unknown[]): number | undefined { - for (const value of values) { - if (typeof value === 'number' && Number.isFinite(value)) { - return value; - } - } - return undefined; -} - -function truncate(value: unknown, limit: number): string | undefined { - if (value === undefined || value === null) { - return undefined; - } - - const text = typeof value === 'string' ? value : safeJSONStringify(value); - if (!text) { - return undefined; - } - - if (text.length <= limit) { - return text; - } - return text.slice(0, limit) + '...'; -} - -function safeJSONStringify(value: unknown): string { - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - function getSessionKey(input: Dict): string | undefined { return pickString( input.id, @@ -108,75 +65,6 @@ function getModel(input: Dict): string | undefined { ); } -function buildEnvelope( - type: string, - sessionKey?: string, - opts: { - runId?: string; - spanId?: string; - parentSpanId?: string; - attributes?: Dict; - payload?: Dict; - } = {}, -): Dict { - const correlation: Dict = {}; - if (sessionKey) { - correlation.session_id = sessionKey; - } - if (opts.runId) { - correlation.run_id = opts.runId; - } - if (opts.spanId) { - correlation.span_id = opts.spanId; - } - if (opts.parentSpanId) { - correlation.parent_span_id = opts.parentSpanId; - } - - const envelope: Dict = { - schema: { name: 'agentmon.event', version: 1 }, - event: { - id: randomUUID(), - type, - ts: new Date().toISOString(), - source: { - framework: FRAMEWORK, - client_id: HOST, - host: HOST, - }, - }, - }; - - if (Object.keys(correlation).length > 0) { - envelope.correlation = correlation; - } - if (opts.attributes && Object.keys(opts.attributes).length > 0) { - envelope.attributes = opts.attributes; - } - if (opts.payload && Object.keys(opts.payload).length > 0) { - envelope.payload = opts.payload; - } - - return envelope; -} - -function scheduleFlush() { - if (!flushTimer) { - flushTimer = setTimeout(() => { - void flush(); - }, FLUSH_MS); - } -} - -function enqueue(event: Dict) { - buffer.push(event); - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } -} - function enqueueMetricSnapshot(sessionKey: string | undefined, runId: string | undefined, usage: Dict | undefined, input: Dict) { if (!usage) { return; @@ -188,7 +76,7 @@ function enqueueMetricSnapshot(sessionKey: string | undefined, runId: string | u metrics.model = model; } - enqueue(buildEnvelope('metric.snapshot', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'metric.snapshot', sessionKey, { runId, payload: { metrics }, })); @@ -200,7 +88,7 @@ function startRun(sessionKey: string | undefined, input: Dict): string { activeRuns.set(sessionKey, runId); } - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId, attributes: { trigger: pickString(input.trigger_type, input.trigger, input.event, input.event_type), @@ -219,7 +107,7 @@ function endRun(sessionKey: string | undefined, runId: string | undefined, input } const usage = getUsage(input); - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -231,54 +119,10 @@ function endRun(sessionKey: string | undefined, runId: string | undefined, input enqueueMetricSnapshot(sessionKey, runId, usage, input); } -async function postBatch(batch: Dict[]) { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); - - try { - await fetch(`${INGEST_URL}/v1/events`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batch), - signal: controller.signal, - }); - } finally { - clearTimeout(timeout); - } -} - -async function flush() { - if (flushTimer) { - clearTimeout(flushTimer); - flushTimer = null; - } - if (isFlushing || buffer.length === 0) { - return; - } - - isFlushing = true; - const batch = buffer.splice(0, BATCH_SIZE); - - try { - await postBatch(batch); - } catch { - console.debug(`[agentmon] failed to flush ${batch.length} events`); - } finally { - isFlushing = false; - if (buffer.length > 0) { - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } - } - } -} - async function handleSessionStart(input: Dict) { const sessionKey = getSessionKey(input) || randomUUID(); - enqueue(buildEnvelope('session.start', sessionKey)); + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.start', sessionKey)); startRun(sessionKey, input); await flush(); @@ -292,7 +136,7 @@ async function handleSessionEnd(input: Dict) { endRun(sessionKey, runId, input, duration); - enqueue(buildEnvelope('session.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.end', sessionKey, { payload: { model: getModel(input), ...(usage && { usage }), @@ -376,14 +220,4 @@ const handler = async () => { } }; -async function readStdin(): Promise { - return new Promise((resolve) => { - let data = ''; - process.stdin.on('data', (chunk) => data += chunk); - process.stdin.on('end', () => resolve(data)); - process.stdin.on('error', () => resolve('')); - setTimeout(() => resolve(data), 100); - }); -} - handler(); diff --git a/hooks/copilot/handler.ts b/hooks/copilot/handler.ts index 75de55a..96e86e3 100644 --- a/hooks/copilot/handler.ts +++ b/hooks/copilot/handler.ts @@ -1,69 +1,26 @@ #!/usr/bin/env node import { randomUUID } from 'node:crypto'; import { hostname } from 'node:os'; +import { + type Dict, + isRecord, + pickString, + pickNumber, + truncate, + buildEnvelope, + createTransport, + readStdin, +} from '../shared/lib'; const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://localhost:8080'; const FRAMEWORK = process.env.AGENTMON_FRAMEWORK || 'copilot'; const HOST = process.env.AGENTMON_HOST || hostname(); -const BATCH_SIZE = 10; -const FLUSH_MS = 2000; -const FETCH_TIMEOUT_MS = 500; -interface Dict { [key: string]: any } - -let buffer: Dict[] = []; -let flushTimer: ReturnType | null = null; -let isFlushing = false; +const { enqueue, flush } = createTransport(INGEST_URL); const activeRuns = new Map(); const activeSpans = new Map(); -function isRecord(value: unknown): value is Dict { - return value !== null && typeof value === 'object' && !Array.isArray(value); -} - -function pickString(...values: unknown[]): string | undefined { - for (const value of values) { - if (typeof value === 'string' && value.trim() !== '') { - return value; - } - } - return undefined; -} - -function pickNumber(...values: unknown[]): number | undefined { - for (const value of values) { - if (typeof value === 'number' && Number.isFinite(value)) { - return value; - } - } - return undefined; -} - -function truncate(value: unknown, limit: number): string | undefined { - if (value === undefined || value === null) { - return undefined; - } - - const text = typeof value === 'string' ? value : safeJSONStringify(value); - if (!text) { - return undefined; - } - - if (text.length <= limit) { - return text; - } - return text.slice(0, limit) + '...'; -} - -function safeJSONStringify(value: unknown): string { - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - function getSessionKey(input: Dict): string | undefined { return pickString( input.sessionId, @@ -90,127 +47,14 @@ function getUsage(input: Dict): Dict | undefined { return Object.keys(result).length > 0 ? result : undefined; } -function buildEnvelope( - type: string, - sessionKey?: string, - opts: { - runId?: string; - spanId?: string; - parentSpanId?: string; - attributes?: Dict; - payload?: Dict; - } = {}, -): Dict { - const correlation: Dict = {}; - if (sessionKey) { - correlation.session_id = sessionKey; - } - if (opts.runId) { - correlation.run_id = opts.runId; - } - if (opts.spanId) { - correlation.span_id = opts.spanId; - } - if (opts.parentSpanId) { - correlation.parent_span_id = opts.parentSpanId; - } - - const envelope: Dict = { - schema: { name: 'agentmon.event', version: 1 }, - event: { - id: randomUUID(), - type, - ts: new Date().toISOString(), - source: { - framework: FRAMEWORK, - client_id: HOST, - host: HOST, - }, - }, - }; - - if (Object.keys(correlation).length > 0) { - envelope.correlation = correlation; - } - if (opts.attributes && Object.keys(opts.attributes).length > 0) { - envelope.attributes = opts.attributes; - } - if (opts.payload && Object.keys(opts.payload).length > 0) { - envelope.payload = opts.payload; - } - - return envelope; -} - -function scheduleFlush() { - if (!flushTimer) { - flushTimer = setTimeout(() => { - void flush(); - }, FLUSH_MS); - } -} - -function enqueue(event: Dict) { - buffer.push(event); - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } -} - -async function postBatch(batch: Dict[]) { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); - - try { - await fetch(`${INGEST_URL}/v1/events`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batch), - signal: controller.signal, - }); - } finally { - clearTimeout(timeout); - } -} - -async function flush() { - if (flushTimer) { - clearTimeout(flushTimer); - flushTimer = null; - } - if (isFlushing || buffer.length === 0) { - return; - } - - isFlushing = true; - const batch = buffer.splice(0, BATCH_SIZE); - - try { - await postBatch(batch); - } catch { - console.debug(`[agentmon] failed to flush ${batch.length} events`); - } finally { - isFlushing = false; - if (buffer.length > 0) { - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } - } - } -} - async function handleSessionStart(input: Dict) { const sessionKey = getSessionKey(input) || randomUUID(); const runId = randomUUID(); activeRuns.set(sessionKey, runId); - enqueue(buildEnvelope('session.start', sessionKey)); + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.start', sessionKey)); - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId, payload: { prompt_preview: truncate(input.prompt, 200), @@ -227,7 +71,7 @@ async function handleSessionEnd(input: Dict) { const duration = pickNumber(input.duration_ms, input.elapsed_ms); if (runId) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -237,7 +81,7 @@ async function handleSessionEnd(input: Dict) { })); } - enqueue(buildEnvelope('session.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.end', sessionKey, { payload: usage ? { usage } : undefined, })); @@ -252,7 +96,7 @@ async function handlePromptSubmit(input: Dict) { const prompt = pickString(input.prompt, input.text, input.message); if (runId && prompt) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -266,7 +110,7 @@ async function handlePromptSubmit(input: Dict) { activeRuns.set(sessionKey, newRunId); } - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId: newRunId, payload: { prompt_preview: truncate(prompt, 200), @@ -287,7 +131,7 @@ async function handleToolStart(input: Dict) { activeSpans.set(sessionKey + ':' + toolName, spanId); } - enqueue(buildEnvelope('span.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.start', sessionKey, { runId, spanId, attributes: { @@ -313,7 +157,7 @@ async function handleToolEnd(input: Dict) { const duration = pickNumber(input.duration_ms, input.elapsed_ms); const usage = getUsage(input); - enqueue(buildEnvelope('span.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.end', sessionKey, { runId, spanId, attributes: { @@ -371,14 +215,4 @@ const handler = async () => { } }; -async function readStdin(): Promise { - return new Promise((resolve) => { - let data = ''; - process.stdin.on('data', (chunk) => data += chunk); - process.stdin.on('end', () => resolve(data)); - process.stdin.on('error', () => resolve('')); - setTimeout(() => resolve(data), 100); - }); -} - handler(); diff --git a/hooks/gemini/handler.ts b/hooks/gemini/handler.ts index ad9e10d..2d13856 100644 --- a/hooks/gemini/handler.ts +++ b/hooks/gemini/handler.ts @@ -2,68 +2,26 @@ import { randomUUID } from 'node:crypto'; import { hostname } from 'node:os'; +import { + Dict, + isRecord, + pickString, + pickNumber, + truncate, + buildEnvelope, + createTransport, + readStdin, +} from '../shared/lib'; + const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://localhost:8080'; const FRAMEWORK = process.env.AGENTMON_FRAMEWORK || 'gemini'; const HOST = process.env.AGENTMON_HOST || hostname(); -const BATCH_SIZE = 10; -const FLUSH_MS = 2000; -const FETCH_TIMEOUT_MS = 500; -interface Dict { [key: string]: any } - -let buffer: Dict[] = []; -let flushTimer: ReturnType | null = null; -let isFlushing = false; +const { enqueue, flush } = createTransport(INGEST_URL); const activeRuns = new Map(); const activeSpans = new Map(); -function isRecord(value: unknown): value is Dict { - return value !== null && typeof value === 'object' && !Array.isArray(value); -} - -function pickString(...values: unknown[]): string | undefined { - for (const value of values) { - if (typeof value === 'string' && value.trim() !== '') { - return value; - } - } - return undefined; -} - -function pickNumber(...values: unknown[]): number | undefined { - for (const value of values) { - if (typeof value === 'number' && Number.isFinite(value)) { - return value; - } - } - return undefined; -} - -function truncate(value: unknown, limit: number): string | undefined { - if (value === undefined || value === null) { - return undefined; - } - - const text = typeof value === 'string' ? value : safeJSONStringify(value); - if (!text) { - return undefined; - } - - if (text.length <= limit) { - return text; - } - return text.slice(0, limit) + '...'; -} - -function safeJSONStringify(value: unknown): string { - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - function getSessionKey(input: Dict): string | undefined { return pickString( input.sessionId, @@ -89,127 +47,14 @@ function getUsage(input: Dict): Dict | undefined { return Object.keys(result).length > 0 ? result : undefined; } -function buildEnvelope( - type: string, - sessionKey?: string, - opts: { - runId?: string; - spanId?: string; - parentSpanId?: string; - attributes?: Dict; - payload?: Dict; - } = {}, -): Dict { - const correlation: Dict = {}; - if (sessionKey) { - correlation.session_id = sessionKey; - } - if (opts.runId) { - correlation.run_id = opts.runId; - } - if (opts.spanId) { - correlation.span_id = opts.spanId; - } - if (opts.parentSpanId) { - correlation.parent_span_id = opts.parentSpanId; - } - - const envelope: Dict = { - schema: { name: 'agentmon.event', version: 1 }, - event: { - id: randomUUID(), - type, - ts: new Date().toISOString(), - source: { - framework: FRAMEWORK, - client_id: HOST, - host: HOST, - }, - }, - }; - - if (Object.keys(correlation).length > 0) { - envelope.correlation = correlation; - } - if (opts.attributes && Object.keys(opts.attributes).length > 0) { - envelope.attributes = opts.attributes; - } - if (opts.payload && Object.keys(opts.payload).length > 0) { - envelope.payload = opts.payload; - } - - return envelope; -} - -function scheduleFlush() { - if (!flushTimer) { - flushTimer = setTimeout(() => { - void flush(); - }, FLUSH_MS); - } -} - -function enqueue(event: Dict) { - buffer.push(event); - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } -} - -async function postBatch(batch: Dict[]) { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); - - try { - await fetch(`${INGEST_URL}/v1/events`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batch), - signal: controller.signal, - }); - } finally { - clearTimeout(timeout); - } -} - -async function flush() { - if (flushTimer) { - clearTimeout(flushTimer); - flushTimer = null; - } - if (isFlushing || buffer.length === 0) { - return; - } - - isFlushing = true; - const batch = buffer.splice(0, BATCH_SIZE); - - try { - await postBatch(batch); - } catch { - console.debug(`[agentmon] failed to flush ${batch.length} events`); - } finally { - isFlushing = false; - if (buffer.length > 0) { - if (buffer.length >= BATCH_SIZE) { - void flush(); - } else { - scheduleFlush(); - } - } - } -} - async function handleSessionStart(input: Dict) { const sessionKey = getSessionKey(input) || randomUUID(); const runId = randomUUID(); activeRuns.set(sessionKey, runId); - enqueue(buildEnvelope('session.start', sessionKey)); + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.start', sessionKey)); - enqueue(buildEnvelope('run.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.start', sessionKey, { runId, payload: { prompt_preview: truncate(input.prompt, 200), @@ -226,7 +71,7 @@ async function handleSessionEnd(input: Dict) { const duration = pickNumber(input.duration_ms, input.elapsed_ms); if (runId) { - enqueue(buildEnvelope('run.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'run.end', sessionKey, { runId, payload: { status: 'success', @@ -236,7 +81,7 @@ async function handleSessionEnd(input: Dict) { })); } - enqueue(buildEnvelope('session.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'session.end', sessionKey, { payload: usage ? { usage } : undefined, })); @@ -256,7 +101,7 @@ async function handleToolCall(input: Dict) { activeSpans.set(sessionKey + ':' + toolName, spanId); } - enqueue(buildEnvelope('span.start', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.start', sessionKey, { runId, spanId, attributes: { @@ -282,7 +127,7 @@ async function handleToolResult(input: Dict) { const duration = pickNumber(input.duration_ms, input.elapsed_ms); const usage = getUsage(input); - enqueue(buildEnvelope('span.end', sessionKey, { + enqueue(buildEnvelope(FRAMEWORK, HOST, 'span.end', sessionKey, { runId, spanId, attributes: { @@ -337,14 +182,4 @@ const handler = async () => { } }; -async function readStdin(): Promise { - return new Promise((resolve) => { - let data = ''; - process.stdin.on('data', (chunk) => data += chunk); - process.stdin.on('end', () => resolve(data)); - process.stdin.on('error', () => resolve('')); - setTimeout(() => resolve(data), 100); - }); -} - handler(); diff --git a/hooks/gemini/package-lock.json b/hooks/gemini/package-lock.json index aa8df77..237739f 100644 --- a/hooks/gemini/package-lock.json +++ b/hooks/gemini/package-lock.json @@ -8,7 +8,7 @@ "name": "@anthropic-ai/agentmon-gemini", "version": "1.0.0", "bin": { - "agentmon-handler": "handler.js" + "agentmon-gemini-handler": "handler.js" }, "devDependencies": { "esbuild": "^0.20.0", diff --git a/hooks/gemini/package.json b/hooks/gemini/package.json index 9b5e168..326b03a 100644 --- a/hooks/gemini/package.json +++ b/hooks/gemini/package.json @@ -5,7 +5,7 @@ "main": "handler.js", "type": "module", "bin": { - "agentmon-handler": "./handler.js" + "agentmon-gemini-handler": "./handler.js" }, "scripts": { "build": "npx esbuild handler.ts --platform=node --format=esm --outfile=handler.js" diff --git a/hooks/opencode/plugin.ts b/hooks/opencode/plugin.ts index b7bb755..2887bf9 100644 --- a/hooks/opencode/plugin.ts +++ b/hooks/opencode/plugin.ts @@ -1,21 +1,50 @@ import { Plugin } from "@opencode-ai/plugin"; +import { hostname } from "node:os"; const INGEST_URL = process.env.AGENTMON_INGEST_URL || "http://localhost:8080"; +const FETCH_TIMEOUT_MS = 500; let buffer: any[] = []; -let flushTimer: NodeJS.Timeout | null = null; +let flushTimer: ReturnType | null = null; +let isFlushing = false; const BATCH_SIZE = 10; const FLUSH_MS = 2000; let currentSessionId: string | undefined; let currentRunId: string | undefined; +const activeSpans = new Map(); + +function scheduleFlush() { + if (!flushTimer) { + flushTimer = setTimeout(() => { + void flush(); + }, FLUSH_MS); + } +} + function enqueue(event: any) { buffer.push(event); if (buffer.length >= BATCH_SIZE) { - flush(); - } else if (!flushTimer) { - flushTimer = setTimeout(flush, FLUSH_MS); + void flush(); + } else { + scheduleFlush(); + } +} + +async function postBatch(batch: any[]) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + + try { + await fetch(`${INGEST_URL}/v1/events`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(batch), + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); } } @@ -24,17 +53,24 @@ async function flush() { clearTimeout(flushTimer); flushTimer = null; } - if (buffer.length === 0) return; - + if (isFlushing || buffer.length === 0) return; + + isFlushing = true; const batch = buffer.splice(0, BATCH_SIZE); + try { - await fetch(`${INGEST_URL}/v1/events`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(batch), - }); - } catch (e) { + await postBatch(batch); + } catch { console.debug("[agentmon] failed to flush events"); + } finally { + isFlushing = false; + if (buffer.length > 0) { + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } + } } } @@ -48,7 +84,7 @@ function buildEnvelope(type: string, data: any) { source: { framework: "opencode", client_id: "opencode", - host: require("os").hostname(), + host: hostname(), }, }, ...(data.correlation && { correlation: data.correlation }), @@ -63,29 +99,32 @@ export const agentmon: Plugin = async (input) => { if (!currentSessionId) { currentSessionId = crypto.randomUUID(); currentRunId = crypto.randomUUID(); - + enqueue(buildEnvelope("session.start", { correlation: { session_id: currentSessionId }, })); - + enqueue(buildEnvelope("run.start", { correlation: { session_id: currentSessionId, run_id: currentRunId }, })); } - + const spanId = crypto.randomUUID(); + activeSpans.set(data.tool, spanId); enqueue(buildEnvelope("span.start", { correlation: { session_id: currentSessionId, run_id: currentRunId, span_id: spanId }, attributes: { span_kind: "tool", name: data.tool }, payload: { input: (JSON.stringify(data.input) ?? "").substring(0, 200) }, })); }, - + "tool.execute.after": async (data: any) => { if (!currentSessionId || !currentRunId) return; - + + const spanId = activeSpans.get(data.tool); + activeSpans.delete(data.tool); enqueue(buildEnvelope("span.end", { - correlation: { session_id: currentSessionId, run_id: currentRunId }, + correlation: { session_id: currentSessionId, run_id: currentRunId, ...(spanId && { span_id: spanId }) }, attributes: { span_kind: "tool", name: data.tool }, payload: { status: data.error ? "error" : "success", @@ -94,22 +133,22 @@ export const agentmon: Plugin = async (input) => { }, })); }, - + "chat.message": async (data: any) => { if (!currentSessionId) { currentSessionId = data.sessionID || crypto.randomUUID(); currentRunId = crypto.randomUUID(); - + enqueue(buildEnvelope("session.start", { correlation: { session_id: currentSessionId }, })); - + enqueue(buildEnvelope("run.start", { correlation: { session_id: currentSessionId, run_id: currentRunId }, })); } }, - + "experimental.session.compacting": async (data: any) => { if (currentSessionId && currentRunId) { enqueue(buildEnvelope("span.start", { diff --git a/hooks/shared/lib.ts b/hooks/shared/lib.ts new file mode 100644 index 0000000..1e19a51 --- /dev/null +++ b/hooks/shared/lib.ts @@ -0,0 +1,196 @@ +import { randomUUID } from 'node:crypto'; + +export interface Dict { [key: string]: any } + +export function isRecord(value: unknown): value is Dict { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +export function pickString(...values: unknown[]): string | undefined { + for (const value of values) { + if (typeof value === 'string' && value.trim() !== '') { + return value; + } + } + return undefined; +} + +export function pickNumber(...values: unknown[]): number | undefined { + for (const value of values) { + if (typeof value === 'number' && Number.isFinite(value)) { + return value; + } + } + return undefined; +} + +export function truncate(value: unknown, limit: number): string | undefined { + if (value === undefined || value === null) { + return undefined; + } + + const text = typeof value === 'string' ? value : safeJSONStringify(value); + if (!text) { + return undefined; + } + + if (text.length <= limit) { + return text; + } + return text.slice(0, limit) + '...'; +} + +export function safeJSONStringify(value: unknown): string { + try { + return JSON.stringify(value); + } catch { + return String(value); + } +} + +export function buildEnvelope( + framework: string, + host: string, + type: string, + sessionKey?: string, + opts: { + runId?: string; + spanId?: string; + parentSpanId?: string; + attributes?: Dict; + payload?: Dict; + } = {}, +): Dict { + const correlation: Dict = {}; + if (sessionKey) { + correlation.session_id = sessionKey; + } + if (opts.runId) { + correlation.run_id = opts.runId; + } + if (opts.spanId) { + correlation.span_id = opts.spanId; + } + if (opts.parentSpanId) { + correlation.parent_span_id = opts.parentSpanId; + } + + const envelope: Dict = { + schema: { name: 'agentmon.event', version: 1 }, + event: { + id: randomUUID(), + type, + ts: new Date().toISOString(), + source: { + framework, + client_id: host, + host, + }, + }, + }; + + if (Object.keys(correlation).length > 0) { + envelope.correlation = correlation; + } + if (opts.attributes && Object.keys(opts.attributes).length > 0) { + envelope.attributes = opts.attributes; + } + if (opts.payload && Object.keys(opts.payload).length > 0) { + envelope.payload = opts.payload; + } + + return envelope; +} + +export function createTransport( + ingestUrl: string, + opts?: { batchSize?: number; flushMs?: number; fetchTimeoutMs?: number }, +): { enqueue(event: Dict): void; flush(): Promise } { + const batchSize = opts?.batchSize ?? 10; + const flushMs = opts?.flushMs ?? 2000; + const fetchTimeoutMs = opts?.fetchTimeoutMs ?? 500; + + let buffer: Dict[] = []; + let flushTimer: ReturnType | null = null; + let isFlushing = false; + + async function postBatch(batch: Dict[]) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), fetchTimeoutMs); + + try { + await fetch(`${ingestUrl}/v1/events`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(batch), + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } + } + + function scheduleFlush() { + if (!flushTimer) { + flushTimer = setTimeout(() => { + void flush(); + }, flushMs); + } + } + + async function flush() { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + if (isFlushing || buffer.length === 0) { + return; + } + + isFlushing = true; + const batch = buffer.splice(0, batchSize); + + try { + await postBatch(batch); + } catch { + console.debug(`[agentmon] failed to flush ${batch.length} events`); + } finally { + isFlushing = false; + if (buffer.length > 0) { + if (buffer.length >= batchSize) { + void flush(); + } else { + scheduleFlush(); + } + } + } + } + + function enqueue(event: Dict) { + buffer.push(event); + if (buffer.length >= batchSize) { + void flush(); + } else { + scheduleFlush(); + } + } + + return { enqueue, flush }; +} + +export async function readStdin(): Promise { + return new Promise((resolve) => { + let data = ''; + let done = false; + const timer = setTimeout(() => finish(data), 100); + const finish = (value: string) => { + if (done) return; + done = true; + clearTimeout(timer); + resolve(value); + }; + process.stdin.on('data', (chunk) => { data += chunk; }); + process.stdin.on('end', () => finish(data)); + process.stdin.on('error', () => finish('')); + }); +}