From 7c043b78a43d90223c60d0ae191a316b9848d66d Mon Sep 17 00:00:00 2001
From: William Valentin
Date: Wed, 18 Mar 2026 10:12:18 -0700
Subject: [PATCH] feat: add swarm-monitor binary

---
 cmd/swarm-monitor/main.go | 172 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 172 insertions(+)
 create mode 100644 cmd/swarm-monitor/main.go

diff --git a/cmd/swarm-monitor/main.go b/cmd/swarm-monitor/main.go
new file mode 100644
index 0000000..e812a54
--- /dev/null
+++ b/cmd/swarm-monitor/main.go
@@ -0,0 +1,172 @@
+// Command swarm-monitor periodically collects service state with
+// swarm.CollectAll and publishes snapshot events to a NATS topic.
+package main
+
+import (
+	"context"
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"agentmon/internal/monitor/swarm"
+	qnats "agentmon/internal/queue/nats"
+)
+
+// main reads its configuration from the environment, connects to NATS, polls
+// once immediately, and then polls on every tick of POLL_INTERVAL until the
+// process receives SIGINT or SIGTERM.
+func main() {
+	natsURL := envDefault("NATS_URL", "nats://nats:4222")
+	natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1")
+	interval := envDefault("POLL_INTERVAL", "30s")
+	litellmBase := envDefault("LITELLM_BASE_URL", "http://localhost:18804")
+	litellmKey := os.Getenv("LITELLM_MASTER_KEY")
+
+	// Validate the interval before opening the NATS connection: log.Fatalf
+	// exits without running deferred calls, so fail while nothing is open.
+	pollDuration, err := time.ParseDuration(interval)
+	if err != nil {
+		log.Fatalf("invalid poll interval: %v", err)
+	}
+	// time.NewTicker panics on a non-positive duration.
+	if pollDuration <= 0 {
+		log.Fatalf("invalid poll interval: %s is not positive", pollDuration)
+	}
+
+	pub, err := qnats.NewPublisher(natsURL, natsTopic)
+	if err != nil {
+		log.Fatalf("failed to connect to NATS: %v", err)
+	}
+	defer pub.Close()
+
+	cfg := swarm.Config{
+		LiteLLMBaseURL: litellmBase,
+		LiteLLMAPIKey:  litellmKey,
+		HTTPTimeout:    5 * time.Second,
+	}
+
+	// Cancel ctx on SIGINT/SIGTERM so the loop below returns and the
+	// deferred cleanup actually runs.
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+	defer stop()
+
+	ticker := time.NewTicker(pollDuration)
+	defer ticker.Stop()
+
+	log.Printf("swarm-monitor started, polling every %s", pollDuration)
+
+	// Poll immediately on start.
+	if err := poll(ctx, pub, cfg); err != nil {
+		log.Printf("initial poll error: %v", err)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Printf("swarm-monitor shutting down")
+			return
+		case <-ticker.C:
+			if err := poll(ctx, pub, cfg); err != nil {
+				log.Printf("poll error: %v", err)
+			}
+		}
+	}
+}
+
+// poll collects the current services, runs issue detection on them, and
+// publishes one rolled-up swarm.snapshot event plus one
+// swarm.service.snapshot event per service, all sharing one timestamp.
+//
+// Only a collection failure is returned. Publishing is best-effort: emit
+// failures are logged and the remaining events are still attempted.
+func poll(ctx context.Context, pub *qnats.Publisher, cfg swarm.Config) error {
+	services, err := swarm.CollectAll(ctx, cfg)
+	if err != nil {
+		return fmt.Errorf("collecting swarm services: %w", err)
+	}
+
+	issues := swarm.DetectIssues(services)
+	now := time.Now().UTC()
+
+	// Emit rolled-up swarm.snapshot.
+	if err := emit(ctx, pub, "swarm.snapshot", "agentmon.swarm", map[string]any{
+		"services": services,
+		"issues":   issues,
+	}, now); err != nil {
+		log.Printf("failed to emit swarm.snapshot: %v", err)
+	}
+
+	// Emit one swarm.service.snapshot per service.
+	for _, svc := range services {
+		if err := emit(ctx, pub, "swarm.service.snapshot", "agentmon.swarm.service", map[string]any{
+			"service": svc,
+		}, now); err != nil {
+			log.Printf("failed to emit swarm.service.snapshot for %s: %v", svc.Name, err)
+		}
+	}
+
+	return nil
+}
+
+// emit wraps payload in the event envelope (schema name and version, a
+// generated event id, the event type, and ts formatted as RFC 3339 with
+// nanoseconds), encodes it as JSON, and publishes it through pub.
+func emit(ctx context.Context, pub *qnats.Publisher, eventType, schemaName string, payload map[string]any, ts time.Time) error {
+	event := map[string]any{
+		"schema": map[string]any{
+			"name":    schemaName,
+			"version": 1,
+		},
+		"event": map[string]any{
+			"id":   generateID(),
+			"type": eventType,
+			"ts":   ts.Format(time.RFC3339Nano),
+		},
+		"payload": payload,
+	}
+
+	data, err := json.Marshal(event)
+	if err != nil {
+		return fmt.Errorf("encoding %s event: %w", eventType, err)
+	}
+
+	return pub.Publish(ctx, data)
+}
+
+// generateID returns an event id made of the current UTC time formatted as
+// YYYYMMDDhhmmss, a hyphen, and eight random characters.
+func generateID() string {
+	return time.Now().UTC().Format("20060102150405") + "-" + randomString(8)
+}
+
+// randomString returns n characters drawn from [a-z0-9] using crypto/rand.
+//
+// Reducing a random byte modulo 36 favours the first four characters very
+// slightly (256 is not a multiple of 36); that is acceptable for id
+// suffixes, which only need to be unlikely to collide.
+func randomString(n int) string {
+	const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
+	b := make([]byte, n)
+	if _, err := rand.Read(b); err != nil {
+		// No entropy source is available; there is no sensible fallback.
+		panic("swarm-monitor: reading random bytes: " + err.Error())
+	}
+	for i, v := range b {
+		b[i] = chars[int(v)%len(chars)]
+	}
+	return string(b)
+}
+
+// envDefault returns the value of the environment variable key, or def when
+// the variable is unset or empty.
+func envDefault(key, def string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return def
+}