Files
agentmon/internal/sdk/emitter.go
T
William Valentin 3434db3c59 feat: complete agent monitoring - hook, UI, and backend filter
- Add event_type and framework filters to events query endpoint
- Add /agents SPA route to web-ui server
- Add Agents nav link and route in frontend
- Add agents page CSS (timeline, VM pills, stats panel)
- Build VM status strip, activity timeline, and real-time stats
- Add agentmon hook for OpenClaw (HOOK.md + handler.ts)
- Add docker-compose, Dockerfile, and supporting infra files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 00:26:42 -07:00

352 lines
7.2 KiB
Go

// Package sdk provides the agentmon emitter SDK for sending telemetry events.
package sdk
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Schema identification constants for agentmon events.
// NOTE(review): neither constant is referenced in the visible part of this
// file; presumably they are stamped onto events built elsewhere in the
// package — confirm against the event constructors.
const (
// schemaName is the schema identifier string.
schemaName = "agentmon.event"
// schemaVersion is the integer schema version.
schemaVersion = 1
)
// Emitter is the main client for sending agentmon events.
//
// It either buffers events in memory and posts them to the server in batches
// over HTTP, or, when Config.UseWebSocket is set, hands each event to a
// WSClient as soon as it is emitted. Construct one with NewEmitter.
type Emitter struct {
// config is the configuration passed to NewEmitter, with defaults applied.
config Config
// httpClient performs the batched HTTP POSTs in sendEvents.
httpClient *http.Client
// wsClient is set by NewEmitter only when config.UseWebSocket is true;
// otherwise it is nil.
wsClient *WSClient
// buffer holds events that have been emitted but not yet flushed over HTTP.
buffer []Event
// bufferSize is the buffer length at which Emit triggers a flush.
bufferSize int
// mu guards buffer and closed.
mu sync.Mutex
// closed is set by Close; Emit and Flush return an error once it is true.
closed bool
}
// Config holds emitter configuration.
//
// ServerURL, Framework and ClientID are required: NewEmitter returns an error
// if any of them is empty.
type Config struct {
// ServerURL is the base URL of the ingest gateway (e.g., "http://localhost:8080").
// Endpoint paths such as "/v1/events" are appended to it verbatim.
ServerURL string
// APIKey is an optional authentication key. When non-empty it is sent as an
// "Authorization: Bearer <key>" header on HTTP event submissions.
APIKey string
// Framework is the name of the agent framework (e.g., "opencode", "claude-code").
Framework string
// ClientID is a stable identifier for this emitter instance.
ClientID string
// Host is the hostname where events originate. NewEmitter defaults it to
// "localhost" when empty.
Host string
// BufferSize is the max number of events to buffer before flushing.
// NewEmitter defaults it to 100 when it is zero or negative.
BufferSize int
// UseWebSocket enables WebSocket streaming mode: events are sent one at a
// time over a WebSocket connection instead of being buffered for HTTP.
UseWebSocket bool
// EnableLogging enables debug logging of flushes and rejected events.
EnableLogging bool
}
// Event represents a complete agentmon event.
//
// It is a free-form key/value map that the emitter serializes to JSON as-is;
// this file does not validate its contents.
type Event map[string]any
// WSClient handles WebSocket communication with the ingest gateway.
//
// Outbound payloads are queued with Send and written by the loop started in
// Run; inbound messages are parsed as acknowledgements by a reader goroutine.
type WSClient struct {
// conn is the underlying WebSocket connection established by NewWSClient.
conn *websocket.Conn
// sendChan queues outbound payloads; NewWSClient gives it a capacity of 100.
// It is closed by Close.
sendChan chan []byte
// ackChan receives the "up_to_seq" value of server acknowledgements. It has
// a capacity of 1 and acks are dropped when it is full.
// NOTE(review): nothing in the visible part of this file reads from it.
ackChan chan int
// mu guards closed and is held while writing to or closing conn.
mu sync.Mutex
// closed is set by Close; Send fails once it is true.
closed bool
}
// NewWSClient dials the WebSocket endpoint at url using the default dialer and
// returns a client wrapping the resulting connection.
//
// The returned client has a send queue of 100 payloads and a single-slot
// acknowledgement channel. Call Run to start processing the queue. The dial
// error is returned unwrapped when the connection cannot be established.
func NewWSClient(url string) (*WSClient, error) {
	conn, _, dialErr := websocket.DefaultDialer.Dial(url, nil)
	if dialErr != nil {
		return nil, dialErr
	}
	client := &WSClient{
		conn:     conn,
		sendChan: make(chan []byte, 100),
		ackChan:  make(chan int, 1),
	}
	return client, nil
}
// Run starts the WebSocket client's main loop.
//
// It launches the reader goroutine and then writes queued payloads on the
// calling goroutine, blocking until the send queue is closed or a write fails.
// The client is closed before Run returns. Cancelling ctx closes the client,
// which in turn ends both the read and the write loop.
func (w *WSClient) Run(ctx context.Context) {
	defer w.Close()

	// done releases the watcher goroutine when Run exits for a reason other
	// than ctx cancellation, so it is not leaked for long-lived contexts.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			// Close is idempotent; closing the connection and the send queue
			// unblocks readMessages and writeMessages.
			_ = w.Close()
		case <-done:
		}
	}()

	go w.readMessages()
	w.writeMessages()
}
// Send queues an event to be sent via WebSocket.
//
// It never blocks on the queue itself: if the send queue is full it returns a
// "send buffer full" error, and if the client has been closed it returns a
// "WebSocket client is closed" error.
func (w *WSClient) Send(data []byte) error {
	// Hold the lock across the channel send. Close closes sendChan under the
	// same lock, so releasing it between the closed check and the send would
	// allow a send on a closed channel, which panics. The send below is
	// non-blocking, so the lock is only held briefly by this method.
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return fmt.Errorf("WebSocket client is closed")
	}
	select {
	case w.sendChan <- data:
		return nil
	default:
		return fmt.Errorf("send buffer full")
	}
}
// Close shuts the client down: it marks it closed, closes the underlying
// connection (if any) and closes the send queue. Calling Close more than once
// is safe; later calls do nothing. It always returns nil.
func (w *WSClient) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.closed {
		w.closed = true
		if c := w.conn; c != nil {
			// The connection's close error is deliberately discarded.
			_ = c.Close()
		}
		close(w.sendChan)
	}
	return nil
}
// readMessages reads server messages until the connection returns an error.
//
// Each message is decoded as a JSON object. When it has the shape
// {"ack": {"up_to_seq": <number>}} the sequence number is offered to ackChan
// without blocking (and dropped if the channel is full). Messages that are not
// valid JSON are logged and skipped; messages of any other shape are skipped
// silently. Read errors other than normal / going-away closes are logged.
func (w *WSClient) readMessages() {
	for {
		_, message, err := w.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				log.Printf("WebSocket read error: %v", err)
			}
			return
		}

		var ack map[string]any
		if err := json.Unmarshal(message, &ack); err != nil {
			log.Printf("Failed to unmarshal ack: %v", err)
			continue
		}

		// Use comma-ok assertions at both levels: a single-value assertion on
		// a missing or non-object "ack" field would panic this goroutine.
		body, ok := ack["ack"].(map[string]any)
		if !ok {
			continue
		}
		seq, ok := body["up_to_seq"].(float64)
		if !ok {
			continue
		}

		select {
		case w.ackChan <- int(seq):
		default:
			// Nobody has consumed the previous ack yet; drop this one.
		}
	}
}
// writeMessages drains the send queue, writing each payload to the connection
// as a text message. It returns when the queue is closed, when the client is
// found to be closed, or after logging the first write error.
func (w *WSClient) writeMessages() {
	for payload := range w.sendChan {
		// The closed check and the write happen under one lock acquisition so
		// the connection is never written to after Close has run.
		w.mu.Lock()
		shutDown := w.closed
		var writeErr error
		if !shutDown {
			writeErr = w.conn.WriteMessage(websocket.TextMessage, payload)
		}
		w.mu.Unlock()

		switch {
		case shutDown:
			return
		case writeErr != nil:
			log.Printf("WebSocket write error: %v", writeErr)
			return
		}
	}
}
// NewEmitter validates cfg, applies defaults and returns a ready-to-use
// Emitter.
//
// ServerURL, Framework and ClientID must be non-empty. Host defaults to
// "localhost" and BufferSize to 100 when unset (or non-positive). When
// cfg.UseWebSocket is true a WebSocket connection is dialled immediately and
// its run loop is started in a background goroutine; a dial failure is
// returned as an error.
func NewEmitter(cfg Config) (*Emitter, error) {
	switch {
	case cfg.ServerURL == "":
		return nil, fmt.Errorf("ServerURL is required")
	case cfg.Framework == "":
		return nil, fmt.Errorf("Framework is required")
	case cfg.ClientID == "":
		return nil, fmt.Errorf("ClientID is required")
	}

	if cfg.Host == "" {
		cfg.Host = "localhost"
	}
	if cfg.BufferSize <= 0 {
		cfg.BufferSize = 100
	}

	em := &Emitter{
		config:     cfg,
		httpClient: &http.Client{Timeout: 30 * time.Second},
		buffer:     make([]Event, 0, cfg.BufferSize),
		bufferSize: cfg.BufferSize,
	}
	if !cfg.UseWebSocket {
		return em, nil
	}

	ws, err := NewWSClient(wsURLFromHTTP(cfg.ServerURL))
	if err != nil {
		return nil, fmt.Errorf("failed to create WebSocket client: %w", err)
	}
	em.wsClient = ws
	go em.wsClient.Run(context.Background())
	return em, nil
}
// Emit sends a single event.
//
// In WebSocket mode the event is marshalled to JSON and queued on the
// WebSocket client immediately. Otherwise it is appended to the in-memory
// buffer, and the buffer is flushed over HTTP once it holds bufferSize events.
//
// It returns an error if the emitter is closed, if the event cannot be
// marshalled, if the WebSocket send queue rejects it, or if a flush triggered
// by this call fails.
func (e *Emitter) Emit(ctx context.Context, event Event) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.closed {
		return fmt.Errorf("emitter is closed")
	}
	if e.config.UseWebSocket {
		data, err := json.Marshal(event)
		if err != nil {
			return fmt.Errorf("failed to marshal event: %w", err)
		}
		return e.wsClient.Send(data)
	}
	e.buffer = append(e.buffer, event)
	if len(e.buffer) >= e.bufferSize {
		// e.mu is already held and sync.Mutex is not reentrant, so calling
		// Flush here would deadlock; use the lock-free helper instead.
		return e.flushLocked(ctx)
	}
	return nil
}

// Flush sends all buffered events to the server.
//
// It returns an error if the emitter is closed or if the HTTP submission
// fails. On failure the buffer is left intact so a later flush can retry.
// Flushing an empty buffer is a no-op.
func (e *Emitter) Flush(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.closed {
		return fmt.Errorf("emitter is closed")
	}
	return e.flushLocked(ctx)
}

// flushLocked posts the buffered events and resets the buffer on success.
// The caller must hold e.mu. It does not consult e.closed, so Close can use it
// for the final flush.
func (e *Emitter) flushLocked(ctx context.Context) error {
	if len(e.buffer) == 0 {
		return nil
	}
	if e.config.EnableLogging {
		log.Printf("Flushing %d events", len(e.buffer))
	}
	// sendEvents takes plain maps, so convert element by element.
	events := make([]map[string]any, len(e.buffer))
	for i, ev := range e.buffer {
		events[i] = ev
	}
	resp, err := e.sendEvents(ctx, events)
	if err != nil {
		return fmt.Errorf("failed to send events: %w", err)
	}
	// Keep the backing array for reuse by subsequent Emit calls.
	e.buffer = e.buffer[:0]
	if resp.Rejected > 0 && e.config.EnableLogging {
		log.Printf("Rejected %d events", resp.Rejected)
		if len(resp.Errors) > 0 {
			log.Printf("Errors: %v", resp.Errors)
		}
	}
	return nil
}

// Close flushes any remaining events and closes the emitter.
//
// After Close, Emit and Flush return an error. Calling Close again is a no-op
// that returns nil. The WebSocket client, if any, is closed first. If the
// final flush of buffered events fails, that error is returned; the emitter is
// closed regardless.
func (e *Emitter) Close(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.closed {
		return nil
	}
	e.closed = true
	if e.wsClient != nil {
		// WSClient.Close always returns nil, so there is nothing to report.
		_ = e.wsClient.Close()
	}
	// Use the helper directly: Flush would try to re-acquire e.mu and would
	// also refuse to run now that closed is set.
	return e.flushLocked(ctx)
}
// sendResponse is the JSON body that sendEvents decodes from the server's
// reply to a batched event submission.
type sendResponse struct {
// Accepted is decoded from the "accepted" field of the reply.
Accepted int `json:"accepted"`
// Rejected is decoded from the "rejected" field; Flush logs it when it is
// greater than zero and logging is enabled.
Rejected int `json:"rejected"`
// Errors is decoded from the optional "errors" array, one entry per error
// message.
Errors []struct {
Error string `json:"error"`
} `json:"errors,omitempty"`
}
// sendEvents POSTs events as a JSON array to <ServerURL>/v1/events and decodes
// the server's reply.
//
// The request carries ctx, a JSON content type and, when an API key is
// configured, a bearer Authorization header. Any status other than
// 202 Accepted is reported as an error, as are marshalling, transport and
// decoding failures.
func (e *Emitter) sendEvents(ctx context.Context, events []map[string]any) (*sendResponse, error) {
	payload, err := json.Marshal(events)
	if err != nil {
		return nil, err
	}

	endpoint := e.config.ServerURL + "/v1/events"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if key := e.config.APIKey; key != "" {
		req.Header.Set("Authorization", "Bearer "+key)
	}

	resp, err := e.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusAccepted {
		return nil, fmt.Errorf("unexpected status code: %d", code)
	}

	result := new(sendResponse)
	if err := json.NewDecoder(resp.Body).Decode(result); err != nil {
		return nil, err
	}
	return result, nil
}
// wsURLFromHTTP derives the WebSocket endpoint URL from an HTTP base URL.
//
// An "https://" prefix becomes "wss://" and an "http://" prefix becomes
// "ws://"; any other input is left as-is. In every case "/v1/ws" is appended.
// The prefix match is case-sensitive.
func wsURLFromHTTP(httpURL string) string {
	const (
		securePrefix = "https://"
		plainPrefix  = "http://"
		wsPath       = "/v1/ws"
	)
	if n := len(securePrefix); len(httpURL) >= n && httpURL[:n] == securePrefix {
		return "wss://" + httpURL[n:] + wsPath
	}
	if n := len(plainPrefix); len(httpURL) >= n && httpURL[:n] == plainPrefix {
		return "ws://" + httpURL[n:] + wsPath
	}
	return httpURL + wsPath
}
// generateID returns a fresh identifier: 16 bytes from the cryptographic
// random source, hex-encoded to a 32-character lowercase string.
//
// If the random source fails, the failure is logged and the current Unix time
// in nanoseconds, formatted as a decimal string, is returned instead.
func generateID() string {
	var raw [16]byte
	_, err := rand.Read(raw[:])
	if err == nil {
		return hex.EncodeToString(raw[:])
	}
	log.Printf("Failed to generate random ID: %v", err)
	return fmt.Sprintf("%d", time.Now().UnixNano())
}