Files
2026-03-26 11:22:27 -07:00

162 lines
4.4 KiB
TypeScript

import { Plugin } from "@opencode-ai/plugin";
import { hostname } from "node:os";
// ---- Delivery settings -----------------------------------------------------

/** Base URL of the ingest service; AGENTMON_INGEST_URL overrides the local default. */
const INGEST_URL = process.env["AGENTMON_INGEST_URL"] || "http://localhost:8080";
/** Milliseconds after which an in-flight ingest request is aborted. */
const FETCH_TIMEOUT_MS = 500;
/** Buffer length that triggers an immediate flush; also the most events sent per request. */
const BATCH_SIZE = 10;
/** Milliseconds a partially filled buffer waits before it is flushed. */
const FLUSH_MS = 2000;

// ---- Mutable module state --------------------------------------------------

/** Events queued for delivery, oldest first. */
let buffer: Array<any> = [];
/** Handle of the pending delayed flush, or null when none is scheduled. */
let flushTimer: ReturnType<typeof setTimeout> | null = null;
/** True while a batch is being posted, so that flushes never overlap. */
let isFlushing = false;

/** Correlation ids of the session and run being reported; unset until a hook opens them. */
let currentSessionId: string | undefined;
let currentRunId: string | undefined;
/** Span ids of tool calls that have started but not yet finished. */
const activeSpans = new Map<string, string>();
/**
 * Arms a one-shot timer that flushes the buffer after FLUSH_MS milliseconds.
 * Does nothing when a timer is already pending, so repeated calls never
 * stack up several delayed flushes.
 */
function scheduleFlush() {
  if (flushTimer) {
    return;
  }
  // Fire-and-forget: the promise returned by flush() is intentionally not awaited.
  flushTimer = setTimeout(() => void flush(), FLUSH_MS);
}
/**
 * Appends an event to the buffer and makes sure it will be delivered.
 *
 * A buffer that has reached BATCH_SIZE is flushed right away; otherwise the
 * delayed flush is (re)armed.
 *
 * @param event - Event envelope to queue.
 */
function enqueue(event: any) {
  // Array.prototype.push returns the new length of the buffer.
  const queued = buffer.push(event);
  if (queued < BATCH_SIZE) {
    scheduleFlush();
    return;
  }
  void flush();
}
/**
 * Sends one batch of events to the ingest service as a JSON POST to
 * `<INGEST_URL>/v1/events`.
 *
 * The request is aborted once FETCH_TIMEOUT_MS has elapsed.
 *
 * @param batch - Events serialised as the JSON request body.
 * @throws If the request fails or is aborted, or if the service answers with
 *   a non-2xx status.
 */
async function postBatch(batch: any[]) {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
  try {
    // Tolerate a trailing slash in the configured base URL so the path never
    // becomes "//v1/events".
    const endpoint = `${INGEST_URL.replace(/\/+$/, "")}/v1/events`;
    const response = await fetch(endpoint, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(batch),
      signal: controller.signal,
    });
    // fetch() resolves even for 4xx/5xx answers; raise so the caller can tell
    // a rejected batch from a delivered one.
    if (!response.ok) {
      throw new Error(`[agentmon] ingest responded with HTTP ${response.status}`);
    }
  } finally {
    // Always disarm the abort timer, whether the request succeeded or failed.
    clearTimeout(timeout);
  }
}
/**
 * Delivers up to BATCH_SIZE buffered events in a single request.
 *
 * Any pending delayed flush is cancelled first. The call is a no-op while
 * another flush is in flight or when the buffer is empty. A failed delivery
 * is logged at debug level and its batch is not put back into the buffer.
 * Once the request settles, whatever is still buffered is either flushed
 * immediately (a full batch is waiting) or handed to the delayed flush.
 */
async function flush() {
  if (flushTimer) {
    clearTimeout(flushTimer);
    flushTimer = null;
  }

  if (isFlushing) return;
  if (buffer.length === 0) return;

  isFlushing = true;
  // Take the oldest events out of the buffer before the first await so events
  // enqueued in the meantime are left for the next round.
  const outgoing = buffer.splice(0, BATCH_SIZE);
  try {
    await postBatch(outgoing);
  } catch {
    console.debug("[agentmon] failed to flush events");
  } finally {
    isFlushing = false;
    if (buffer.length >= BATCH_SIZE) {
      void flush();
    } else if (buffer.length > 0) {
      scheduleFlush();
    }
  }
}
/**
 * Wraps event data in an "agentmon.event" (version 1) envelope.
 *
 * Every envelope gets a fresh UUID, the current time as an ISO-8601 string and
 * a source descriptor naming opencode and the local host name.
 *
 * @param type - Event type, e.g. "span.start".
 * @param data - Object whose `correlation`, `payload` and `attributes`
 *   properties are copied into the envelope when they are truthy.
 * @returns The assembled envelope.
 */
function buildEnvelope(type: string, data: any) {
  const envelope: any = {
    schema: { name: "agentmon.event", version: 1 },
    event: {
      id: crypto.randomUUID(),
      type,
      ts: new Date().toISOString(),
      source: {
        framework: "opencode",
        client_id: "opencode",
        host: hostname(),
      },
    },
  };

  // Optional sections are attached in a fixed order and only when present.
  for (const section of ["correlation", "payload", "attributes"]) {
    if (data[section]) {
      envelope[section] = data[section];
    }
  }
  return envelope;
}
/**
 * opencode plugin that reports agent activity to agentmon.
 *
 * The returned hooks turn tool executions, chat messages and context
 * compaction into agentmon events and queue them with `enqueue`. Whichever of
 * `tool.execute.before` / `chat.message` fires first opens the module-wide
 * session and run; later hooks reuse those ids.
 *
 * @param input - Plugin context supplied by the host; not used here.
 * @returns The hook table registered with the host.
 */
export const agentmon: Plugin = async (input) => {
  /**
   * Serialises `value` to JSON and truncates it to `maxLength` characters.
   * Never throws: JSON.stringify raises on circular structures and BigInt
   * values and returns undefined for functions and symbols, and a telemetry
   * hook must not fail the tool call it is observing because of that.
   */
  const preview = (value: unknown, maxLength: number): string => {
    let text: string | undefined;
    try {
      text = JSON.stringify(value);
    } catch {
      text = "[unserializable]";
    }
    return (text ?? "").substring(0, maxLength);
  };

  /**
   * Opens the session and run on first use and announces both; a no-op once a
   * session exists. A session id supplied by the host is preferred over a
   * generated one so the id does not depend on which hook happens to run first.
   */
  const ensureSession = (hostSessionId?: string): void => {
    if (currentSessionId) return;
    currentSessionId = hostSessionId || crypto.randomUUID();
    currentRunId = crypto.randomUUID();
    enqueue(buildEnvelope("session.start", {
      correlation: { session_id: currentSessionId },
    }));
    enqueue(buildEnvelope("run.start", {
      correlation: { session_id: currentSessionId, run_id: currentRunId },
    }));
  };

  /**
   * Key under which a tool call's span id is remembered between its two hooks.
   * NOTE(review): assumes the host passes the same per-invocation `callID` to
   * both tool hooks — confirm against the plugin API. Without it the tool name
   * is used, in which case overlapping calls of one tool share a single slot.
   */
  const spanKey = (data: any): string => data.callID ?? data.tool;

  return {
    // A tool is about to run: open a span and remember its id for the end event.
    "tool.execute.before": async (data: any) => {
      ensureSession(data.sessionID);
      const spanId = crypto.randomUUID();
      activeSpans.set(spanKey(data), spanId);
      enqueue(buildEnvelope("span.start", {
        correlation: { session_id: currentSessionId, run_id: currentRunId, span_id: spanId },
        attributes: { span_kind: "tool", name: data.tool },
        payload: { input: preview(data.input, 200) },
      }));
    },

    // A tool finished: close its span. Nothing is reported before a session exists.
    "tool.execute.after": async (data: any) => {
      if (!currentSessionId || !currentRunId) return;
      const key = spanKey(data);
      const spanId = activeSpans.get(key);
      activeSpans.delete(key);
      enqueue(buildEnvelope("span.end", {
        // span_id is omitted when no matching start was recorded.
        correlation: { session_id: currentSessionId, run_id: currentRunId, ...(spanId && { span_id: spanId }) },
        attributes: { span_kind: "tool", name: data.tool },
        payload: {
          status: data.error ? "error" : "success",
          result_preview: data.result ? preview(data.result, 500) : undefined,
          duration_ms: data.duration,
        },
      }));
    },

    // A chat message only serves to open the session when none exists yet.
    "chat.message": async (data: any) => {
      ensureSession(data.sessionID);
    },

    // Context compaction is reported as an internal span of the current run.
    // NOTE(review): this span.start carries no span_id and nothing in this
    // plugin emits a matching span.end — confirm that is what the backend expects.
    "experimental.session.compacting": async (data: any) => {
      if (currentSessionId && currentRunId) {
        enqueue(buildEnvelope("span.start", {
          correlation: { session_id: currentSessionId, run_id: currentRunId },
          attributes: { span_kind: "internal", name: "context_compaction" },
        }));
      }
    },
  };
};