feat: complete agent monitoring - hook, UI, and backend filter

- Add event_type and framework filters to events query endpoint
- Add /agents SPA route to web-ui server
- Add Agents nav link and route in frontend
- Add agents page CSS (timeline, VM pills, stats panel)
- Build VM status strip, activity timeline, and real-time stats
- Add agentmon hook for OpenClaw (HOOK.md + handler.ts)
- Add docker-compose, Dockerfile, and supporting infra files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
William Valentin
2026-03-14 00:26:42 -07:00
parent 1927ec6622
commit 3434db3c59
29 changed files with 6228 additions and 231 deletions
+26 -23
View File
@@ -6,14 +6,15 @@ import (
)
var validTypes = map[string]bool{
"session.start": true,
"session.end": true,
"run.start": true,
"run.end": true,
"span.start": true,
"span.end": true,
"error": true,
"metric.snapshot": true,
"session.start": true,
"session.end": true,
"run.start": true,
"run.end": true,
"span.start": true,
"span.end": true,
"error": true,
"metric.snapshot": true,
"openclaw.snapshot": true,
}
type ValidationError struct {
@@ -31,8 +32,8 @@ func Validate(m map[string]any) error {
if !ok {
return ValidationError{Field: "schema", Message: "missing or invalid"}
}
if name, _ := schema["name"].(string); name != "agentmon.event" {
return ValidationError{Field: "schema.name", Message: "must be 'agentmon.event'"}
if name, _ := schema["name"].(string); name != "agentmon.event" && name != "agentmon.openclaw" {
return ValidationError{Field: "schema.name", Message: "must be 'agentmon.event' or 'agentmon.openclaw'"}
}
if ver, _ := schema["version"].(float64); ver != 1 {
return ValidationError{Field: "schema.version", Message: "must be 1"}
@@ -60,20 +61,22 @@ func Validate(m map[string]any) error {
return ValidationError{Field: "event.ts", Message: "required"}
}
// Check source
source, ok := event["source"].(map[string]any)
if !ok {
return ValidationError{Field: "event.source", Message: "missing or invalid"}
}
// Source is optional for openclaw.snapshot events
if eventType != "openclaw.snapshot" {
source, ok := event["source"].(map[string]any)
if !ok {
return ValidationError{Field: "event.source", Message: "missing or invalid"}
}
if fw, _ := source["framework"].(string); fw == "" {
return ValidationError{Field: "event.source.framework", Message: "required"}
}
if cid, _ := source["client_id"].(string); cid == "" {
return ValidationError{Field: "event.source.client_id", Message: "required"}
}
if host, _ := source["host"].(string); host == "" {
return ValidationError{Field: "event.source.host", Message: "required"}
if fw, _ := source["framework"].(string); fw == "" {
return ValidationError{Field: "event.source.framework", Message: "required"}
}
if cid, _ := source["client_id"].(string); cid == "" {
return ValidationError{Field: "event.source.client_id", Message: "required"}
}
if host, _ := source["host"].(string); host == "" {
return ValidationError{Field: "event.source.host", Message: "required"}
}
}
return nil
+353
View File
@@ -0,0 +1,353 @@
package openclaw
import (
"encoding/json"
"fmt"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
)
const (
guestDiskWarningPercent = 80.0
guestDiskCriticalPercent = 95.0
guestMemoryWarningPercent = 80.0
guestMemoryCriticalPercent = 95.0
hostDiskActualGB = 32.0
backupWarningAgeHours = 25.0
backupCriticalAgeHours = 48.0
)
func CollectHostMetrics(domain string) (HostMetrics, error) {
metrics := HostMetrics{}
state, err := virshCmd("domstate", domain)
if err != nil {
return metrics, fmt.Errorf("failed to get VM state: %w", err)
}
metrics.State = strings.TrimSpace(state)
if metrics.State != "running" {
metrics.VCPUs = 0
metrics.MemoryKiB = 0
metrics.Autostart = false
metrics.Snapshots = 0
return metrics, nil
}
info, err := virshCmd("dominfo", domain)
if err != nil {
return metrics, fmt.Errorf("failed to get VM info: %w", err)
}
if cpuStr := parseVirshInfo(info, "CPU(s)"); cpuStr != "" {
if cpu, err := strconv.Atoi(strings.TrimSpace(cpuStr)); err == nil {
metrics.VCPUs = cpu
}
}
if memStr := parseVirshInfo(info, "Max memory"); memStr != "" {
if mem, err := parseMemoryKiB(memStr); err == nil {
metrics.MemoryKiB = mem
}
}
autostartInfo := parseVirshInfo(info, "Autostart")
metrics.Autostart = strings.TrimSpace(autostartInfo) == "enable"
cpuTime, err := virshCmd("dominfo", domain)
if err == nil {
if cpuStr := parseVirshInfo(cpuTime, "CPU time"); cpuStr != "" {
if cpu, err := parseCPUTimeNS(cpuStr); err == nil {
metrics.CPUTime = cpu
}
}
}
snapshots, err := virshCmd("snapshot-list", domain, "--name")
if err == nil {
metrics.Snapshots = len(strings.Fields(strings.TrimSpace(snapshots)))
}
diskPath, err := virshCmd("domblklist", domain)
if err == nil {
lines := strings.Split(diskPath, "\n")
for _, line := range lines {
fields := strings.Fields(line)
if len(fields) >= 2 && fields[1] != "" && fields[1] != "-" {
diskActual, _, err := getDiskStats(fields[1])
if err == nil {
metrics.DiskActual = diskActual
}
}
}
}
return metrics, nil
}
func CollectGuestMetrics(host, user string) (*GuestMetrics, error) {
metrics := &GuestMetrics{}
serviceStatus, err := sshCmd(host, user, "systemctl --user is-active openclaw-gateway.service")
if err == nil {
metrics.ServiceActive = strings.TrimSpace(serviceStatus) == "active"
}
if metrics.ServiceActive {
uptime, err := sshCmd(host, user, "systemctl --user show openclaw-gateway.service -p ActiveEnterTimestamp --value")
if err == nil {
uptimeTS := strings.TrimSpace(uptime)
if ts, err := time.Parse("Mon 2006-01-02 15:04:05 MST", uptimeTS); err == nil {
duration := time.Since(ts)
metrics.ServiceUptime = duration.String()
}
}
}
httpCode, err := sshCmd(host, user, "curl -s -o /dev/null -w '%{http_code}' http://127.0.0.1:18789/")
if err == nil {
if code, err := strconv.Atoi(strings.TrimSpace(httpCode)); err == nil {
metrics.HTTPStatus = code
}
}
version, err := sshCmd(host, user, "ls -la ~/.local/share/pnpm/5/node_modules/openclaw | grep -oP 'openclaw@[0-9.]+' | head -1")
if err == nil {
metrics.Version = strings.TrimSpace(strings.TrimPrefix(version, "openclaw@"))
if metrics.Version != "" {
serviceVersion, err := sshCmd(host, user, "grep OPENCLAW_SERVICE_VERSION ~/.config/systemd/user/openclaw-gateway.service 2>/dev/null | head -1")
if err == nil && strings.Contains(serviceVersion, metrics.Version) {
metrics.VersionConsistent = true
}
}
}
memInfo, err := sshCmd(host, user, "free -b | grep '^Mem:'")
if err == nil {
fields := strings.Fields(memInfo)
if len(fields) >= 3 {
if total, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
metrics.MemoryTotal = total
}
if used, err := strconv.ParseInt(fields[2], 10, 64); err == nil {
metrics.MemoryUsed = used
}
if metrics.MemoryTotal > 0 {
metrics.MemoryPercent = float64(metrics.MemoryUsed) / float64(metrics.MemoryTotal) * 100
}
}
}
diskInfo, err := sshCmd(host, user, "df -B1 / | tail -1")
if err == nil {
fields := strings.Fields(diskInfo)
if len(fields) >= 5 {
if total, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
metrics.DiskTotal = total
}
if used, err := strconv.ParseInt(fields[2], 10, 64); err == nil {
metrics.DiskUsed = used
}
if metrics.DiskTotal > 0 {
metrics.DiskPercent = float64(metrics.DiskUsed) / float64(metrics.DiskTotal) * 100
}
}
}
loadAvg, err := sshCmd(host, user, "awk '{print $1}' /proc/loadavg")
if err == nil {
if load, err := strconv.ParseFloat(strings.TrimSpace(loadAvg), 64); err == nil {
metrics.LoadAverage = load
}
}
swappiness, err := sshCmd(host, user, "cat /proc/sys/vm/swappiness")
if err == nil {
if swap, err := strconv.Atoi(strings.TrimSpace(swappiness)); err == nil {
metrics.Swappiness = swap
}
}
return metrics, nil
}
func CollectBackupStatus(instanceName string) (*BackupStatus, error) {
backupPath := "/home/will/lab/swarm/openclaw"
fileInfo, err := exec.Command("stat", "-c", "%Y", backupPath).Output()
if err != nil {
return nil, fmt.Errorf("failed to get backup timestamp: %w", err)
}
timestampStr := strings.TrimSpace(string(fileInfo))
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse backup timestamp: %w", err)
}
lastBackup := time.Unix(timestamp, 0)
age := time.Since(lastBackup)
ageHours := age.Hours()
return &BackupStatus{
LastBackup: lastBackup.UTC().Format(time.RFC3339),
AgeHours: ageHours,
}, nil
}
func DetectIssues(metrics Metrics) Issues {
issues := Issues{}
if metrics.Guest != nil {
if metrics.Guest.DiskPercent > guestDiskCriticalPercent {
issues.GuestDiskUsageHigh = true
}
if metrics.Guest.MemoryPercent > guestMemoryCriticalPercent {
issues.GuestMemoryUsageHigh = true
}
if !metrics.Guest.ServiceActive {
issues.GatewayDown = true
}
if metrics.Guest.HTTPStatus != 200 {
issues.HTTPUnhealthy = true
}
if metrics.Guest.Version != "" && !metrics.Guest.VersionConsistent {
issues.VersionMismatch = true
}
}
if metrics.Instance.Status == "active" && metrics.Host.State != "running" {
issues.VMNotRunning = true
}
if metrics.Backup != nil && metrics.Backup.AgeHours > backupCriticalAgeHours {
issues.BackupStale = true
}
return issues
}
func LoadInstances(registryPath string) ([]Instance, error) {
data, err := exec.Command("cat", registryPath).Output()
if err != nil {
return nil, fmt.Errorf("failed to read registry: %w", err)
}
var registry struct {
Instances []map[string]any `json:"instances"`
}
if err := json.Unmarshal(data, &registry); err != nil {
return nil, fmt.Errorf("failed to parse registry: %w", err)
}
instances := make([]Instance, 0, len(registry.Instances))
for _, rawInst := range registry.Instances {
inst := Instance{}
if name, ok := rawInst["name"].(string); ok {
inst.Name = name
}
if domain, ok := rawInst["domain"].(string); ok {
inst.Domain = domain
}
if host, ok := rawInst["host"].(string); ok && host != "" {
inst.Host = &host
}
if user, ok := rawInst["user"].(string); ok {
inst.User = user
}
if status, ok := rawInst["status"].(string); ok {
inst.Status = status
}
inst.Additional = rawInst
instances = append(instances, inst)
}
return instances, nil
}
func virshCmd(args ...string) (string, error) {
cmd := exec.Command("virsh", append([]string{"-c", "qemu:///system"}, args...)...)
output, err := cmd.CombinedOutput()
return string(output), err
}
func sshCmd(host, user, command string) (string, error) {
cmd := exec.Command("ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-o", "LogLevel=ERROR", "-q", fmt.Sprintf("%s@%s", user, host), command)
output, err := cmd.CombinedOutput()
return string(output), err
}
func parseVirshInfo(info, key string) string {
re := regexp.MustCompile(fmt.Sprintf(`(?m)^%s:\s*(.*)$`, regexp.QuoteMeta(key)))
match := re.FindStringSubmatch(info)
if match != nil && len(match) > 1 {
return match[1]
}
return ""
}
func parseMemoryKiB(memStr string) (int64, error) {
memStr = strings.TrimSpace(strings.ToLower(memStr))
re := regexp.MustCompile(`^(\d+(?:\.\d+)?)\s*([kmgt]?)i?b$`)
match := re.FindStringSubmatch(memStr)
if match == nil || len(match) < 3 {
return 0, fmt.Errorf("invalid memory format: %s", memStr)
}
value, err := strconv.ParseFloat(match[1], 64)
if err != nil {
return 0, fmt.Errorf("failed to parse memory value: %w", err)
}
unit := match[2]
multiplier := int64(1)
switch unit {
case "k":
multiplier = 1
case "m":
multiplier = 1024
case "g":
multiplier = 1024 * 1024
case "t":
multiplier = 1024 * 1024 * 1024
}
return int64(value * float64(multiplier)), nil
}
func parseCPUTimeNS(cpuStr string) (int64, error) {
parts := strings.Fields(cpuStr)
if len(parts) < 4 {
return 0, fmt.Errorf("invalid CPU time format")
}
hours, _ := strconv.ParseFloat(parts[0], 64)
minutes, _ := strconv.ParseFloat(parts[2], 64)
seconds, _ := strconv.ParseFloat(strings.TrimSuffix(parts[4], "s"), 64)
totalSeconds := hours*3600 + minutes*60 + seconds
return int64(totalSeconds * 1e9), nil
}
func getDiskStats(path string) (actual, virtual int64, err error) {
info, err := exec.Command("stat", "-c", "%s %b", path).Output()
if err != nil {
return 0, 0, err
}
fields := strings.Fields(string(info))
if len(fields) < 2 {
return 0, 0, fmt.Errorf("invalid stat output")
}
blockSize, _ := strconv.ParseInt(fields[0], 10, 64)
blockCount, _ := strconv.ParseInt(fields[1], 10, 64)
actual = blockSize * blockCount
return actual, 0, nil
}
+64
View File
@@ -0,0 +1,64 @@
package openclaw
import "time"
type Instance struct {
Name string `json:"name"`
Domain string `json:"domain"`
Host *string `json:"host,omitempty"`
User string `json:"user"`
Status string `json:"status"`
Additional map[string]any `json:"-"`
}
type HostMetrics struct {
State string `json:"state"`
VCPUs int `json:"vcpus"`
MemoryKiB int64 `json:"memory_kib"`
Autostart bool `json:"autostart"`
Snapshots int `json:"snapshots"`
DiskActual int64 `json:"disk_actual_bytes"`
CPUTime int64 `json:"cpu_time_ns"`
}
type GuestMetrics struct {
ServiceActive bool `json:"service_active"`
ServiceUptime string `json:"service_uptime"`
HTTPStatus int `json:"http_status"`
Version string `json:"version"`
VersionConsistent bool `json:"version_consistent"`
MemoryTotal int64 `json:"memory_total_bytes"`
MemoryUsed int64 `json:"memory_used_bytes"`
MemoryPercent float64 `json:"memory_percent"`
DiskTotal int64 `json:"disk_total_bytes"`
DiskUsed int64 `json:"disk_used_bytes"`
DiskPercent float64 `json:"disk_percent"`
LoadAverage float64 `json:"load_average"`
Swappiness int `json:"swappiness"`
}
type BackupStatus struct {
LastBackup string `json:"last_backup"`
AgeHours float64 `json:"age_hours"`
}
type Metrics struct {
Instance Instance `json:"instance"`
Host HostMetrics `json:"host"`
Guest *GuestMetrics `json:"guest,omitempty"`
Backup *BackupStatus `json:"backup,omitempty"`
Timestamp time.Time `json:"timestamp"`
Error string `json:"error,omitempty"`
}
type Issues struct {
GuestDiskUsageHigh bool `json:"guest_disk_usage_high"`
GuestMemoryUsageHigh bool `json:"guest_memory_usage_high"`
HostDiskUsageHigh bool `json:"host_disk_usage_high"`
GatewayDown bool `json:"gateway_down"`
HTTPUnhealthy bool `json:"http_unhealthy"`
VersionMismatch bool `json:"version_mismatch"`
VMNotRunning bool `json:"vm_not_running"`
BackupStale bool `json:"backup_stale"`
}
+211
View File
@@ -0,0 +1,211 @@
# Agentmon SDK
The agentmon SDK provides a Go client for sending telemetry events to the agentmon backend.
## Installation
```bash
go get agentmon/internal/sdk
```
## Quick Start
```go
package main
import (
"context"
"log"
"agentmon/internal/sdk"
)
func main() {
emitter, err := sdk.NewEmitter(sdk.Config{
ServerURL: "http://localhost:8080",
Framework: "my-agent",
ClientID: "my-client-001",
Host: "localhost",
BufferSize: 100,
})
if err != nil {
log.Fatal(err)
}
defer emitter.Close(context.Background())
ctx := context.Background()
sessionID := "session-123"
// Start a session
sessionStart := sdk.NewSessionStart(sessionID, sdk.WithSource(emitter))
if err := emitter.Emit(ctx, sessionStart); err != nil {
log.Printf("Error: %v", err)
}
// ... do work ...
// End the session
sessionEnd := sdk.NewSessionEnd(sessionID, sdk.WithSource(emitter))
if err := emitter.Emit(ctx, sessionEnd); err != nil {
log.Printf("Error: %v", err)
}
// Flush buffered events
if err := emitter.Flush(ctx); err != nil {
log.Printf("Error: %v", err)
}
}
```
## Configuration
The `Config` struct configures the emitter:
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `ServerURL` | string | Yes | - | URL of the ingest gateway (e.g., `http://localhost:8080`) |
| `APIKey` | string | No | - | Optional API key for authentication |
| `Framework` | string | Yes | - | Name of the framework (e.g., `opencode`, `claude-code`) |
| `ClientID` | string | Yes | - | Stable identifier for this emitter instance |
| `Host` | string | No | `localhost` | Hostname where events originate |
| `BufferSize` | int | No | `100` | Max number of events to buffer before flushing |
| `UseWebSocket` | bool | No | `false` | Enable WebSocket streaming mode |
| `EnableLogging` | bool | No | `false` | Enable debug logging |
## Event Types
### Session Events
```go
// Start a session
sessionStart := sdk.NewSessionStart(sessionID,
sdk.WithSource(emitter),
sdk.WithAttributes(map[string]any{
"cwd": "/home/user/project",
"repo": "myrepo",
"branch": "main",
}),
)
// End a session
sessionEnd := sdk.NewSessionEnd(sessionID,
sdk.WithSource(emitter),
)
```
### Run Events
```go
// Start a run
runStart := sdk.NewRunStart(sessionID, runID,
sdk.WithSource(emitter),
sdk.WithAttributes(map[string]any{
"command": "my-command",
"agent": "my-agent",
}),
)
// End a run
runEnd := sdk.NewRunEnd(sessionID, runID, "success", 60000,
sdk.WithSource(emitter),
sdk.WithLLMUsage("claude-3-opus", 1000, 500, 0.015),
)
```
### Span Events
```go
// Start a span
spanStart := sdk.NewSpanStart(sessionID, runID, traceID, spanID,
sdk.WithSource(emitter),
sdk.WithSpanKind("tool"),
sdk.WithName("Bash"),
sdk.WithAttributes(map[string]any{
"command": "echo hello",
}),
)
// End a span
spanEnd := sdk.NewSpanEnd(sessionID, runID, traceID, spanID, "success", 1000,
sdk.WithSource(emitter),
sdk.WithSpanKind("tool"),
sdk.WithName("Bash"),
)
```
### Error Events
```go
errEvent := sdk.NewError(sessionID, runID, traceID, spanID,
"validation", "invalid input",
sdk.WithSource(emitter),
sdk.WithErrorDetails("VAL001", false),
)
```
### Metric Snapshots
```go
metrics := sdk.NewMetricSnapshot(sessionID, runID, map[string]any{
"tokens_in": 1000.0,
"tokens_out": 500.0,
"cost_usd": 0.015,
"latency_ms": 300.0,
"error_count": 0,
})
```
## Event Options
Event options are functions that modify events before sending:
- `WithSource(emitter)` - Add source information (framework, client_id, host)
- `WithAttributes(attrs)` - Add arbitrary attributes
- `WithSpanKind(kind)` - Set the span_kind attribute (`llm`, `tool`, `skill`, `internal`)
- `WithName(name)` - Set the name attribute
- `WithParentSpanID(parentID)` - Set the parent span ID
- `WithPayload(payload)` - Set custom payload
- `WithSeq(seq)` - Set sequence number (for WebSocket mode)
- `WithLLMUsage(model, inTokens, outTokens, costUSD)` - Add LLM usage to run.end or span.end
- `WithErrorDetails(code, retryable)` - Add error details
## Span Kinds
Common span kinds:
- `llm` - LLM API calls
- `tool` - Tool/function calls
- `skill` - Skill execution
- `internal` - Internal operations
## WebSocket Mode
For real-time streaming, enable WebSocket mode:
```go
emitter, err := sdk.NewEmitter(sdk.Config{
ServerURL: "http://localhost:8080",
Framework: "my-agent",
ClientID: "my-client-001",
Host: "localhost",
UseWebSocket: true,
})
```
In WebSocket mode, events are sent immediately rather than buffered.
## Example
See `examples/sdk-example/main.go` for a complete example.
## Testing
Run tests with:
```bash
go test ./internal/sdk/...
```
## License
Same license as the agentmon project.
+351
View File
@@ -0,0 +1,351 @@
// Package sdk provides the agentmon emitter SDK for sending telemetry events.
package sdk
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
schemaName = "agentmon.event"
schemaVersion = 1
)
// Emitter is the main client for sending agentmon events.
type Emitter struct {
config Config
httpClient *http.Client
wsClient *WSClient
buffer []Event
bufferSize int
mu sync.Mutex
closed bool
}
// Config holds emitter configuration.
type Config struct {
// ServerURL is the base URL of the ingest gateway (e.g., "http://localhost:8080")
ServerURL string
// APIKey is optional authentication key
APIKey string
// Framework is the name of the agent framework (e.g., "opencode", "claude-code")
Framework string
// ClientID is a stable identifier for this emitter instance
ClientID string
// Host is the hostname where events originate
Host string
// BufferSize is the max number of events to buffer before flushing
BufferSize int
// UseWebSocket enables WebSocket streaming mode
UseWebSocket bool
// EnableLogging enables debug logging
EnableLogging bool
}
// Event represents a complete agentmon event.
type Event map[string]any
// WSClient handles WebSocket communication with the ingest gateway.
type WSClient struct {
conn *websocket.Conn
sendChan chan []byte
ackChan chan int
mu sync.Mutex
closed bool
}
// NewWSClient creates a new WebSocket client.
func NewWSClient(url string) (*WSClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
return &WSClient{
conn: conn,
sendChan: make(chan []byte, 100),
ackChan: make(chan int, 1),
}, nil
}
// Run starts the WebSocket client's main loop.
func (w *WSClient) Run(ctx context.Context) {
defer w.Close()
go w.readMessages()
w.writeMessages()
}
// Send queues an event to be sent via WebSocket.
func (w *WSClient) Send(data []byte) error {
w.mu.Lock()
if w.closed {
w.mu.Unlock()
return fmt.Errorf("WebSocket client is closed")
}
w.mu.Unlock()
select {
case w.sendChan <- data:
return nil
default:
return fmt.Errorf("send buffer full")
}
}
// Close closes the WebSocket connection.
func (w *WSClient) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
w.closed = true
if w.conn != nil {
_ = w.conn.Close()
}
close(w.sendChan)
return nil
}
func (w *WSClient) readMessages() {
for {
_, message, err := w.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WebSocket read error: %v", err)
}
return
}
var ack map[string]any
if err := json.Unmarshal(message, &ack); err != nil {
log.Printf("Failed to unmarshal ack: %v", err)
continue
}
if seq, ok := ack["ack"].(map[string]any)["up_to_seq"].(float64); ok {
select {
case w.ackChan <- int(seq):
default:
}
}
}
}
func (w *WSClient) writeMessages() {
for data := range w.sendChan {
w.mu.Lock()
if w.closed {
w.mu.Unlock()
return
}
err := w.conn.WriteMessage(websocket.TextMessage, data)
w.mu.Unlock()
if err != nil {
log.Printf("WebSocket write error: %v", err)
return
}
}
}
// NewEmitter creates a new emitter with the given configuration.
func NewEmitter(cfg Config) (*Emitter, error) {
if cfg.ServerURL == "" {
return nil, fmt.Errorf("ServerURL is required")
}
if cfg.Framework == "" {
return nil, fmt.Errorf("Framework is required")
}
if cfg.ClientID == "" {
return nil, fmt.Errorf("ClientID is required")
}
if cfg.Host == "" {
cfg.Host = "localhost"
}
if cfg.BufferSize <= 0 {
cfg.BufferSize = 100
}
e := &Emitter{
config: cfg,
httpClient: &http.Client{Timeout: 30 * time.Second},
buffer: make([]Event, 0, cfg.BufferSize),
bufferSize: cfg.BufferSize,
}
if cfg.UseWebSocket {
wsURL := wsURLFromHTTP(cfg.ServerURL)
wsClient, err := NewWSClient(wsURL)
if err != nil {
return nil, fmt.Errorf("failed to create WebSocket client: %w", err)
}
e.wsClient = wsClient
go e.wsClient.Run(context.Background())
}
return e, nil
}
// Emit sends a single event.
func (e *Emitter) Emit(ctx context.Context, event Event) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return fmt.Errorf("emitter is closed")
}
if e.config.UseWebSocket {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
return e.wsClient.Send(data)
}
e.buffer = append(e.buffer, event)
if len(e.buffer) >= e.bufferSize {
return e.Flush(ctx)
}
return nil
}
// Flush sends all buffered events to the server.
func (e *Emitter) Flush(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return fmt.Errorf("emitter is closed")
}
if len(e.buffer) == 0 {
return nil
}
if e.config.EnableLogging {
log.Printf("Flushing %d events", len(e.buffer))
}
events := make([]map[string]any, len(e.buffer))
for i, ev := range e.buffer {
events[i] = ev
}
resp, err := e.sendEvents(ctx, events)
if err != nil {
return fmt.Errorf("failed to send events: %w", err)
}
e.buffer = e.buffer[:0]
if resp.Rejected > 0 && e.config.EnableLogging {
log.Printf("Rejected %d events", resp.Rejected)
if len(resp.Errors) > 0 {
log.Printf("Errors: %v", resp.Errors)
}
}
return nil
}
// Close flushes any remaining events and closes the emitter.
func (e *Emitter) Close(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil
}
e.closed = true
if e.wsClient != nil {
_ = e.wsClient.Close()
}
if len(e.buffer) > 0 {
_ = e.Flush(ctx)
}
return nil
}
type sendResponse struct {
Accepted int `json:"accepted"`
Rejected int `json:"rejected"`
Errors []struct {
Error string `json:"error"`
} `json:"errors,omitempty"`
}
func (e *Emitter) sendEvents(ctx context.Context, events []map[string]any) (*sendResponse, error) {
body, err := json.Marshal(events)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, "POST", e.config.ServerURL+"/v1/events", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if e.config.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+e.config.APIKey)
}
resp, err := e.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
var result sendResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
func wsURLFromHTTP(httpURL string) string {
switch {
case len(httpURL) >= 8 && httpURL[:8] == "https://":
return "wss://" + httpURL[8:] + "/v1/ws"
case len(httpURL) >= 7 && httpURL[:7] == "http://":
return "ws://" + httpURL[7:] + "/v1/ws"
default:
return httpURL + "/v1/ws"
}
}
// generateID creates a new UUID-like identifier.
func generateID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
log.Printf("Failed to generate random ID: %v", err)
return fmt.Sprintf("%d", time.Now().UnixNano())
}
return hex.EncodeToString(b)
}
+348
View File
@@ -0,0 +1,348 @@
package sdk
import (
"context"
"testing"
)
func TestNewEmitter(t *testing.T) {
tests := []struct {
name string
config Config
wantErr bool
}{
{
name: "valid config",
config: Config{
ServerURL: "http://localhost:8080",
Framework: "test-framework",
ClientID: "test-client",
Host: "test-host",
},
wantErr: false,
},
{
name: "missing server URL",
config: Config{
Framework: "test-framework",
ClientID: "test-client",
},
wantErr: true,
},
{
name: "missing framework",
config: Config{
ServerURL: "http://localhost:8080",
ClientID: "test-client",
},
wantErr: true,
},
{
name: "missing client ID",
config: Config{
ServerURL: "http://localhost:8080",
Framework: "test-framework",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
emitter, err := NewEmitter(tt.config)
if (err != nil) != tt.wantErr {
t.Errorf("NewEmitter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr {
_ = emitter.Close(context.Background())
}
})
}
}
func TestGenerateID(t *testing.T) {
id1 := generateID()
id2 := generateID()
if id1 == id2 {
t.Errorf("generateID() should produce unique IDs, got duplicate: %s", id1)
}
if len(id1) == 0 {
t.Error("generateID() should return a non-empty string")
}
}
func TestNewSessionStart(t *testing.T) {
sessionID := "test-session-001"
event := NewSessionStart(sessionID)
if event["event"].(map[string]any)["type"] != "session.start" {
t.Error("NewSessionStart() should create session.start event")
}
if event["correlation"].(map[string]any)["session_id"] != sessionID {
t.Error("NewSessionStart() should set session_id in correlation")
}
if event["schema"].(map[string]any)["name"] != schemaName {
t.Error("NewSessionStart() should set correct schema name")
}
}
func TestNewRunStart(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
event := NewRunStart(sessionID, runID)
if event["event"].(map[string]any)["type"] != "run.start" {
t.Error("NewRunStart() should create run.start event")
}
if event["correlation"].(map[string]any)["session_id"] != sessionID {
t.Error("NewRunStart() should set session_id in correlation")
}
if event["correlation"].(map[string]any)["run_id"] != runID {
t.Error("NewRunStart() should set run_id in correlation")
}
}
func TestNewSpanStart(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
traceID := "test-trace-001"
spanID := "test-span-001"
event := NewSpanStart(sessionID, runID, traceID, spanID)
if event["event"].(map[string]any)["type"] != "span.start" {
t.Error("NewSpanStart() should create span.start event")
}
if event["correlation"].(map[string]any)["span_id"] != spanID {
t.Error("NewSpanStart() should set span_id in correlation")
}
if event["correlation"].(map[string]any)["trace_id"] != traceID {
t.Error("NewSpanStart() should set trace_id in correlation")
}
}
func TestNewRunEnd(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
status := "success"
durationMs := int64(60000)
event := NewRunEnd(sessionID, runID, status, durationMs)
if event["event"].(map[string]any)["type"] != "run.end" {
t.Error("NewRunEnd() should create run.end event")
}
payload := event["payload"].(map[string]any)
if payload["status"] != status {
t.Errorf("NewRunEnd() should set status to %s", status)
}
if payload["duration_ms"] != durationMs {
t.Errorf("NewRunEnd() should set duration_ms to %d", durationMs)
}
}
func TestNewSpanEnd(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
traceID := "test-trace-001"
spanID := "test-span-001"
status := "success"
durationMs := int64(1000)
event := NewSpanEnd(sessionID, runID, traceID, spanID, status, durationMs)
if event["event"].(map[string]any)["type"] != "span.end" {
t.Error("NewSpanEnd() should create span.end event")
}
payload := event["payload"].(map[string]any)
if payload["status"] != status {
t.Errorf("NewSpanEnd() should set status to %s", status)
}
if payload["duration_ms"] != durationMs {
t.Errorf("NewSpanEnd() should set duration_ms to %d", durationMs)
}
}
func TestNewError(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
traceID := "test-trace-001"
spanID := "test-span-001"
errType := "validation"
message := "invalid input"
event := NewError(sessionID, runID, traceID, spanID, errType, message)
if event["event"].(map[string]any)["type"] != "error" {
t.Error("NewError() should create error event")
}
payload := event["payload"].(map[string]any)
err := payload["error"].(map[string]any)
if err["type"] != errType {
t.Errorf("NewError() should set error type to %s", errType)
}
if err["message"] != message {
t.Errorf("NewError() should set error message to %s", message)
}
}
func TestNewMetricSnapshot(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
metrics := map[string]any{
"tokens_in": 1000.0,
"tokens_out": 500.0,
"cost_usd": 0.002,
}
event := NewMetricSnapshot(sessionID, runID, metrics)
if event["event"].(map[string]any)["type"] != "metric.snapshot" {
t.Error("NewMetricSnapshot() should create metric.snapshot event")
}
payload := event["payload"].(map[string]any)
payloadMetrics := payload["metrics"].(map[string]any)
if len(payloadMetrics) != len(metrics) {
t.Error("NewMetricSnapshot() should include all metrics")
}
}
func TestEventOptions(t *testing.T) {
emitter, err := NewEmitter(Config{
ServerURL: "http://localhost:8080",
Framework: "test-framework",
ClientID: "test-client",
Host: "test-host",
})
if err != nil {
t.Fatalf("NewEmitter() error = %v", err)
}
defer emitter.Close(context.Background())
sessionID := "test-session-001"
event := NewSessionStart(sessionID,
WithSource(emitter),
WithAttributes(map[string]any{"cwd": "/home/user"}),
WithSeq(1),
)
if _, ok := event["event"].(map[string]any)["source"]; !ok {
t.Error("WithSource() should set source")
}
if _, ok := event["attributes"]; !ok {
t.Error("WithAttributes() should set attributes")
}
if event["event"].(map[string]any)["seq"] != 1 {
t.Error("WithSeq() should set sequence number")
}
}
func TestWithSpanKind(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
traceID := "test-trace-001"
spanID := "test-span-001"
event := NewSpanStart(sessionID, runID, traceID, spanID, WithSpanKind("tool"))
attrs := event["attributes"].(map[string]any)
if attrs["span_kind"] != "tool" {
t.Error("WithSpanKind() should set span_kind attribute")
}
}
func TestWithLLMUsage(t *testing.T) {
sessionID := "test-session-001"
runID := "test-run-001"
event := NewRunEnd(sessionID, runID, "success", 60000,
WithLLMUsage("claude-3-opus", 1000, 500, 0.015),
)
payload := event["payload"].(map[string]any)
llm := payload["llm"].(map[string]any)
if llm["model"] != "claude-3-opus" {
t.Error("WithLLMUsage() should set model")
}
usage := llm["usage"].(map[string]any)
if usage["input_tokens"] != 1000 {
t.Error("WithLLMUsage() should set input_tokens")
}
if usage["output_tokens"] != 500 {
t.Error("WithLLMUsage() should set output_tokens")
}
cost := llm["cost"].(map[string]any)
if cost["total_usd"] != 0.015 {
t.Error("WithLLMUsage() should set total_usd")
}
}
func TestEmit(t *testing.T) {
emitter, err := NewEmitter(Config{
ServerURL: "http://localhost:9999",
Framework: "test-framework",
ClientID: "test-client",
Host: "test-host",
BufferSize: 10,
})
if err != nil {
t.Fatalf("NewEmitter() error = %v", err)
}
sessionID := "test-session-001"
event := NewSessionStart(sessionID, WithSource(emitter))
ctx := context.Background()
err = emitter.Emit(ctx, event)
if err != nil {
t.Errorf("Emit() error = %v", err)
}
err = emitter.Emit(ctx, NewSessionStart(sessionID+"-2", WithSource(emitter)))
if err != nil {
t.Errorf("Emit() error = %v", err)
}
emitter.mu.Lock()
buffered := len(emitter.buffer)
emitter.mu.Unlock()
if buffered != 2 {
t.Errorf("Expected 2 buffered events, got %d", buffered)
}
}
func TestWsURLFromHTTP(t *testing.T) {
tests := []struct {
httpURL string
want string
}{
{"http://localhost:8080", "ws://localhost:8080/v1/ws"},
{"https://example.com", "wss://example.com/v1/ws"},
{"http://example.com:8080", "ws://example.com:8080/v1/ws"},
}
for _, tt := range tests {
t.Run(tt.httpURL, func(t *testing.T) {
if got := wsURLFromHTTP(tt.httpURL); got != tt.want {
t.Errorf("wsURLFromHTTP() = %v, want %v", got, tt.want)
}
})
}
}
+332
View File
@@ -0,0 +1,332 @@
package sdk
import (
"time"
)
// NewSessionStart creates a session.start event.
func NewSessionStart(sessionID string, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "session.start",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewSessionEnd creates a session.end event.
func NewSessionEnd(sessionID string, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "session.end",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewRunStart creates a run.start event.
func NewRunStart(sessionID, runID string, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "run.start",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewRunEnd creates a run.end event.
func NewRunEnd(sessionID, runID string, status string, durationMs int64, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "run.end",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
},
"payload": map[string]any{
"status": status,
"duration_ms": durationMs,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewSpanStart creates a span.start event.
func NewSpanStart(sessionID, runID, traceID, spanID string, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "span.start",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
"trace_id": traceID,
"span_id": spanID,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewSpanEnd creates a span.end event.
func NewSpanEnd(sessionID, runID, traceID, spanID string, status string, durationMs int64, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "span.end",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
"trace_id": traceID,
"span_id": spanID,
},
"payload": map[string]any{
"status": status,
"duration_ms": durationMs,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewError creates an error event.
func NewError(sessionID, runID, traceID, spanID string, errType, message string, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "error",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
"trace_id": traceID,
"span_id": spanID,
},
"payload": map[string]any{
"error": map[string]any{
"type": errType,
"message": message,
},
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// NewMetricSnapshot creates a metric.snapshot event.
func NewMetricSnapshot(sessionID, runID string, metrics map[string]any, opts ...EventOption) Event {
now := time.Now()
event := map[string]any{
"schema": map[string]any{
"name": schemaName,
"version": schemaVersion,
},
"event": map[string]any{
"id": generateID(),
"type": "metric.snapshot",
"ts": now.UTC().Format(time.RFC3339Nano),
},
"correlation": map[string]any{
"session_id": sessionID,
"run_id": runID,
},
"payload": map[string]any{
"metrics": metrics,
},
}
for _, opt := range opts {
opt(event)
}
return event
}
// EventOption is a function that modifies an event.
type EventOption func(Event)
// WithSource sets the source information on an event.
func WithSource(emitter *Emitter) EventOption {
return func(e Event) {
if event, ok := e["event"].(map[string]any); ok {
event["source"] = map[string]any{
"framework": emitter.config.Framework,
"client_id": emitter.config.ClientID,
"host": emitter.config.Host,
}
}
}
}
// WithAttributes adds attributes to an event.
func WithAttributes(attrs map[string]any) EventOption {
return func(e Event) {
if _, ok := e["attributes"]; !ok {
e["attributes"] = make(map[string]any)
}
if attrsMap, ok := e["attributes"].(map[string]any); ok {
for k, v := range attrs {
attrsMap[k] = v
}
}
}
}
// WithSpanKind sets the span_kind attribute.
func WithSpanKind(kind string) EventOption {
return WithAttributes(map[string]any{"span_kind": kind})
}
// WithName sets the name attribute.
func WithName(name string) EventOption {
return WithAttributes(map[string]any{"name": name})
}
// WithParentSpanID sets the parent_span_id in correlation.
func WithParentSpanID(parentID string) EventOption {
return func(e Event) {
if corr, ok := e["correlation"].(map[string]any); ok {
corr["parent_span_id"] = parentID
}
}
}
// WithPayload sets the payload on an event.
func WithPayload(payload map[string]any) EventOption {
return func(e Event) {
e["payload"] = payload
}
}
// WithSeq sets the sequence number on an event.
func WithSeq(seq int) EventOption {
return func(e Event) {
if event, ok := e["event"].(map[string]any); ok {
event["seq"] = seq
}
}
}
// WithLLMUsage adds LLM usage information to a span.end or run.end payload.
func WithLLMUsage(model string, inputTokens, outputTokens int, costUSD float64) EventOption {
return func(e Event) {
if payload, ok := e["payload"].(map[string]any); ok {
if _, ok := payload["llm"]; !ok {
payload["llm"] = make(map[string]any)
}
if llm, ok := payload["llm"].(map[string]any); ok {
llm["model"] = model
llm["usage"] = map[string]any{
"input_tokens": inputTokens,
"output_tokens": outputTokens,
}
llm["cost"] = map[string]any{
"total_usd": costUSD,
}
}
}
}
}
// WithErrorDetails adds error details to a payload.
func WithErrorDetails(code string, retryable bool) EventOption {
return func(e Event) {
if payload, ok := e["payload"].(map[string]any); ok {
if err, ok := payload["error"].(map[string]any); ok {
err["code"] = code
err["retryable"] = retryable
}
}
}
}
+32 -15
View File
@@ -3,6 +3,7 @@ package postgres
import (
"context"
"encoding/json"
"fmt"
"time"
)
@@ -13,20 +14,39 @@ type EventRow struct {
Payload json.RawMessage `json:"payload"`
}
func (d *DB) ListRecentEvents(ctx context.Context, limit int) ([]EventRow, error) {
if limit <= 0 {
limit = 100
type EventsFilter struct {
Limit int
EventType string
Framework string
}
func (d *DB) ListRecentEvents(ctx context.Context, f EventsFilter) ([]EventRow, error) {
if f.Limit <= 0 {
f.Limit = 100
}
if limit > 1000 {
limit = 1000
if f.Limit > 1000 {
f.Limit = 1000
}
rows, err := d.sql.QueryContext(ctx, `
select event_id, ts, type, payload
from events
order by ts desc
limit $1
`, limit)
query := "SELECT event_id, ts, type, payload FROM events WHERE 1=1"
args := []any{}
argN := 1
if f.EventType != "" {
query += fmt.Sprintf(" AND type = $%d", argN)
args = append(args, f.EventType)
argN++
}
if f.Framework != "" {
query += fmt.Sprintf(" AND source_framework = $%d", argN)
args = append(args, f.Framework)
argN++
}
query += fmt.Sprintf(" ORDER BY ts DESC LIMIT $%d", argN)
args = append(args, f.Limit)
rows, err := d.sql.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
@@ -40,8 +60,5 @@ limit $1
}
out = append(out, r)
}
if err := rows.Err(); err != nil {
return nil, err
}
return out, nil
return out, rows.Err()
}