diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c6537b8 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +# Developer entry points: each run-* target starts one service with local defaults. +.PHONY: tidy test run-ingest run-query run-ui run-processor + +tidy: + go mod tidy + +test: + go test ./... + +run-ingest: + AGENTMON_ADDR=:8080 NATS_URL=$${NATS_URL:-nats://nats:4222} NATS_TOPIC=$${NATS_TOPIC:-agentmon.events.v1} go run ./cmd/ingest-gateway + +run-query: + AGENTMON_QUERY_ADDR=:8081 go run ./cmd/query-api + +run-ui: + AGENTMON_UI_ADDR=:8082 go run ./cmd/web-ui + +run-processor: + DATABASE_URL=$${DATABASE_URL:?set DATABASE_URL} NATS_URL=$${NATS_URL:-nats://nats:4222} NATS_TOPIC=$${NATS_TOPIC:-agentmon.events.v1} go run ./cmd/event-processor diff --git a/build/README-build.md b/build/README-build.md new file mode 100644 index 0000000..e63f957 --- /dev/null +++ b/build/README-build.md @@ -0,0 +1,15 @@ +Build/push images + +- Default builds linux/amd64 + linux/arm64 and pushes. +- Default REGISTRY is `gitea-http.taildb3494.ts.net/will/agentmon` (your Gitea registry). 
+ +Examples: + +- Push to default image names: + TAG=dev ./build/build-images.sh + +- Push to your registry: + REGISTRY=registry.docker-registry.svc.cluster.local:5000 TAG=dev ./build/build-images.sh + +- Build locally without pushing (single-arch only; Docker can’t `--load` multi-arch): + PLATFORM=linux/amd64 PUSH_ARGS=--load TAG=dev ./build/build-images.sh diff --git a/build/build-images.sh b/build/build-images.sh new file mode 100755 index 0000000..738bf04 --- /dev/null +++ b/build/build-images.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Builds multi-arch images by default. +# +# Defaults: +# - PLATFORM="linux/amd64,linux/arm64" +# - PUSH_ARGS="--push" (set to --load for a single-arch local build) + +REGISTRY=${REGISTRY:-"gitea-http.taildb3494.ts.net/will/agentmon"} +TAG=${TAG:-"dev-20260117-0832"} +PLATFORM=${PLATFORM:-"linux/amd64,linux/arm64"} + +services=( + ingest-gateway + event-processor + query-api + web-ui +) + +for svc in "${services[@]}"; do + image="${REGISTRY}/${svc}:${TAG}" + + echo "==> Building ${image} (${PLATFORM})" + docker buildx build \ + --platform "${PLATFORM}" \ + --build-arg CMD="${svc}" \ + -f build/dockerfiles/Dockerfile \ + -t "${image}" \ + ${PUSH_ARGS:-"--push"} \ + . +done diff --git a/build/dockerfiles/Dockerfile b/build/dockerfiles/Dockerfile new file mode 100644 index 0000000..da999c3 --- /dev/null +++ b/build/dockerfiles/Dockerfile @@ -0,0 +1,21 @@ +# syntax=docker/dockerfile:1 + +ARG GO_VERSION=1.25 + +FROM golang:${GO_VERSION} AS build +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . 
+ +ARG CMD +RUN test -n "$CMD" +RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -o /out/app ./cmd/$CMD + +FROM gcr.io/distroless/static:nonroot +WORKDIR / +COPY --from=build /out/app /app +USER nonroot:nonroot +ENTRYPOINT ["/app"] diff --git a/cmd/event-processor/main.go b/cmd/event-processor/main.go new file mode 100644 index 0000000..dd15f93 --- /dev/null +++ b/cmd/event-processor/main.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "log" + "os" + "os/signal" + "syscall" + "time" + + qnats "agentmon/internal/queue/nats" + "agentmon/internal/store/postgres" +) + +type envelope struct { + Schema struct { + Name string `json:"name"` + Version int `json:"version"` + } `json:"schema"` + Event struct { + ID string `json:"id"` + Type string `json:"type"` + TS any `json:"ts"` + Source struct { + Framework string `json:"framework"` + ClientID string `json:"client_id"` + } `json:"source"` + } `json:"event"` + Correlation *struct { + SessionID string `json:"session_id,omitempty"` + RunID string `json:"run_id,omitempty"` + TraceID string `json:"trace_id,omitempty"` + SpanID string `json:"span_id,omitempty"` + ParentSpanID string `json:"parent_span_id,omitempty"` + } `json:"correlation,omitempty"` +} + +func main() { + log.Printf("event-processor starting") + + dsn := os.Getenv("DATABASE_URL") + if dsn == "" { + log.Fatalf("DATABASE_URL is required") + } + + natsURL := envDefault("NATS_URL", "nats://nats:4222") + natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1") + + db, err := postgres.Open(dsn) + if err != nil { + log.Fatalf("failed to open DB: %v", err) + } + defer func() { _ = db.Close() }() + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + if err := db.Ping(ctx); err != nil { + log.Fatalf("failed to ping DB: %v", err) + } + + sub, err := qnats.NewSubscriber(natsURL, natsTopic) + if err != nil { + log.Fatalf("failed to connect to NATS: %v", 
err) + } + defer sub.Close() + + log.Printf("subscribed to %s (%s)", natsTopic, natsURL) + + err = sub.Subscribe(ctx, func(msg []byte) error { + var env envelope + if err := json.Unmarshal(msg, &env); err != nil { + return err + } + + ts, err := parseTS(env.Event.TS) + if err != nil { + return err + } + + e := postgres.InsertEvent{ + EventID: env.Event.ID, + TS: ts, + Type: env.Event.Type, + Payload: json.RawMessage(msg), + SourceFramework: sqlNull(env.Event.Source.Framework), + ClientID: sqlNull(env.Event.Source.ClientID), + } + + if env.Correlation != nil { + e.SessionID = sqlNull(env.Correlation.SessionID) + e.RunID = sqlNull(env.Correlation.RunID) + e.TraceID = sqlNull(env.Correlation.TraceID) + e.SpanID = sqlNull(env.Correlation.SpanID) + e.ParentSpanID = sqlNull(env.Correlation.ParentSpanID) + } + + return db.InsertEvent(ctx, e) + }) + + if err != nil && err != context.Canceled { + log.Printf("processor stopped: %v", err) + } +} + +func envDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func sqlNull(s string) sql.NullString { + if s == "" { + return sql.NullString{} + } + return sql.NullString{String: s, Valid: true} +} + +func parseTS(v any) (time.Time, error) { + // Accept RFC3339 string or unix-ms number. 
+ switch t := v.(type) { + case string: + return time.Parse(time.RFC3339Nano, t) + case float64: + ms := int64(t) + return time.Unix(0, ms*int64(time.Millisecond)), nil + default: + return time.Time{}, postgres.ErrMissingField + } +} diff --git a/cmd/ingest-gateway/main.go b/cmd/ingest-gateway/main.go new file mode 100644 index 0000000..c7d8701 --- /dev/null +++ b/cmd/ingest-gateway/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + "os" + + "agentmon/internal/httpx" + qnats "agentmon/internal/queue/nats" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/gorilla/websocket" +) + +func main() { + addr := envDefault("AGENTMON_ADDR", ":8080") + natsURL := envDefault("NATS_URL", "nats://nats:4222") + natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1") + + pub, err := qnats.NewPublisher(natsURL, natsTopic) + if err != nil { + log.Fatalf("failed to connect to NATS: %v", err) + } + defer pub.Close() + + r := chi.NewRouter() + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + + r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + r.Post("/v1/events", func(w http.ResponseWriter, r *http.Request) { + var events []json.RawMessage + if err := json.NewDecoder(r.Body).Decode(&events); err != nil { + httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_json"}) + return + } + + accepted := 0 + rejected := 0 + for _, raw := range events { + if len(raw) == 0 { + rejected++ + continue + } + if err := pub.Publish(r.Context(), raw); err != nil { + rejected++ + continue + } + accepted++ + } + + httpx.WriteJSON(w, http.StatusAccepted, map[string]any{"accepted": accepted, "rejected": rejected}) + }) + + r.Get("/v1/ws", wsHandler(pub)) + + log.Printf("ingest-gateway listening on %s", addr) + log.Fatal(http.ListenAndServe(addr, r)) +} + +func 
wsHandler(pub *qnats.Publisher) http.HandlerFunc { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + return func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + for { + _, msg, err := conn.ReadMessage() + if err != nil { + return + } + + var v map[string]any + if err := json.Unmarshal(msg, &v); err != nil { + _ = conn.WriteJSON(map[string]any{"error": "invalid_json"}) + continue + } + + if err := pub.Publish(r.Context(), msg); err != nil { + _ = conn.WriteJSON(map[string]any{"error": "publish_failed"}) + continue + } + + _ = v + _ = conn.WriteJSON(map[string]any{"ack": map[string]any{"up_to_seq": nil}}) + } + } +} + +func envDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/cmd/query-api/main.go b/cmd/query-api/main.go new file mode 100644 index 0000000..db9be44 --- /dev/null +++ b/cmd/query-api/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "log" + "net/http" + "os" + "strconv" + + "agentmon/internal/httpx" + "agentmon/internal/store/postgres" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" +) + +func main() { + addr := envDefault("AGENTMON_QUERY_ADDR", ":8081") + dsn := os.Getenv("DATABASE_URL") + if dsn == "" { + log.Fatalf("DATABASE_URL is required") + } + + db, err := postgres.Open(dsn) + if err != nil { + log.Fatalf("failed to open DB: %v", err) + } + defer func() { _ = db.Close() }() + + r := chi.NewRouter() + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + + r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + r.Get("/v1/events", func(w http.ResponseWriter, r *http.Request) { + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + events, err := db.ListRecentEvents(r.Context(), 
limit) + if err != nil { + httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) + return + } + httpx.WriteJSON(w, http.StatusOK, map[string]any{"events": events}) + }) + + log.Printf("query-api listening on %s", addr) + log.Fatal(http.ListenAndServe(addr, r)) +} + +func envDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/cmd/web-ui/main.go b/cmd/web-ui/main.go new file mode 100644 index 0000000..90416c0 --- /dev/null +++ b/cmd/web-ui/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/json" + "io" + "log" + "net/http" + "os" + "time" +) + +func main() { + addr := envDefault("AGENTMON_UI_ADDR", ":8082") + + queryAPIBase := envDefault("AGENTMON_QUERY_BASE", "http://query-api") + + // http.DefaultClient has no timeout; bound upstream calls so a stalled query-api cannot hang UI requests. + client := &http.Client{Timeout: 10 * time.Second} + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + mux.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) { + resp, err := client.Get(queryAPIBase + "/v1/events?limit=100") + if err != nil { + w.WriteHeader(http.StatusBadGateway) + _, _ = w.Write([]byte("query-api unreachable")) + return + } + defer resp.Body.Close() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(resp.StatusCode) + _, _ = io.Copy(w, resp.Body) + }) + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + + payload, _ := json.Marshal(map[string]any{"query_api": queryAPIBase}) + _, _ = w.Write([]byte("

agentmon

Recent events:

loading...
")) + _, _ = w.Write([]byte("")) + }) + + log.Printf("web-ui listening on %s", addr) + log.Fatal(http.ListenAndServe(addr, mux)) +} + +func envDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/deploy/k8s/base/agentmon.yaml b/deploy/k8s/base/agentmon.yaml new file mode 100644 index 0000000..ec5115d --- /dev/null +++ b/deploy/k8s/base/agentmon.yaml @@ -0,0 +1,155 @@ +apiVersion: v1 +kind: Service +metadata: + name: ingest-gateway + namespace: agentmon +spec: + selector: + app: ingest-gateway + ports: + - name: http + port: 80 + targetPort: 8080 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ingest-gateway + namespace: agentmon +spec: + replicas: 1 + selector: + matchLabels: + app: ingest-gateway + template: + metadata: + labels: + app: ingest-gateway + spec: + imagePullSecrets: + - name: gitea-regcred + containers: + - name: ingest-gateway + image: gitea-http.taildb3494.ts.net/will/agentmon/ingest-gateway:dev-20260117-0832 + env: + - name: AGENTMON_ADDR + value: ":8080" + - name: NATS_URL + value: "nats://nats:4222" + - name: NATS_TOPIC + value: "agentmon.events.v1" + ports: + - containerPort: 8080 + name: http +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: event-processor + namespace: agentmon +spec: + replicas: 1 + selector: + matchLabels: + app: event-processor + template: + metadata: + labels: + app: event-processor + spec: + imagePullSecrets: + - name: gitea-regcred + containers: + - name: event-processor + image: gitea-http.taildb3494.ts.net/will/agentmon/event-processor:dev-20260117-0832 + env: + - name: DATABASE_URL + value: "postgres://agentmon:agentmon@postgres:5432/agentmon?sslmode=disable" + - name: NATS_URL + value: "nats://nats:4222" + - name: NATS_TOPIC + value: "agentmon.events.v1" +--- +apiVersion: v1 +kind: Service +metadata: + name: query-api + namespace: agentmon +spec: + selector: + app: query-api + ports: + - name: http + port: 80 + targetPort: 8081 
+--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: query-api + namespace: agentmon +spec: + replicas: 1 + selector: + matchLabels: + app: query-api + template: + metadata: + labels: + app: query-api + spec: + imagePullSecrets: + - name: gitea-regcred + containers: + - name: query-api + image: gitea-http.taildb3494.ts.net/will/agentmon/query-api:dev-20260117-0832 + env: + - name: AGENTMON_QUERY_ADDR + value: ":8081" + - name: DATABASE_URL + value: "postgres://agentmon:agentmon@postgres:5432/agentmon?sslmode=disable" + ports: + - containerPort: 8081 + name: http +--- +apiVersion: v1 +kind: Service +metadata: + name: web-ui + namespace: agentmon +spec: + selector: + app: web-ui + ports: + - name: http + port: 80 + targetPort: 8082 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: web-ui + namespace: agentmon +spec: + replicas: 1 + selector: + matchLabels: + app: web-ui + template: + metadata: + labels: + app: web-ui + spec: + imagePullSecrets: + - name: gitea-regcred + containers: + - name: web-ui + image: gitea-http.taildb3494.ts.net/will/agentmon/web-ui:dev-20260117-0832 + env: + - name: AGENTMON_UI_ADDR + value: ":8082" + - name: AGENTMON_QUERY_BASE + value: "http://query-api" + ports: + - containerPort: 8082 + name: http diff --git a/deploy/k8s/base/db/postgres-init-job.yaml b/deploy/k8s/base/db/postgres-init-job.yaml new file mode 100644 index 0000000..5ca8c0d --- /dev/null +++ b/deploy/k8s/base/db/postgres-init-job.yaml @@ -0,0 +1,58 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: postgres-init-sql + namespace: agentmon +data: + init.sql: | + -- applied by init job + create table if not exists events ( + event_id text primary key, + ts timestamptz not null, + type text not null, + session_id text null, + run_id text null, + trace_id text null, + span_id text null, + parent_span_id text null, + source_framework text null, + client_id text null, + payload jsonb not null + ); + + create index if not exists events_ts_idx on events 
(ts); + create index if not exists events_session_idx on events (session_id); + create index if not exists events_run_idx on events (run_id); + create index if not exists events_type_ts_idx on events (type, ts); +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: postgres-init + namespace: agentmon +spec: + template: + spec: + restartPolicy: OnFailure + containers: + - name: psql + image: postgres:16 + env: + - name: PGPASSWORD + value: agentmon + command: + - bash + - -lc + - | + until pg_isready -h postgres -p 5432 -U agentmon; do + echo "waiting for postgres"; + sleep 2; + done + psql -h postgres -p 5432 -U agentmon -d agentmon -f /sql/init.sql + volumeMounts: + - name: sql + mountPath: /sql + volumes: + - name: sql + configMap: + name: postgres-init-sql diff --git a/deploy/k8s/base/ingress/nginx-ingress.yaml b/deploy/k8s/base/ingress/nginx-ingress.yaml new file mode 100644 index 0000000..baf6c4e --- /dev/null +++ b/deploy/k8s/base/ingress/nginx-ingress.yaml @@ -0,0 +1,39 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: agentmon-web-ui + namespace: agentmon + annotations: + kubernetes.io/ingress.class: nginx +spec: + rules: + - host: web-ui.agentmon.192.168.153.240.nip.io + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: web-ui + port: + number: 80 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: agentmon-ingest-gateway + namespace: agentmon + annotations: + kubernetes.io/ingress.class: nginx +spec: + rules: + - host: ingest-gateway.agentmon.192.168.153.240.nip.io + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: ingest-gateway + port: + number: 80 diff --git a/deploy/k8s/base/ingress/tailscale-ingress.yaml b/deploy/k8s/base/ingress/tailscale-ingress.yaml new file mode 100644 index 0000000..ad2dad2 --- /dev/null +++ b/deploy/k8s/base/ingress/tailscale-ingress.yaml @@ -0,0 +1,39 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: 
agentmon-web-ui-ts + namespace: agentmon + annotations: + kubernetes.io/ingress.class: tailscale +spec: + rules: + - host: web-ui.agentmon.taildb3494.ts.net + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: web-ui + port: + number: 80 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: agentmon-ingest-gateway-ts + namespace: agentmon + annotations: + kubernetes.io/ingress.class: tailscale +spec: + rules: + - host: ingest-gateway.agentmon.taildb3494.ts.net + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: ingest-gateway + port: + number: 80 diff --git a/deploy/k8s/base/kustomization.yaml b/deploy/k8s/base/kustomization.yaml new file mode 100644 index 0000000..ade551e --- /dev/null +++ b/deploy/k8s/base/kustomization.yaml @@ -0,0 +1,12 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namespace: agentmon +resources: + - namespace.yaml + - postgres.yaml + - nats.yaml + - agentmon.yaml + # networkpolicy.yaml intentionally omitted (no tight policies) + - ingress/nginx-ingress.yaml + - ingress/tailscale-ingress.yaml + - db/postgres-init-job.yaml diff --git a/deploy/k8s/base/namespace.yaml b/deploy/k8s/base/namespace.yaml new file mode 100644 index 0000000..6d6155e --- /dev/null +++ b/deploy/k8s/base/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: agentmon diff --git a/deploy/k8s/base/nats.yaml b/deploy/k8s/base/nats.yaml new file mode 100644 index 0000000..7a41dd7 --- /dev/null +++ b/deploy/k8s/base/nats.yaml @@ -0,0 +1,51 @@ +apiVersion: v1 +kind: Service +metadata: + name: nats + namespace: agentmon +spec: + selector: + app: nats + ports: + - name: client + port: 4222 + targetPort: 4222 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: nats + namespace: agentmon +spec: + serviceName: nats + replicas: 1 + selector: + matchLabels: + app: nats + template: + metadata: + labels: + app: nats + spec: + containers: + - name: nats + 
image: nats:2.10 + args: + - -js + - -sd + - /data + ports: + - containerPort: 4222 + name: client + volumeMounts: + - name: data + mountPath: /data + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: longhorn + resources: + requests: + storage: 5Gi diff --git a/deploy/k8s/base/networkpolicy.yaml b/deploy/k8s/base/networkpolicy.yaml new file mode 100644 index 0000000..a335231 --- /dev/null +++ b/deploy/k8s/base/networkpolicy.yaml @@ -0,0 +1,72 @@ +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: default-deny-ingress + namespace: agentmon +spec: + podSelector: {} + policyTypes: + - Ingress +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-web-ui-to-query-api + namespace: agentmon +spec: + podSelector: + matchLabels: + app: query-api + policyTypes: [Ingress] + ingress: + - from: + - podSelector: + matchLabels: + app: web-ui + ports: + - protocol: TCP + port: 8081 +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-query-api-to-postgres + namespace: agentmon +spec: + podSelector: + matchLabels: + app: postgres + policyTypes: [Ingress] + ingress: + - from: + - podSelector: + matchLabels: + app: query-api + - podSelector: + matchLabels: + app: event-processor + ports: + - protocol: TCP + port: 5432 +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-ingest-to-nats + namespace: agentmon +spec: + podSelector: + matchLabels: + app: nats + policyTypes: [Ingress] + ingress: + - from: + - podSelector: + matchLabels: + app: ingest-gateway + - podSelector: + matchLabels: + app: event-processor + ports: + - protocol: TCP + port: 4222 diff --git a/deploy/k8s/base/postgres.yaml b/deploy/k8s/base/postgres.yaml new file mode 100644 index 0000000..56a0bd4 --- /dev/null +++ b/deploy/k8s/base/postgres.yaml @@ -0,0 +1,55 @@ +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: agentmon 
+spec: + selector: + app: postgres + ports: + - name: postgres + port: 5432 + targetPort: 5432 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: postgres + namespace: agentmon +spec: + serviceName: postgres + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: postgres:16 + ports: + - containerPort: 5432 + name: postgres + env: + - name: POSTGRES_DB + value: agentmon + - name: POSTGRES_USER + value: agentmon + - name: POSTGRES_PASSWORD + value: agentmon + volumeMounts: + - name: data + mountPath: /var/lib/postgresql/data + subPath: pgdata + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: longhorn + resources: + requests: + storage: 10Gi diff --git a/deploy/k8s/postgres.sql b/deploy/k8s/postgres.sql new file mode 100644 index 0000000..4428446 --- /dev/null +++ b/deploy/k8s/postgres.sql @@ -0,0 +1,20 @@ +-- Minimal schema v1 + +create table if not exists events ( + event_id text primary key, + ts timestamptz not null, + type text not null, + session_id text null, + run_id text null, + trace_id text null, + span_id text null, + parent_span_id text null, + source_framework text null, + client_id text null, + payload jsonb not null +); + +create index if not exists events_ts_idx on events (ts); +create index if not exists events_session_idx on events (session_id); +create index if not exists events_run_idx on events (run_id); +create index if not exists events_type_ts_idx on events (type, ts); diff --git a/docs/plans/2026-01-16-agentmon-design.md b/docs/plans/2026-01-16-agentmon-design.md new file mode 100644 index 0000000..dd98520 --- /dev/null +++ b/docs/plans/2026-01-16-agentmon-design.md @@ -0,0 +1,254 @@ +# agentmon — design (2026-01-16) + +## Goals + +agentmon is a self-hosted (homelab K8s) telemetry/analytics system for local agent runs (OpenCode + Claude Code). 
It captures structured events/spans and produces a custom web UI focused on: token usage, cost, latency, errors, and efficiency across sessions, agents, skills/commands, and models. + +Primary requirements +- Collect telemetry from OpenCode and Claude Code on local machines. +- Ship telemetry to cluster over Tailnet + LAN. +- Ingestion supports **WebSocket + HTTP**. +- UI is a **custom web UI** (not Grafana). +- Storage: SQLite for dev/small; Postgres for production. +- Retention: keep forever by default; allow optional cleanup tooling. +- No app-level auth (trusted network); enforce access with network-layer controls. + +Non-goals +- Distributed tracing compatibility (OTel) in v1; we may map later. +- “Perfect” cost computation across all providers in v1; we store enough fields to recompute. + +## Event Model + +### Core concepts +- **Session**: a human-initiated interactive timeframe (e.g., one terminal session). +- **Run**: a single user invocation that triggers an agent workflow (a command, task, or skill execution). +- **Trace**: a logical end-to-end chain of work (often equals a run, but can span). +- **Span**: a timed unit of work (tool call, model call, indexing job, etc.). +- **Event**: an append-only record (span start/end, run start/end, error, metric snapshot). + +### Envelope (common to all events) +All events are JSON objects. Fields marked **required** are required for every event type. 
+ +- `schema` (**required**): `{ name: "agentmon.event", version: 1 }` +- `event` (**required**): + - `id` (**required**): stable event UUID (UUIDv7 recommended) + - `type` (**required**): one of: + - `session.start`, `session.end` + - `run.start`, `run.end` + - `span.start`, `span.end` + - `error` + - `metric.snapshot` + - `ts` (**required**): event timestamp (RFC3339 or unix-ms; choose one and standardize in SDK) + - `seq` (optional): monotonic sequence per connection (enables gap detection + ACKing) + - `source` (**required**): + - `framework` (**required**): `opencode` | `claude-code` | `other` + - `client_id` (**required**): stable id for the emitter install (per-machine is fine) + - `host` (**required**): hostname + - `user` (optional): local username + - `version` (optional): emitter/SDK version +- `correlation` (optional but strongly recommended): + - `session_id` (recommended) + - `run_id` (recommended) + - `trace_id` (recommended) + - `span_id` (required for span events) + - `parent_span_id` (optional) +- `attributes` (optional): freeform map for tags (agent name, skill, tool, model, repo, branch, etc.) +- `payload` (optional): event-type-specific object (see below) + +### Event types (minimum v1) + +#### `session.start` / `session.end` +- Required: `correlation.session_id` +- Recommended `attributes`: `framework_session` (native id if exists), `cwd`, `repo`, `branch` + +#### `run.start` / `run.end` +- Required: `correlation.session_id`, `correlation.run_id` +- Recommended `attributes`: `command`, `agent`, `workflow`, `prompt_hash?` +- `run.end` may include aggregate `payload.usage` (token totals, cost totals, status) + +#### `span.start` / `span.end` +- Required: `correlation.trace_id`, `correlation.span_id` +- Recommended `attributes`: `span_kind` (`llm`|`tool`|`skill`|`internal`), `name` +- `span.start` `payload`: `{ start_ts }` (or omit if you trust `event.ts`) +- `span.end` `payload`: `{ end_ts, status, duration_ms?, llm?, error? 
}` + +#### `error` +- Required: `payload.error` +- Recommended: attach correlation ids when known (session/run/trace/span) + +#### `metric.snapshot` +- Used for periodic gauges (queue lag, emitter buffer size, etc.) +- `payload.metrics`: map of numeric values + +### LLM usage fields (when applicable) +Stored on `span.end` (and optionally on `run.end` aggregates): +- `llm.model`: provider/model string +- `llm.usage`: `{ input_tokens, output_tokens, cache_write_tokens?, cache_read_tokens? }` +- `llm.cost`: `{ input_usd?, output_usd?, total_usd? }` +- `llm.finish_reason?` + +### Errors +- `error`: `{ type, message, code?, retryable?, source? }` +- Errors can be standalone `error` events or embedded on `span.end`/`run.end`. + +## Ingestion + +### WebSocket (primary/live) +- Endpoint: `GET /v1/ws` +- Client sends one JSON event per WS message (simplest) or small batches. +- Server acks in-order sequences: + - Client includes `event.seq` (monotonic per connection) + - Server responds: `{ "ack": { "up_to_seq": N } }` +- Reconnect behavior: + - Client reconnects with `?client_id=...` + - Client replays any unacked events (idempotent by `event.id`) + +### HTTP (batch/backfill) +- Endpoint: `POST /v1/events` +- Body: JSON array of events. +- Response: `{ accepted: <count>, rejected: <count>, errors?: [...] }` + +### Validation +- Gateway validates `schema.name/version`, required envelope fields, and basic typing. +- Non-strict mode (v1): allow unknown fields, store raw `payload`/`attributes`. + +### Idempotency +- Every event has stable `event.id` (UUIDv7 recommended). +- Storage enforces unique `(event_id)` so retries + replays are safe. + +### Backpressure +- WS gateway may send `{ "control": { "slow_down_ms": 250 } }`. +- If overloaded, close WS with a retryable close code; client backs off. +- Emitters cap in-memory buffers and may optionally spool to disk. + +## Service Architecture (microservices) + +### v1 recommended services +1) **ingest-gateway** +- Exposes WS + HTTP endpoints. 
+- Validates schema, assigns arrival timestamps, and publishes to queue. +- Stateless; horizontal scaling. + +2) **event-processor** +- Consumes from queue. +- Deduplicates by `event.id` and writes to DB. +- Builds/updates rollup tables. + +3) **query-api** +- Read-only API used by the UI. +- Performs filtered queries and serves aggregates. + +4) **web-ui** +- Custom frontend. +- Talks only to query-api. + +5) **retention-job** (CronJob) +- Optional cleanup/compression policies. + +### Queue choice +- Preferred: **NATS JetStream** (durable stream, consumer groups). +- Stream: `agentmon_events` +- Subject pattern: `agentmon.events.v1` + +Decision: queue-based to decouple ingest spikes from DB writes and to support reliable WS replay without holding DB transactions open. + +## Kubernetes deployment outline + +Namespace +- `agentmon` + +Workloads +- Deployments: + - `ingest-gateway` (Service `ingest-gateway`) + - `event-processor` (no Service; talks to NATS + DB) + - `query-api` (Service `query-api`) + - `web-ui` (Service `web-ui`) +- Stateful: + - Postgres (or external managed); PVC via `longhorn` + - NATS JetStream (or external); PVC via `longhorn` +- CronJob: + - `retention-job` + +Ingress / exposure +- `web-ui` and `ingest-gateway` exposed via Ingress with Tailnet/LAN allowlisting. +- `query-api` internal-only (ClusterIP), only reachable from `web-ui`. 
+ +NetworkPolicies (default-deny) +- Allow `web-ui` -> `query-api` TCP 8081 (the query-api container port; NetworkPolicy ports match pod ports, not the Service's port 80) +- Allow `query-api` -> Postgres TCP 5432 +- Allow `ingest-gateway` -> NATS TCP 4222 (and JetStream as needed) +- Allow `event-processor` -> NATS + Postgres +- Allow ingress-controller namespace -> `web-ui`/`ingest-gateway` Services + +Config/Secrets +- `DATABASE_URL` for processor/query-api +- NATS connection URL for ingest/processor +- (Optional) per-emitter shared secret, if you later decide to add lightweight auth + +## Storage + +### Postgres (production) +Suggested tables (minimal): +- `events`: append-only canonical storage + - columns: `event_id` (pk), `ts`, `type`, `session_id`, `run_id`, `trace_id`, `span_id`, `parent_span_id`, `source_framework`, `client_id`, `payload` (jsonb) + - indexes on `(ts)`, `(session_id)`, `(run_id)`, `(type, ts)` +- `runs`: derived/rollup + - status, start/end ts, token totals, cost totals, error counts +- `sessions`: derived/rollup + - start/end ts, host/user, run counts + +### SQLite (dev/small) +- Same logical schema; keep SQL portable. +- JSON stored as TEXT; use generated columns where supported. + +## UI (MVP pages) + +1) Overview +- Total tokens/cost today/7d/30d +- Latency and error rate trends +- Top agents/skills by cost and failures + +2) Sessions +- Filter by date, host, agent, framework +- Drilldown into a session timeline + +3) Run detail +- Waterfall view of spans +- Per-span tokens/cost/latency +- Error stack and retry chain + +4) Errors +- Aggregations by error type/source +- Most common failing skills/tools + +5) Agents +- Leaderboard: cost, tokens/sec, success rate +- Regression detection (compare last 24h vs baseline) + +## Security model (no app auth) +- Expose services only on Tailnet/LAN. +- Ingress restricted to known source CIDRs / tailscale ingress. 
+- K8s NetworkPolicies: + - allow UI -> query-api + - allow query-api -> DB + - allow ingest-gateway from tailnet ingress only + - deny all else by default + +## Operational concerns + +### Observability +agentmon should expose its own metrics: +- ingest rate, queue lag, processor throughput +- DB write latency +- dropped events, invalid schema counts + +### Testing +- Contract tests for event schema validation. +- Ingestion idempotency tests (replay same event). +- Processor tests for rollups. +- UI smoke tests for main dashboards. + +## Open questions +- Do we want to support partial OTel mapping (trace/span ids) early? +- Do we want per-client local disk spool as part of the “official” SDK? +- How strict should schema validation be (reject unknown fields vs allow)? diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ef809b3 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module agentmon + +go 1.25.0 + +require ( + github.com/go-chi/chi/v5 v5.2.4 + github.com/gorilla/websocket v1.5.3 + github.com/jackc/pgx/v5 v5.8.0 + github.com/nats-io/nats.go v1.48.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..616a64f --- /dev/null +++ b/go.sum @@ -0,0 +1,42 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi/v5 v5.2.4 
h1:WtFKPHwlywe8Srng8j2BhOD9312j9cGUxG1SP4V2cR4= +github.com/go-chi/chi/v5 v5.2.4/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..eedda94 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,41 @@ +package event + +type Schema struct { + Name string `json:"name"` + Version int `json:"version"` +} + +type Source struct { + Framework string `json:"framework"` + ClientID string `json:"client_id"` + Host string `json:"host"` + User string `json:"user,omitempty"` + Version string `json:"version,omitempty"` +} + +type EventMeta struct { + ID string `json:"id"` + Type string `json:"type"` + TS any `json:"ts"` + Seq 
*int64 `json:"seq,omitempty"` + + Source Source `json:"source"` +} + +type Correlation struct { + SessionID string `json:"session_id,omitempty"` + RunID string `json:"run_id,omitempty"` + TraceID string `json:"trace_id,omitempty"` + SpanID string `json:"span_id,omitempty"` + ParentSpanID string `json:"parent_span_id,omitempty"` +} + +type Envelope struct { + Schema Schema `json:"schema"` + Event EventMeta `json:"event"` + Correlation *Correlation `json:"correlation,omitempty"` + Attributes map[string]any `json:"attributes,omitempty"` + Payload map[string]any `json:"payload,omitempty"` + Raw map[string]any `json:"-"` + Extra map[string]any `json:"-"` +} diff --git a/internal/httpx/json.go b/internal/httpx/json.go new file mode 100644 index 0000000..bc2357d --- /dev/null +++ b/internal/httpx/json.go @@ -0,0 +1,12 @@ +package httpx + +import ( + "encoding/json" + "net/http" +) + +func WriteJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} diff --git a/internal/queue/nats/nats.go b/internal/queue/nats/nats.go new file mode 100644 index 0000000..74d9bb2 --- /dev/null +++ b/internal/queue/nats/nats.go @@ -0,0 +1,34 @@ +package nats + +import ( + "context" + "time" + + gnats "github.com/nats-io/nats.go" +) + +type Publisher struct { + conn *gnats.Conn + topic string + timeout time.Duration +} + +func NewPublisher(url, topic string) (*Publisher, error) { + conn, err := gnats.Connect(url) + if err != nil { + return nil, err + } + return &Publisher{conn: conn, topic: topic, timeout: 5 * time.Second}, nil +} + +func (p *Publisher) Close() { + p.conn.Close() +} + +func (p *Publisher) Publish(ctx context.Context, data []byte) error { + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + + _ = ctx + return p.conn.Publish(p.topic, data) +} diff --git a/internal/queue/nats/subscribe.go b/internal/queue/nats/subscribe.go new file mode 100644 index 
0000000..7e9b0fc --- /dev/null +++ b/internal/queue/nats/subscribe.go @@ -0,0 +1,41 @@ +package nats + +import ( + "context" + + gnats "github.com/nats-io/nats.go" +) + +type Subscriber struct { + conn *gnats.Conn + sub *gnats.Subscription + topic string +} + +func NewSubscriber(url, topic string) (*Subscriber, error) { + conn, err := gnats.Connect(url) + if err != nil { + return nil, err + } + return &Subscriber{conn: conn, topic: topic}, nil +} + +func (s *Subscriber) Close() { + if s.sub != nil { + _ = s.sub.Unsubscribe() + } + s.conn.Close() +} + +func (s *Subscriber) Subscribe(ctx context.Context, handler func(msg []byte) error) error { + sub, err := s.conn.Subscribe(s.topic, func(m *gnats.Msg) { + _ = handler(m.Data) + }) + if err != nil { + return err + } + s.sub = sub + + <-ctx.Done() + return ctx.Err() +} diff --git a/internal/store/postgres/events.go b/internal/store/postgres/events.go new file mode 100644 index 0000000..4fabb6e --- /dev/null +++ b/internal/store/postgres/events.go @@ -0,0 +1,57 @@ +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "time" +) + +type InsertEvent struct { + EventID string + TS time.Time + Type string + + SessionID sql.NullString + RunID sql.NullString + TraceID sql.NullString + SpanID sql.NullString + ParentSpanID sql.NullString + + SourceFramework sql.NullString + ClientID sql.NullString + Payload any +} + +func (d *DB) InsertEvent(ctx context.Context, e InsertEvent) error { + payload, err := json.Marshal(e.Payload) + if err != nil { + return err + } + + _, err = d.sql.ExecContext(ctx, ` +insert into events ( + event_id, ts, type, session_id, run_id, trace_id, span_id, parent_span_id, + source_framework, client_id, payload +) values ( + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11 +) +on conflict (event_id) do nothing +`, + e.EventID, + e.TS, + e.Type, + e.SessionID, + e.RunID, + e.TraceID, + e.SpanID, + e.ParentSpanID, + e.SourceFramework, + e.ClientID, + payload, + ) + return err +} + +var 
ErrMissingField = errors.New("missing required field") diff --git a/internal/store/postgres/postgres.go b/internal/store/postgres/postgres.go new file mode 100644 index 0000000..913bbea --- /dev/null +++ b/internal/store/postgres/postgres.go @@ -0,0 +1,27 @@ +package postgres + +import ( + "context" + "database/sql" + _ "github.com/jackc/pgx/v5/stdlib" +) + +type DB struct { + sql *sql.DB +} + +func Open(url string) (*DB, error) { + db, err := sql.Open("pgx", url) + if err != nil { + return nil, err + } + return &DB{sql: db}, nil +} + +func (d *DB) Close() error { + return d.sql.Close() +} + +func (d *DB) Ping(ctx context.Context) error { + return d.sql.PingContext(ctx) +} diff --git a/internal/store/postgres/query.go b/internal/store/postgres/query.go new file mode 100644 index 0000000..ed161e3 --- /dev/null +++ b/internal/store/postgres/query.go @@ -0,0 +1,47 @@ +package postgres + +import ( + "context" + "encoding/json" + "time" +) + +type EventRow struct { + EventID string `json:"event_id"` + TS time.Time `json:"ts"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +func (d *DB) ListRecentEvents(ctx context.Context, limit int) ([]EventRow, error) { + if limit <= 0 { + limit = 100 + } + if limit > 1000 { + limit = 1000 + } + + rows, err := d.sql.QueryContext(ctx, ` +select event_id, ts, type, payload +from events +order by ts desc +limit $1 +`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []EventRow + for rows.Next() { + var r EventRow + if err := rows.Scan(&r.EventID, &r.TS, &r.Type, &r.Payload); err != nil { + return nil, err + } + out = append(out, r) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +}