feat(hooks): consolidate shared transport helpers

This commit is contained in:
William Valentin
2026-03-26 11:22:27 -07:00
parent d49785cb25
commit fdfcb50e80
10 changed files with 374 additions and 948 deletions
+62 -23
View File
@@ -1,21 +1,50 @@
import { Plugin } from "@opencode-ai/plugin";
import { hostname } from "node:os";
const INGEST_URL = process.env.AGENTMON_INGEST_URL || "http://localhost:8080";
const FETCH_TIMEOUT_MS = 500;
let buffer: any[] = [];
let flushTimer: NodeJS.Timeout | null = null;
let flushTimer: ReturnType<typeof setTimeout> | null = null;
let isFlushing = false;
const BATCH_SIZE = 10;
const FLUSH_MS = 2000;
let currentSessionId: string | undefined;
let currentRunId: string | undefined;
const activeSpans = new Map<string, string>();
function scheduleFlush() {
if (!flushTimer) {
flushTimer = setTimeout(() => {
void flush();
}, FLUSH_MS);
}
}
function enqueue(event: any) {
buffer.push(event);
if (buffer.length >= BATCH_SIZE) {
flush();
} else if (!flushTimer) {
flushTimer = setTimeout(flush, FLUSH_MS);
void flush();
} else {
scheduleFlush();
}
}
async function postBatch(batch: any[]) {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
try {
await fetch(`${INGEST_URL}/v1/events`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(batch),
signal: controller.signal,
});
} finally {
clearTimeout(timeout);
}
}
@@ -24,17 +53,24 @@ async function flush() {
clearTimeout(flushTimer);
flushTimer = null;
}
if (buffer.length === 0) return;
if (isFlushing || buffer.length === 0) return;
isFlushing = true;
const batch = buffer.splice(0, BATCH_SIZE);
try {
await fetch(`${INGEST_URL}/v1/events`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(batch),
});
} catch (e) {
await postBatch(batch);
} catch {
console.debug("[agentmon] failed to flush events");
} finally {
isFlushing = false;
if (buffer.length > 0) {
if (buffer.length >= BATCH_SIZE) {
void flush();
} else {
scheduleFlush();
}
}
}
}
@@ -48,7 +84,7 @@ function buildEnvelope(type: string, data: any) {
source: {
framework: "opencode",
client_id: "opencode",
host: require("os").hostname(),
host: hostname(),
},
},
...(data.correlation && { correlation: data.correlation }),
@@ -63,29 +99,32 @@ export const agentmon: Plugin = async (input) => {
if (!currentSessionId) {
currentSessionId = crypto.randomUUID();
currentRunId = crypto.randomUUID();
enqueue(buildEnvelope("session.start", {
correlation: { session_id: currentSessionId },
}));
enqueue(buildEnvelope("run.start", {
correlation: { session_id: currentSessionId, run_id: currentRunId },
}));
}
const spanId = crypto.randomUUID();
activeSpans.set(data.tool, spanId);
enqueue(buildEnvelope("span.start", {
correlation: { session_id: currentSessionId, run_id: currentRunId, span_id: spanId },
attributes: { span_kind: "tool", name: data.tool },
payload: { input: (JSON.stringify(data.input) ?? "").substring(0, 200) },
}));
},
"tool.execute.after": async (data: any) => {
if (!currentSessionId || !currentRunId) return;
const spanId = activeSpans.get(data.tool);
activeSpans.delete(data.tool);
enqueue(buildEnvelope("span.end", {
correlation: { session_id: currentSessionId, run_id: currentRunId },
correlation: { session_id: currentSessionId, run_id: currentRunId, ...(spanId && { span_id: spanId }) },
attributes: { span_kind: "tool", name: data.tool },
payload: {
status: data.error ? "error" : "success",
@@ -94,22 +133,22 @@ export const agentmon: Plugin = async (input) => {
},
}));
},
"chat.message": async (data: any) => {
if (!currentSessionId) {
currentSessionId = data.sessionID || crypto.randomUUID();
currentRunId = crypto.randomUUID();
enqueue(buildEnvelope("session.start", {
correlation: { session_id: currentSessionId },
}));
enqueue(buildEnvelope("run.start", {
correlation: { session_id: currentSessionId, run_id: currentRunId },
}));
}
},
"experimental.session.compacting": async (data: any) => {
if (currentSessionId && currentRunId) {
enqueue(buildEnvelope("span.start", {