import type { Plugin } from "@opencode-ai/plugin";
import { randomUUID } from "node:crypto";
import { hostname } from "node:os";

/** Base URL of the ingest service; events are POSTed to `${INGEST_URL}/v1/events`. */
const INGEST_URL = process.env.AGENTMON_INGEST_URL || "http://localhost:8080";
/** A POST is aborted after this many milliseconds. */
const FETCH_TIMEOUT_MS = 500;
/** Maximum number of events sent per POST; reaching it triggers an immediate flush. */
const BATCH_SIZE = 10;
/** Delay before a partially filled buffer is flushed. */
const FLUSH_MS = 2000;
/** Upper bound on tracked in-flight spans, so spans that never end cannot grow the map forever. */
const MAX_ACTIVE_SPANS = 1000;

/** Identifiers that tie an event to its session / run / span. */
interface Correlation {
  session_id: string;
  run_id?: string;
  span_id?: string;
}

/** Optional sections a caller may attach to an envelope. */
interface EnvelopeData {
  correlation?: Correlation;
  payload?: Record<string, unknown>;
  attributes?: Record<string, unknown>;
}

/** Wire format of a single event as produced by `buildEnvelope`. */
interface AgentmonEvent extends EnvelopeData {
  schema: { name: "agentmon.event"; version: 1 };
  event: {
    id: string;
    type: string;
    ts: string;
    source: { framework: string; client_id: string; host: string };
  };
}

/** Events waiting to be sent. */
const buffer: AgentmonEvent[] = [];
/** Pending delayed-flush timer, or `null` when none is scheduled. */
let flushTimer: ReturnType<typeof setTimeout> | null = null;
/** True while a POST is in flight, so only one flush runs at a time. */
let isFlushing = false;

let currentSessionId: string | undefined;
let currentRunId: string | undefined;
/** Span ids of tool calls that have started but not yet finished, by span key. */
const activeSpans = new Map<string, string>();

/** Arms the delayed flush unless one is already pending. */
function scheduleFlush(): void {
  if (flushTimer !== null) return;
  flushTimer = setTimeout(() => {
    void flush();
  }, FLUSH_MS);
}

/** Buffers an event; flushes immediately once a full batch is available. */
function enqueue(event: AgentmonEvent): void {
  buffer.push(event);
  if (buffer.length >= BATCH_SIZE) {
    void flush();
    return;
  }
  scheduleFlush();
}

/**
 * POSTs one batch to the ingest endpoint.
 *
 * @throws when the request fails, is aborted after `FETCH_TIMEOUT_MS`, or the
 *   server answers with a non-2xx status.
 */
async function postBatch(batch: readonly AgentmonEvent[]): Promise<void> {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
  try {
    const response = await fetch(`${INGEST_URL}/v1/events`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(batch),
      signal: controller.signal,
    });
    // fetch() only rejects on network errors; surface HTTP failures too.
    if (!response.ok) {
      throw new Error(`ingest responded with HTTP ${response.status}`);
    }
  } finally {
    clearTimeout(timeout);
  }
}

/**
 * Sends up to `BATCH_SIZE` buffered events. Delivery is best effort: a batch
 * that fails to send is logged and dropped, not retried. Remaining events are
 * flushed right away (full batch) or on the next timer tick (partial batch).
 */
async function flush(): Promise<void> {
  if (flushTimer !== null) {
    clearTimeout(flushTimer);
    flushTimer = null;
  }
  if (isFlushing || buffer.length === 0) return;

  isFlushing = true;
  const batch = buffer.splice(0, BATCH_SIZE);
  try {
    await postBatch(batch);
  } catch (error: unknown) {
    console.debug(
      "[agentmon] failed to flush events",
      error instanceof Error ? error.message : error,
    );
  } finally {
    isFlushing = false;
    if (buffer.length >= BATCH_SIZE) {
      void flush();
    } else if (buffer.length > 0) {
      scheduleFlush();
    }
  }
}

/**
 * Wraps event data in the `agentmon.event` v1 envelope.
 *
 * @param type event type, e.g. `"span.start"`.
 * @param data optional `correlation`, `payload` and `attributes` sections;
 *   absent sections are omitted from the result.
 */
function buildEnvelope(type: string, data: EnvelopeData): AgentmonEvent {
  const envelope: AgentmonEvent = {
    schema: { name: "agentmon.event", version: 1 },
    event: {
      id: randomUUID(),
      type,
      ts: new Date().toISOString(),
      source: {
        framework: "opencode",
        client_id: "opencode",
        host: hostname(),
      },
    },
  };
  if (data.correlation) envelope.correlation = data.correlation;
  if (data.payload) envelope.payload = data.payload;
  if (data.attributes) envelope.attributes = data.attributes;
  return envelope;
}

/**
 * JSON-encodes `value` and truncates it to `maxLength` characters.
 * Never throws: values JSON cannot represent (`undefined`, functions) yield
 * `""`, values it rejects (circular structures, BigInt) yield a placeholder.
 */
function previewJson(value: unknown, maxLength: number): string {
  let text: string | undefined;
  try {
    text = JSON.stringify(value);
  } catch {
    text = "[unserializable]";
  }
  return (text ?? "").substring(0, maxLength);
}

/**
 * Starts the session/run on first use (emitting `session.start` and
 * `run.start`) and returns the active ids.
 *
 * @param sessionId id to adopt for a new session; a random UUID is used when
 *   it is not a non-empty string. Ignored once a session exists.
 */
function ensureSession(sessionId: unknown): { sessionId: string; runId: string } {
  if (currentSessionId === undefined || currentRunId === undefined) {
    currentSessionId =
      typeof sessionId === "string" && sessionId !== "" ? sessionId : randomUUID();
    currentRunId = randomUUID();
    enqueue(buildEnvelope("session.start", {
      correlation: { session_id: currentSessionId },
    }));
    enqueue(buildEnvelope("run.start", {
      correlation: { session_id: currentSessionId, run_id: currentRunId },
    }));
  }
  return { sessionId: currentSessionId, runId: currentRunId };
}

/**
 * Key under which a tool call's span id is stored between its `before` and
 * `after` hooks. Uses `callID` when the host provides one so that concurrent
 * calls of the same tool do not overwrite each other; otherwise falls back to
 * the tool name.
 * NOTE(review): assumes the host passes the same `callID` to both hooks — confirm.
 */
function spanKey(data: { callID?: unknown; tool?: unknown }): string {
  return typeof data.callID === "string" && data.callID !== ""
    ? data.callID
    : String(data.tool);
}

/** Records an in-flight span, evicting the oldest entry once the cap is exceeded. */
function rememberSpan(key: string, spanId: string): void {
  activeSpans.set(key, spanId);
  if (activeSpans.size > MAX_ACTIVE_SPANS) {
    const oldest = activeSpans.keys().next().value;
    if (oldest !== undefined) activeSpans.delete(oldest);
  }
}

/**
 * Builds the hook table returned by the plugin. Hook payloads come from the
 * host and are read defensively, hence the `any` parameters.
 * NOTE(review): the opencode hook signatures may deliver tool arguments and
 * results via a second `output` parameter rather than `data.input` /
 * `data.result` — verify against the plugin API in use.
 */
function createHooks() {
  return {
    /** Emits `span.start` for a tool call, starting the session if needed. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- host-defined payload
    "tool.execute.before": async (data: any): Promise<void> => {
      const { sessionId, runId } = ensureSession(data.sessionID);
      const spanId = randomUUID();
      rememberSpan(spanKey(data), spanId);
      enqueue(buildEnvelope("span.start", {
        correlation: { session_id: sessionId, run_id: runId, span_id: spanId },
        attributes: { span_kind: "tool", name: data.tool },
        payload: { input: previewJson(data.input, 200) },
      }));
    },

    /** Emits `span.end` for a tool call; a no-op before any session exists. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- host-defined payload
    "tool.execute.after": async (data: any): Promise<void> => {
      if (!currentSessionId || !currentRunId) return;
      const key = spanKey(data);
      const spanId = activeSpans.get(key);
      activeSpans.delete(key);
      enqueue(buildEnvelope("span.end", {
        correlation: {
          session_id: currentSessionId,
          run_id: currentRunId,
          ...(spanId && { span_id: spanId }),
        },
        attributes: { span_kind: "tool", name: data.tool },
        payload: {
          status: data.error ? "error" : "success",
          // Nullish check (not truthiness) so results like 0, "" or false are kept.
          result_preview: data.result == null ? undefined : previewJson(data.result, 500),
          duration_ms: data.duration,
        },
      }));
    },

    /** Starts the session on the first chat message, adopting `data.sessionID` if present. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- host-defined payload
    "chat.message": async (data: any): Promise<void> => {
      ensureSession(data.sessionID);
    },

    /** Emits a `span.start` marking context compaction; a no-op before any session exists. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- host-defined payload
    "experimental.session.compacting": async (_data: any): Promise<void> => {
      if (!currentSessionId || !currentRunId) return;
      enqueue(buildEnvelope("span.start", {
        correlation: { session_id: currentSessionId, run_id: currentRunId },
        attributes: { span_kind: "internal", name: "context_compaction" },
      }));
    },
  };
}

/** opencode plugin entry point: reports session, run and tool-span events to the ingest service. */
export const agentmon: Plugin = async () => createHooks();