From 0215c037da0a27035d95dceeaf6c034b12cc60d0 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 19 Mar 2026 15:35:47 -0700 Subject: [PATCH] feat: add agentmon monitoring hook for OpenClaw telemetry Add hook handler that forwards OpenClaw agent events to the agentmon ingest endpoint for monitoring and observability. - ansible/playbooks/files/agentmon-hook/: Ansible-deployable hook - openclaw/hooks/agentmon/: Hook installed in OpenClaw instance Co-Authored-By: Claude Opus 4.6 --- ansible/playbooks/files/agentmon-hook/HOOK.md | 38 ++ .../playbooks/files/agentmon-hook/handler.ts | 411 ++++++++++++++++++ openclaw/hooks/agentmon/HOOK.md | 33 ++ openclaw/hooks/agentmon/handler.js | 295 +++++++++++++ openclaw/hooks/agentmon/handler.ts | 357 +++++++++++++++ 5 files changed, 1134 insertions(+) create mode 100644 ansible/playbooks/files/agentmon-hook/HOOK.md create mode 100644 ansible/playbooks/files/agentmon-hook/handler.ts create mode 100644 openclaw/hooks/agentmon/HOOK.md create mode 100644 openclaw/hooks/agentmon/handler.js create mode 100644 openclaw/hooks/agentmon/handler.ts diff --git a/ansible/playbooks/files/agentmon-hook/HOOK.md b/ansible/playbooks/files/agentmon-hook/HOOK.md new file mode 100644 index 0000000..e4d6654 --- /dev/null +++ b/ansible/playbooks/files/agentmon-hook/HOOK.md @@ -0,0 +1,38 @@ +--- +name: agentmon +description: "Emit OpenClaw telemetry events to the agentmon monitoring pipeline" +metadata: + openclaw: + events: + - "command:new" + - "command:stop" + - "command:reset" + - "message:received" + - "message:sent" + - "tool_result_persist" + - "session:compact:before" + - "session:compact:after" + export: "default" + requires: + env: + - "AGENTMON_INGEST_URL" +--- + +# Agentmon Telemetry Hook + +Captures OpenClaw agent activity and emits it as `agentmon.event` envelopes to +the agentmon ingest gateway. + +## Configuration + +Set the ingest gateway URL before enabling the hook: + +```bash +export AGENTMON_INGEST_URL=http://192.168.122.1:8080 +``` + +You can optionally override the VM identifier: + +```bash +export AGENTMON_VM_NAME=zap +``` diff --git a/ansible/playbooks/files/agentmon-hook/handler.ts b/ansible/playbooks/files/agentmon-hook/handler.ts new file mode 100644 index 0000000..4a2e180 --- /dev/null +++ b/ansible/playbooks/files/agentmon-hook/handler.ts @@ -0,0 +1,411 @@ +import { randomUUID } from 'node:crypto'; +import { hostname } from 'node:os'; + +type Dict = Record; + +const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://192.168.122.1:8080'; +const VM_NAME = process.env.AGENTMON_VM_NAME || hostname(); +const BATCH_SIZE = 10; +const FLUSH_MS = 2000; +const FETCH_TIMEOUT_MS = 500; + +let buffer: Dict[] = []; +let flushTimer: ReturnType | null = null; +let isFlushing = false; + +const activeRuns = new Map(); +const activeCompactions = new Map(); + +function isRecord(value: unknown): value is Dict { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +function pickString(...values: unknown[]): string | undefined { + for (const value of values) { + if (typeof value === 'string' && value.trim() !== '') { + return value; + } + } + return undefined; +} + +function pickNumber(...values: unknown[]): number | undefined { + for (const value of values) { + if (typeof value === 'number' && Number.isFinite(value)) { + return value; + } + } + return undefined; +} + +function truncate(value: unknown, limit: number): string | undefined { + if (value === undefined || value === null) { + return undefined; + } + + const text = typeof value === 'string' ? value : safeJSONStringify(value); + if (!text) { + return undefined; + } + + if (text.length <= limit) { + return text; + } + return text.slice(0, limit) + '...'; +} + +function safeJSONStringify(value: unknown): string { + try { + return JSON.stringify(value); + } catch { + return String(value); + } +} + +function getEventName(input: Dict): string { + const direct = pickString(input.name, input.event); + if (direct) { + return direct; + } + + if (typeof input.type === 'string' && input.type.includes(':')) { + return input.type; + } + + if (typeof input.type === 'string' && typeof input.action === 'string') { + return `${input.type}:${input.action}`; + } + + if (typeof input.type === 'string') { + return input.type; + } + + return ''; +} + +function getContext(input: Dict): Dict { + return isRecord(input.context) ? input.context : {}; +} + +function getSessionKey(input: Dict, context: Dict): string | undefined { + return pickString( + input.sessionKey, + context.sessionKey, + context.session_id, + input.session_id, + isRecord(input.session) ? input.session.key : undefined, + isRecord(context.session) ? context.session.key : undefined, + ); +} + +function buildEnvelope( + type: string, + sessionKey?: string, + opts: { + runId?: string; + spanId?: string; + parentSpanId?: string; + attributes?: Dict; + payload?: Dict; + } = {}, +): Dict { + const correlation: Dict = {}; + if (sessionKey) { + correlation.session_id = sessionKey; + } + if (opts.runId) { + correlation.run_id = opts.runId; + } + if (opts.spanId) { + correlation.span_id = opts.spanId; + } + if (opts.parentSpanId) { + correlation.parent_span_id = opts.parentSpanId; + } + + const envelope: Dict = { + schema: { name: 'agentmon.event', version: 1 }, + event: { + id: randomUUID(), + type, + ts: new Date().toISOString(), + source: { + framework: 'openclaw', + client_id: VM_NAME, + host: VM_NAME, + }, + }, + }; + + if (Object.keys(correlation).length > 0) { + envelope.correlation = correlation; + } + if (opts.attributes && Object.keys(opts.attributes).length > 0) { + envelope.attributes = opts.attributes; + } + if (opts.payload && Object.keys(opts.payload).length > 0) { + envelope.payload = opts.payload; + } + + return envelope; +} + +function scheduleFlush() { + if (!flushTimer) { + flushTimer = setTimeout(() => { + void flush(); + }, FLUSH_MS); + } +} + +function enqueue(event: Dict) { + buffer.push(event); + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } +} + +async function postBatch(batch: Dict[]) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + + try { + await fetch(`${INGEST_URL}/v1/events`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(batch), + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } +} + +async function flush() { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + if (isFlushing || buffer.length === 0) { + return; + } + + isFlushing = true; + const batch = buffer.splice(0, BATCH_SIZE); + + try { + await postBatch(batch); + } catch { + console.debug(`[agentmon] failed to flush ${batch.length} events`); + } finally { + isFlushing = false; + if (buffer.length > 0) { + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } + } + } +} + +function emitError(sessionKey: string | undefined, runId: string | undefined, spanId: string | undefined, errorValue: unknown) { + if (errorValue === undefined || errorValue === null || errorValue === false) { + return; + } + + const errorRecord = isRecord(errorValue) ? errorValue : {}; + const message = pickString(errorRecord.message, errorRecord.error, errorValue) || 'unknown'; + const errType = pickString(errorRecord.type, errorRecord.code) || 'openclaw'; + + enqueue(buildEnvelope('error', sessionKey, { + runId, + spanId, + payload: { + error: { + type: errType, + message, + }, + }, + })); +} + +function buildRunPayload(context: Dict, success: boolean): Dict { + const payload: Dict = { + status: success ? 'success' : 'error', + }; + + const duration = pickNumber(context.duration_ms, context.durationMs, context.elapsed_ms); + if (duration !== undefined) { + payload.duration_ms = duration; + } + + const usage = isRecord(context.usage) ? context.usage : undefined; + if (usage) { + payload.usage = usage; + } + + const errorMessage = pickString(context.error, isRecord(context.result) ? context.result.error : undefined); + if (errorMessage) { + payload.error = errorMessage; + } + + return payload; +} + +const handler = async (rawEvent: unknown) => { + if (!isRecord(rawEvent)) { + return; + } + + const context = getContext(rawEvent); + const eventName = getEventName(rawEvent); + const sessionKey = getSessionKey(rawEvent, context); + + try { + if (eventName === 'command:new') { + enqueue(buildEnvelope('session.start', sessionKey)); + return; + } + + if (eventName === 'command:stop') { + enqueue(buildEnvelope('session.end', sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + activeCompactions.delete(sessionKey); + } + return; + } + + if (eventName === 'command:reset') { + enqueue(buildEnvelope('session.end', sessionKey)); + enqueue(buildEnvelope('session.start', sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + activeCompactions.delete(sessionKey); + } + return; + } + + if (eventName === 'message:received') { + const runId = randomUUID(); + if (sessionKey) { + activeRuns.set(sessionKey, runId); + } + + enqueue(buildEnvelope('run.start', sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + from: pickString(context.from, context.sender), + }, + payload: { + message_preview: truncate( + pickString(context.content, context.message, context.text) || context.input, + 200, + ), + }, + })); + return; + } + + if (eventName === 'message:sent') { + const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; + const success = context.success !== false && !context.error; + + enqueue(buildEnvelope('run.end', sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + to: pickString(context.to, context.recipient), + }, + payload: buildRunPayload(context, success), + })); + + if (!success) { + emitError(sessionKey, runId, undefined, context.error); + } + return; + } + + if (eventName === 'tool_result_persist') { + const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; + const spanId = randomUUID(); + const success = context.success !== false && !context.error; + const toolName = pickString(context.toolName, context.tool_name, context.name) || 'unknown_tool'; + const payload: Dict = { + status: success ? 'success' : 'error', + }; + + const duration = pickNumber(context.duration_ms, context.durationMs, context.elapsed_ms); + if (duration !== undefined) { + payload.duration_ms = duration; + } + + const resultPreview = truncate(context.result ?? context.output, 500); + if (resultPreview) { + payload.result_preview = resultPreview; + } + + enqueue(buildEnvelope('span.end', sessionKey, { + runId, + spanId, + attributes: { + span_kind: 'tool', + name: toolName, + }, + payload, + })); + + if (!success) { + emitError(sessionKey, runId, spanId, context.error); + } + return; + } + + if (eventName === 'session:compact:before') { + const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; + const spanId = randomUUID(); + if (sessionKey) { + activeCompactions.set(sessionKey, spanId); + } + + enqueue(buildEnvelope('span.start', sessionKey, { + runId, + spanId, + attributes: { + span_kind: 'internal', + name: 'context_compaction', + }, + })); + return; + } + + if (eventName === 'session:compact:after') { + const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; + const spanId = (sessionKey && activeCompactions.get(sessionKey)) || randomUUID(); + if (sessionKey) { + activeCompactions.delete(sessionKey); + } + + enqueue(buildEnvelope('span.end', sessionKey, { + runId, + spanId, + attributes: { + span_kind: 'internal', + name: 'context_compaction', + }, + payload: { + status: 'success', + duration_ms: pickNumber(context.duration_ms, context.durationMs, context.elapsed_ms), + }, + })); + } + } catch { + console.debug('[agentmon] handler error'); + } +}; + +export default handler; diff --git a/openclaw/hooks/agentmon/HOOK.md b/openclaw/hooks/agentmon/HOOK.md new file mode 100644 index 0000000..15b4441 --- /dev/null +++ b/openclaw/hooks/agentmon/HOOK.md @@ -0,0 +1,33 @@ +--- +name: agentmon +description: "Emit OpenClaw telemetry events to the agentmon monitoring pipeline" +metadata: + openclaw: + events: + - "command:new" + - "command:stop" + - "command:reset" + - "message:received" + - "message:sent" + - "agent:bootstrap" + export: "default" +--- + +# Agentmon Telemetry Hook + +Captures OpenClaw agent activity and emits it as `agentmon.event` envelopes to +the agentmon ingest gateway. + +## Configuration + +Set the ingest gateway URL before enabling the hook: + +```bash +export AGENTMON_INGEST_URL=http://192.168.122.1:8080 +``` + +You can optionally override the VM identifier: + +```bash +export AGENTMON_VM_NAME=zap +``` diff --git a/openclaw/hooks/agentmon/handler.js b/openclaw/hooks/agentmon/handler.js new file mode 100644 index 0000000..d64eea4 --- /dev/null +++ b/openclaw/hooks/agentmon/handler.js @@ -0,0 +1,295 @@ +import { randomUUID } from "node:crypto"; +import { hostname } from "node:os"; +const INGEST_URL = process.env.AGENTMON_INGEST_URL || "http://192.168.122.1:8080"; +const VM_NAME = process.env.AGENTMON_VM_NAME || hostname(); +const BATCH_SIZE = 10; +const FLUSH_MS = 2e3; +const FETCH_TIMEOUT_MS = 500; +let buffer = []; +let flushTimer = null; +let isFlushing = false; +const activeRuns = /* @__PURE__ */ new Map(); +function isRecord(value) { + return value !== null && typeof value === "object" && !Array.isArray(value); +} +function pickString(...values) { + for (const value of values) { + if (typeof value === "string" && value.trim() !== "") { + return value; + } + } + return void 0; +} +function pickNumber(...values) { + for (const value of values) { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + } + return void 0; +} +function truncate(value, limit) { + if (value === void 0 || value === null) { + return void 0; + } + const text = typeof value === "string" ? value : safeJSONStringify(value); + if (!text) { + return void 0; + } + if (text.length <= limit) { + return text; + } + return text.slice(0, limit) + "..."; +} +function safeJSONStringify(value) { + try { + return JSON.stringify(value); + } catch { + return String(value); + } +} +function getEventName(input) { + const direct = pickString(input.name, input.event); + if (direct) { + return direct; + } + if (typeof input.type === "string" && input.type.includes(":")) { + return input.type; + } + if (typeof input.type === "string" && typeof input.action === "string") { + return `${input.type}:${input.action}`; + } + if (typeof input.type === "string") { + return input.type; + } + return ""; +} +function getContext(input) { + return isRecord(input.context) ? input.context : {}; +} +function getSessionKey(input, context) { + return pickString( + input.sessionKey, + context.sessionKey, + context.session_id, + input.session_id, + isRecord(input.session) ? input.session.key : void 0, + isRecord(context.session) ? context.session.key : void 0 + ); +} +function buildEnvelope(type, sessionKey, opts = {}) { + const correlation = {}; + if (sessionKey) { + correlation.session_id = sessionKey; + } + if (opts.runId) { + correlation.run_id = opts.runId; + } + if (opts.spanId) { + correlation.span_id = opts.spanId; + } + if (opts.parentSpanId) { + correlation.parent_span_id = opts.parentSpanId; + } + const envelope = { + schema: { name: "agentmon.event", version: 1 }, + event: { + id: randomUUID(), + type, + ts: (/* @__PURE__ */ new Date()).toISOString(), + source: { + framework: "openclaw", + client_id: VM_NAME, + host: VM_NAME + } + } + }; + if (Object.keys(correlation).length > 0) { + envelope.correlation = correlation; + } + if (opts.attributes && Object.keys(opts.attributes).length > 0) { + envelope.attributes = opts.attributes; + } + if (opts.payload && Object.keys(opts.payload).length > 0) { + envelope.payload = opts.payload; + } + return envelope; +} +function scheduleFlush() { + if (!flushTimer) { + flushTimer = setTimeout(() => { + void flush(); + }, FLUSH_MS); + } +} +function enqueue(event) { + buffer.push(event); + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } +} +async function postBatch(batch) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + try { + await fetch(`${INGEST_URL}/v1/events`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(batch), + signal: controller.signal + }); + } finally { + clearTimeout(timeout); + } +} +async function flush() { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + if (isFlushing || buffer.length === 0) { + return; + } + isFlushing = true; + const batch = buffer.splice(0, BATCH_SIZE); + try { + await postBatch(batch); + } catch { + console.debug(`[agentmon] failed to flush ${batch.length} events`); + } finally { + isFlushing = false; + if (buffer.length > 0) { + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } + } + } +} +function emitError(sessionKey, runId, spanId, errorValue) { + if (errorValue === void 0 || errorValue === null || errorValue === false) { + return; + } + const errorRecord = isRecord(errorValue) ? errorValue : {}; + const message = pickString(errorRecord.message, errorRecord.error, errorValue) || "unknown"; + const errType = pickString(errorRecord.type, errorRecord.code) || "openclaw"; + enqueue(buildEnvelope("error", sessionKey, { + runId, + spanId, + payload: { + error: { + type: errType, + message + } + } + })); +} +function buildRunPayload(context, success) { + const payload = { + status: success ? "success" : "error" + }; + const duration = pickNumber(context.duration_ms, context.durationMs, context.elapsed_ms); + if (duration !== void 0) { + payload.duration_ms = duration; + } + const usage = isRecord(context.usage) ? context.usage : void 0; + if (usage) { + payload.usage = usage; + } + const errorMessage = pickString(context.error, isRecord(context.result) ? context.result.error : void 0); + if (errorMessage) { + payload.error = errorMessage; + } + return payload; +} +const handler = async (rawEvent) => { + if (!isRecord(rawEvent)) { + return; + } + const context = getContext(rawEvent); + const eventName = getEventName(rawEvent); + const sessionKey = getSessionKey(rawEvent, context); + try { + if (eventName === "command:new") { + enqueue(buildEnvelope("session.start", sessionKey)); + return; + } + if (eventName === "command:stop") { + enqueue(buildEnvelope("session.end", sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + } + return; + } + if (eventName === "command:reset") { + enqueue(buildEnvelope("session.end", sessionKey)); + enqueue(buildEnvelope("session.start", sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + } + return; + } + if (eventName === "agent:bootstrap") { + const existingRunId = sessionKey ? activeRuns.get(sessionKey) : void 0; + if (!existingRunId) { + const runId = randomUUID(); + if (sessionKey) { + activeRuns.set(sessionKey, runId); + } + enqueue(buildEnvelope("run.start", sessionKey, { + runId, + attributes: { + agent_id: pickString(context.agentId), + run_kind: "embedded" + } + })); + } + return; + } + if (eventName === "message:received") { + const runId = randomUUID(); + if (sessionKey) { + activeRuns.set(sessionKey, runId); + } + enqueue(buildEnvelope("run.start", sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + from: pickString(context.from, context.sender) + }, + payload: { + message_preview: truncate( + pickString(context.content, context.message, context.text) || context.input, + 200 + ) + } + })); + return; + } + if (eventName === "message:sent") { + const runId = sessionKey ? activeRuns.get(sessionKey) : void 0; + const success = context.success !== false && !context.error; + enqueue(buildEnvelope("run.end", sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + to: pickString(context.to, context.recipient) + }, + payload: buildRunPayload(context, success) + })); + if (!success) { + emitError(sessionKey, runId, void 0, context.error); + } + return; + } + } catch { + console.debug("[agentmon] handler error"); + } +}; +var handler_default = handler; +export { + handler_default as default +}; diff --git a/openclaw/hooks/agentmon/handler.ts b/openclaw/hooks/agentmon/handler.ts new file mode 100644 index 0000000..1b09d9c --- /dev/null +++ b/openclaw/hooks/agentmon/handler.ts @@ -0,0 +1,357 @@ +import { randomUUID } from 'node:crypto'; +import { hostname } from 'node:os'; + +type Dict = Record; + +const INGEST_URL = process.env.AGENTMON_INGEST_URL || 'http://192.168.122.1:8080'; +const VM_NAME = process.env.AGENTMON_VM_NAME || hostname(); +const BATCH_SIZE = 10; +const FLUSH_MS = 2000; +const FETCH_TIMEOUT_MS = 500; + +let buffer: Dict[] = []; +let flushTimer: ReturnType | null = null; +let isFlushing = false; + +const activeRuns = new Map(); + +function isRecord(value: unknown): value is Dict { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +function pickString(...values: unknown[]): string | undefined { + for (const value of values) { + if (typeof value === 'string' && value.trim() !== '') { + return value; + } + } + return undefined; +} + +function pickNumber(...values: unknown[]): number | undefined { + for (const value of values) { + if (typeof value === 'number' && Number.isFinite(value)) { + return value; + } + } + return undefined; +} + +function truncate(value: unknown, limit: number): string | undefined { + if (value === undefined || value === null) { + return undefined; + } + + const text = typeof value === 'string' ? value : safeJSONStringify(value); + if (!text) { + return undefined; + } + + if (text.length <= limit) { + return text; + } + return text.slice(0, limit) + '...'; +} + +function safeJSONStringify(value: unknown): string { + try { + return JSON.stringify(value); + } catch { + return String(value); + } +} + +function getEventName(input: Dict): string { + const direct = pickString(input.name, input.event); + if (direct) { + return direct; + } + + if (typeof input.type === 'string' && input.type.includes(':')) { + return input.type; + } + + if (typeof input.type === 'string' && typeof input.action === 'string') { + return `${input.type}:${input.action}`; + } + + if (typeof input.type === 'string') { + return input.type; + } + + return ''; +} + +function getContext(input: Dict): Dict { + return isRecord(input.context) ? input.context : {}; +} + +function getSessionKey(input: Dict, context: Dict): string | undefined { + return pickString( + input.sessionKey, + context.sessionKey, + context.session_id, + input.session_id, + isRecord(input.session) ? input.session.key : undefined, + isRecord(context.session) ? context.session.key : undefined, + ); +} + +function buildEnvelope( + type: string, + sessionKey?: string, + opts: { + runId?: string; + spanId?: string; + parentSpanId?: string; + attributes?: Dict; + payload?: Dict; + } = {}, +): Dict { + const correlation: Dict = {}; + if (sessionKey) { + correlation.session_id = sessionKey; + } + if (opts.runId) { + correlation.run_id = opts.runId; + } + if (opts.spanId) { + correlation.span_id = opts.spanId; + } + if (opts.parentSpanId) { + correlation.parent_span_id = opts.parentSpanId; + } + + const envelope: Dict = { + schema: { name: 'agentmon.event', version: 1 }, + event: { + id: randomUUID(), + type, + ts: new Date().toISOString(), + source: { + framework: 'openclaw', + client_id: VM_NAME, + host: VM_NAME, + }, + }, + }; + + if (Object.keys(correlation).length > 0) { + envelope.correlation = correlation; + } + if (opts.attributes && Object.keys(opts.attributes).length > 0) { + envelope.attributes = opts.attributes; + } + if (opts.payload && Object.keys(opts.payload).length > 0) { + envelope.payload = opts.payload; + } + + return envelope; +} + +function scheduleFlush() { + if (!flushTimer) { + flushTimer = setTimeout(() => { + void flush(); + }, FLUSH_MS); + } +} + +function enqueue(event: Dict) { + buffer.push(event); + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } +} + +async function postBatch(batch: Dict[]) { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + + try { + await fetch(`${INGEST_URL}/v1/events`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(batch), + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } +} + +async function flush() { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + if (isFlushing || buffer.length === 0) { + return; + } + + isFlushing = true; + const batch = buffer.splice(0, BATCH_SIZE); + + try { + await postBatch(batch); + } catch { + console.debug(`[agentmon] failed to flush ${batch.length} events`); + } finally { + isFlushing = false; + if (buffer.length > 0) { + if (buffer.length >= BATCH_SIZE) { + void flush(); + } else { + scheduleFlush(); + } + } + } +} + +function emitError(sessionKey: string | undefined, runId: string | undefined, spanId: string | undefined, errorValue: unknown) { + if (errorValue === undefined || errorValue === null || errorValue === false) { + return; + } + + const errorRecord = isRecord(errorValue) ? errorValue : {}; + const message = pickString(errorRecord.message, errorRecord.error, errorValue) || 'unknown'; + const errType = pickString(errorRecord.type, errorRecord.code) || 'openclaw'; + + enqueue(buildEnvelope('error', sessionKey, { + runId, + spanId, + payload: { + error: { + type: errType, + message, + }, + }, + })); +} + +function buildRunPayload(context: Dict, success: boolean): Dict { + const payload: Dict = { + status: success ? 'success' : 'error', + }; + + const duration = pickNumber(context.duration_ms, context.durationMs, context.elapsed_ms); + if (duration !== undefined) { + payload.duration_ms = duration; + } + + const usage = isRecord(context.usage) ? context.usage : undefined; + if (usage) { + payload.usage = usage; + } + + const errorMessage = pickString(context.error, isRecord(context.result) ? context.result.error : undefined); + if (errorMessage) { + payload.error = errorMessage; + } + + return payload; +} + +const handler = async (rawEvent: unknown) => { + if (!isRecord(rawEvent)) { + return; + } + + const context = getContext(rawEvent); + const eventName = getEventName(rawEvent); + const sessionKey = getSessionKey(rawEvent, context); + + try { + if (eventName === 'command:new') { + enqueue(buildEnvelope('session.start', sessionKey)); + return; + } + + if (eventName === 'command:stop') { + enqueue(buildEnvelope('session.end', sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + } + return; + } + + if (eventName === 'command:reset') { + enqueue(buildEnvelope('session.end', sessionKey)); + enqueue(buildEnvelope('session.start', sessionKey)); + if (sessionKey) { + activeRuns.delete(sessionKey); + } + return; + } + + if (eventName === 'agent:bootstrap') { + // Only emit run.start if no run is already active for this session. + // Interactive message-channel sessions already emit run.start via + // message:received; this handler captures cron/automation embedded runs. + const existingRunId = sessionKey ? activeRuns.get(sessionKey) : undefined; + if (!existingRunId) { + const runId = randomUUID(); + if (sessionKey) { + activeRuns.set(sessionKey, runId); + } + + enqueue(buildEnvelope('run.start', sessionKey, { + runId, + attributes: { + agent_id: pickString(context.agentId as string | undefined), + run_kind: 'embedded', + }, + })); + } + return; + } + + if (eventName === 'message:received') { + const runId = randomUUID(); + if (sessionKey) { + activeRuns.set(sessionKey, runId); + } + + enqueue(buildEnvelope('run.start', sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + from: pickString(context.from, context.sender), + }, + payload: { + message_preview: truncate( + pickString(context.content, context.message, context.text) || context.input, + 200, + ), + }, + })); + return; + } + + if (eventName === 'message:sent') { + const runId = sessionKey ? activeRuns.get(sessionKey) : undefined; + const success = context.success !== false && !context.error; + + enqueue(buildEnvelope('run.end', sessionKey, { + runId, + attributes: { + channel: pickString(context.channelId, context.channel_id), + to: pickString(context.to, context.recipient), + }, + payload: buildRunPayload(context, success), + })); + + if (!success) { + emitError(sessionKey, runId, undefined, context.error); + } + return; + } + + } catch { + console.debug('[agentmon] handler error'); + } +}; + +export default handler;