From 43877a5448596c986b4eca5fb493f4cd113564af Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 26 Mar 2026 11:22:34 -0700 Subject: [PATCH] feat(query-api): add richer stats and retention --- cmd/query-api/main.go | 115 +++++++++++++++--- deploy/k8s/postgres.sql | 2 + internal/event/validate.go | 6 +- internal/store/postgres/events.go | 51 ++++++++ internal/store/postgres/postgres.go | 5 + internal/store/postgres/query.go | 11 ++ internal/store/postgres/runs.go | 170 ++++++++++++++++++++++----- internal/store/postgres/runs_test.go | 130 ++++++++++++++++++++ internal/store/postgres/sessions.go | 60 ++++++---- internal/store/postgres/stats.go | 118 ++++++++++++++++--- 10 files changed, 583 insertions(+), 85 deletions(-) create mode 100644 internal/store/postgres/runs_test.go diff --git a/cmd/query-api/main.go b/cmd/query-api/main.go index 0915263..13d3fd7 100644 --- a/cmd/query-api/main.go +++ b/cmd/query-api/main.go @@ -1,7 +1,9 @@ package main import ( + "context" "database/sql" + "encoding/json" "log" "net/http" "os" @@ -18,11 +20,16 @@ import ( "github.com/nats-io/nats.go" ) +type wsClient struct { + conn *websocket.Conn + send chan []byte +} + var ( wsUpgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } - wsClients = make(map[*websocket.Conn]bool) + wsClients = make(map[*wsClient]bool) wsMu sync.RWMutex natsConn *nats.Conn ) @@ -31,23 +38,15 @@ func subscribeToNATS(nc *nats.Conn) { topic := envDefault("NATS_TOPIC", "agentmon.events.v1") sub, err := nc.Subscribe(topic, func(msg *nats.Msg) { wsMu.RLock() - var stale []*websocket.Conn - for conn := range wsClients { - err := conn.WriteMessage(websocket.TextMessage, msg.Data) - if err != nil { - conn.Close() - stale = append(stale, conn) + for client := range wsClients { + select { + case client.send <- msg.Data: + default: + // Slow client; close and remove in background. + go removeClient(client) } } wsMu.RUnlock() - - if len(stale) > 0 { - wsMu.Lock() - for _, conn := range stale { - delete(wsClients, conn) - } - wsMu.Unlock() - } }) if err != nil { log.Printf("failed to subscribe to NATS: %v", err) @@ -57,19 +56,44 @@ func subscribeToNATS(nc *nats.Conn) { _ = sub } +func removeClient(c *wsClient) { + wsMu.Lock() + if wsClients[c] { + delete(wsClients, c) + close(c.send) + c.conn.Close() + } + wsMu.Unlock() +} + func wsHandler(w http.ResponseWriter, r *http.Request) { conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { return } - defer conn.Close() + + client := &wsClient{ + conn: conn, + send: make(chan []byte, 256), + } wsMu.Lock() - wsClients[conn] = true + wsClients[client] = true wsMu.Unlock() log.Printf("WebSocket client connected") + // Writer goroutine: sole owner of conn writes. + go func() { + defer conn.Close() + for msg := range client.send { + if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { + break + } + } + }() + + // Read loop blocks until the client disconnects. for { _, _, err := conn.ReadMessage() if err != nil { @@ -78,8 +102,12 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { } wsMu.Lock() - delete(wsClients, conn) + if wsClients[client] { + delete(wsClients, client) + close(client.send) + } wsMu.Unlock() + log.Printf("WebSocket client disconnected") } @@ -249,6 +277,36 @@ func main() { httpx.WriteJSON(w, http.StatusOK, map[string]any{"tools": tools}) }) + r.Get("/v1/stats/top-models", func(w http.ResponseWriter, r *http.Request) { + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + models, err := db.GetTopModels(r.Context(), limit) + if err != nil { + httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) + return + } + if models == nil { + models = []postgres.TopModel{} + } + httpx.WriteJSON(w, http.StatusOK, map[string]any{"models": models}) + }) + + r.Post("/v1/admin/retention", func(w http.ResponseWriter, r *http.Request) { + var req struct { + Days int `json:"days"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Days <= 0 { + httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_request", "message": "days must be a positive integer"}) + return + } + cutoff := time.Now().AddDate(0, 0, -req.Days) + deleted, err := db.DeleteOlderThan(r.Context(), cutoff) + if err != nil { + httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) + return + } + httpx.WriteJSON(w, http.StatusOK, map[string]any{"deleted": deleted, "cutoff": cutoff.Format(time.RFC3339)}) + }) + r.Get("/v1/stats/timeseries", func(w http.ResponseWriter, r *http.Request) { window := r.URL.Query().Get("window") switch window { @@ -267,6 +325,27 @@ func main() { httpx.WriteJSON(w, http.StatusOK, timeseries) }) + // Background retention cleanup + retentionDays := 30 + if v := os.Getenv("RETENTION_DAYS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + retentionDays = n + } + } + go func() { + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + for range ticker.C { + cutoff := time.Now().AddDate(0, 0, -retentionDays) + deleted, err := db.DeleteOlderThan(context.Background(), cutoff) + if err != nil { + log.Printf("retention cleanup error: %v", err) + } else if deleted > 0 { + log.Printf("retention cleanup: deleted %d events older than %s", deleted, cutoff.Format(time.RFC3339)) + } + } + }() + log.Printf("query-api listening on %s", addr) log.Fatal(http.ListenAndServe(addr, r)) } diff --git a/deploy/k8s/postgres.sql b/deploy/k8s/postgres.sql index 4428446..d560298 100644 --- a/deploy/k8s/postgres.sql +++ b/deploy/k8s/postgres.sql @@ -18,3 +18,5 @@ create index if not exists events_ts_idx on events (ts); create index if not exists events_session_idx on events (session_id); create index if not exists events_run_idx on events (run_id); create index if not exists events_type_ts_idx on events (type, ts); +create index if not exists events_framework_client_ts_idx on events (source_framework, client_id, ts); +create index if not exists events_framework_ts_idx on events (source_framework, ts); diff --git a/internal/event/validate.go b/internal/event/validate.go index a717b8c..993d9f3 100644 --- a/internal/event/validate.go +++ b/internal/event/validate.go @@ -14,7 +14,9 @@ var validTypes = map[string]bool{ "span.end": true, "error": true, "metric.snapshot": true, - "openclaw.snapshot": true, + "openclaw.snapshot": true, + "swarm.snapshot": true, + "swarm.service.snapshot": true, } type ValidationError struct { @@ -62,7 +64,7 @@ func Validate(m map[string]any) error { } // Source is optional for openclaw.snapshot events - if eventType != "openclaw.snapshot" { + if eventType != "openclaw.snapshot" && eventType != "swarm.snapshot" && eventType != "swarm.service.snapshot" { source, ok := event["source"].(map[string]any) if !ok { return ValidationError{Field: "event.source", Message: "missing or invalid"} diff --git a/internal/store/postgres/events.go b/internal/store/postgres/events.go index 4fabb6e..54a2ec3 100644 --- a/internal/store/postgres/events.go +++ b/internal/store/postgres/events.go @@ -54,4 +54,55 @@ on conflict (event_id) do nothing return err } +func (d *DB) InsertEventBatch(ctx context.Context, events []InsertEvent) error { + if len(events) == 0 { + return nil + } + if len(events) == 1 { + return d.InsertEvent(ctx, events[0]) + } + + tx, err := d.sql.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` +INSERT INTO events ( + event_id, ts, type, session_id, run_id, trace_id, span_id, parent_span_id, + source_framework, client_id, payload +) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) +ON CONFLICT (event_id) DO NOTHING +`) + if err != nil { + return err + } + defer stmt.Close() + + for _, e := range events { + payload, err := json.Marshal(e.Payload) + if err != nil { + return err + } + _, err = stmt.ExecContext(ctx, e.EventID, e.TS, e.Type, e.SessionID, e.RunID, + e.TraceID, e.SpanID, e.ParentSpanID, e.SourceFramework, e.ClientID, payload) + if err != nil { + return err + } + } + + return tx.Commit() +} + +// DeleteOlderThan removes events with ts older than the given cutoff. +// Returns the number of rows deleted. +func (d *DB) DeleteOlderThan(ctx context.Context, cutoff time.Time) (int64, error) { + result, err := d.sql.ExecContext(ctx, `DELETE FROM events WHERE ts < $1`, cutoff) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + var ErrMissingField = errors.New("missing required field") diff --git a/internal/store/postgres/postgres.go b/internal/store/postgres/postgres.go index 913bbea..3eb644d 100644 --- a/internal/store/postgres/postgres.go +++ b/internal/store/postgres/postgres.go @@ -3,6 +3,8 @@ package postgres import ( "context" "database/sql" + "time" + _ "github.com/jackc/pgx/v5/stdlib" ) @@ -15,6 +17,9 @@ func Open(url string) (*DB, error) { if err != nil { return nil, err } + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) return &DB{sql: db}, nil } diff --git a/internal/store/postgres/query.go b/internal/store/postgres/query.go index 67e5682..1a4b279 100644 --- a/internal/store/postgres/query.go +++ b/internal/store/postgres/query.go @@ -19,6 +19,7 @@ type EventsFilter struct { EventType string Framework string ClientID string + Since *time.Time // if nil, defaults to 24h ago } func (d *DB) ListRecentEvents(ctx context.Context, f EventsFilter) ([]EventRow, error) { @@ -29,10 +30,20 @@ func (d *DB) ListRecentEvents(ctx context.Context, f EventsFilter) ([]EventRow, f.Limit = 1000 } + since := f.Since + if since == nil { + t := time.Now().Add(-24 * time.Hour) + since = &t + } + query := "SELECT event_id, ts, type, payload FROM events WHERE 1=1" args := []any{} argN := 1 + query += fmt.Sprintf(" AND ts >= $%d", argN) + args = append(args, *since) + argN++ + if f.EventType != "" { query += fmt.Sprintf(" AND type = $%d", argN) args = append(args, f.EventType) diff --git a/internal/store/postgres/runs.go b/internal/store/postgres/runs.go index a0a6ec9..cd3bf69 100644 --- a/internal/store/postgres/runs.go +++ b/internal/store/postgres/runs.go @@ -32,7 +32,7 @@ func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*Session SELECT session_id, MIN(ts) as started_at, - MAX(ts) as ended_at, + MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at, MAX(source_framework) as framework, MAX(payload->'event'->'source'->>'host') as host FROM events @@ -57,14 +57,18 @@ func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*Session run_id, session_id, MIN(ts) as started_at, - MAX(ts) as ended_at, + MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at, CASE WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error' ELSE 'success' END as status, COUNT(DISTINCT span_id) as span_count, COUNT(DISTINCT CASE WHEN payload->'attributes'->>'span_kind' = 'tool' THEN span_id END) as tool_count, - COALESCE(MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END), '') as model + COALESCE( + MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END), + MAX(CASE WHEN type = 'metric.snapshot' THEN payload->'payload'->'metrics'->>'model' END), + '' + ) as model FROM events WHERE session_id = $1 AND run_id IS NOT NULL GROUP BY run_id, session_id @@ -123,7 +127,7 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S run_id, session_id, MIN(ts) as started_at, - MAX(ts) as ended_at, + MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at, CASE WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error' ELSE 'success' @@ -148,6 +152,80 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S return &run, spans, nil } +func mergeSpanEvent(existing *SpanRow, s SpanRow) { + if existing.Name == "" && s.Name != "" { + existing.Name = s.Name + } + if existing.Kind == "" || existing.Kind == "unknown" { + existing.Kind = s.Kind + } + if s.Duration != nil { + existing.Duration = s.Duration + } + if s.Status == "error" { + existing.Status = "error" + } + existing.Payload = mergeEnvelopeJSON(existing.Payload, s.Payload) +} + +func mergeEnvelopeJSON(existing, next json.RawMessage) json.RawMessage { + if len(existing) == 0 { + return next + } + if len(next) == 0 { + return existing + } + + var dst map[string]any + if err := json.Unmarshal(existing, &dst); err != nil { + return next + } + + var src map[string]any + if err := json.Unmarshal(next, &src); err != nil { + return existing + } + + mergeJSONObjects(dst, src) + + merged, err := json.Marshal(dst) + if err != nil { + return next + } + return merged +} + +func mergeJSONObjects(dst, src map[string]any) { + for key, value := range src { + srcMap, srcOK := value.(map[string]any) + if !srcOK { + dst[key] = value + continue + } + + dstMap, dstOK := dst[key].(map[string]any) + if !dstOK { + dst[key] = srcMap + continue + } + + mergeJSONObjects(dstMap, srcMap) + } +} + +func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int { + for i := len(runs) - 1; i >= 0; i-- { + run := runs[i] + if spanStartedAt.Before(run.StartedAt) { + continue + } + if run.EndedAt == nil || !spanStartedAt.After(*run.EndedAt) { + return i + } + } + return -1 +} + func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, error) { rows, err := d.sql.QueryContext(ctx, ` SELECT @@ -187,19 +265,7 @@ func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, erro continue } - if existing.Name == "" && s.Name != "" { - existing.Name = s.Name - } - if existing.Kind == "" || existing.Kind == "unknown" { - existing.Kind = s.Kind - } - if s.Duration != nil { - existing.Duration = s.Duration - } - if s.Status == "error" { - existing.Status = "error" - } - existing.Payload = s.Payload + mergeSpanEvent(existing, s) } if err := rows.Err(); err != nil { @@ -214,35 +280,83 @@ func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, erro } func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []RunRow) ([]RunRow, error) { + if len(runs) == 0 { + return runs, nil + } + rows, err := d.sql.QueryContext(ctx, ` - SELECT DISTINCT run_id + SELECT + run_id, + span_id, + COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, + COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, + ts as started_at, + (payload->'payload'->>'duration_ms')::bigint as duration_ms, + CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, + payload FROM events - WHERE session_id = $1 AND run_id IS NOT NULL - ORDER BY run_id + WHERE session_id = $1 AND span_id IS NOT NULL + ORDER BY ts ASC `, sessionID) if err != nil { return nil, err } defer rows.Close() - spansByRun := make(map[string][]SpanRow) + // Map of run_id -> (map of span_id -> *SpanRow) for merging + type runSpans struct { + byID map[string]*SpanRow + order []string + } + spansByRun := make(map[string]*runSpans) + for rows.Next() { - var runID string - if err := rows.Scan(&runID); err != nil { + var s SpanRow + var runID *string + if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { return nil, err } - spans, err := d.listSpansForRun(ctx, runID) - if err != nil { - return nil, err + if runID != nil { + s.RunID = *runID } - spansByRun[runID] = spans + if s.RunID == "" { + runIndex := findRunIndexForSpan(runs, s.StartedAt) + if runIndex == -1 { + continue + } + s.RunID = runs[runIndex].RunID + } + + rs := spansByRun[s.RunID] + if rs == nil { + rs = &runSpans{byID: make(map[string]*SpanRow)} + spansByRun[s.RunID] = rs + } + + existing := rs.byID[s.SpanID] + if existing == nil { + copy := s + rs.byID[s.SpanID] = © + rs.order = append(rs.order, s.SpanID) + continue + } + + mergeSpanEvent(existing, s) } if err := rows.Err(); err != nil { return nil, err } for i := range runs { - runs[i].Spans = spansByRun[runs[i].RunID] + rs := spansByRun[runs[i].RunID] + if rs == nil { + continue + } + spans := make([]SpanRow, 0, len(rs.order)) + for _, spanID := range rs.order { + spans = append(spans, *rs.byID[spanID]) + } + runs[i].Spans = spans } return runs, nil } diff --git a/internal/store/postgres/runs_test.go b/internal/store/postgres/runs_test.go new file mode 100644 index 0000000..37a33f9 --- /dev/null +++ b/internal/store/postgres/runs_test.go @@ -0,0 +1,130 @@ +package postgres + +import ( + "encoding/json" + "testing" + "time" +) + +func TestMergeEnvelopeJSON_MergesNestedPayloads(t *testing.T) { + existing := json.RawMessage(`{ + "event":{"type":"span.start"}, + "attributes":{"name":"tool.call","span_kind":"tool"}, + "payload":{"input":{"query":"status"},"prompt_preview":"summarize"} + }`) + next := json.RawMessage(`{ + "event":{"type":"span.end"}, + "payload":{"result_preview":"ok","duration_ms":42} + }`) + + merged := mergeEnvelopeJSON(existing, next) + + var got map[string]any + if err := json.Unmarshal(merged, &got); err != nil { + t.Fatalf("unmarshal merged payload: %v", err) + } + + payload, ok := got["payload"].(map[string]any) + if !ok { + t.Fatal("expected merged payload object") + } + if _, ok := payload["input"].(map[string]any); !ok { + t.Fatal("expected input from first event to be preserved") + } + if payload["result_preview"] != "ok" { + t.Fatalf("expected result_preview to be merged, got %#v", payload["result_preview"]) + } + if payload["duration_ms"] != float64(42) { + t.Fatalf("expected duration_ms to be merged, got %#v", payload["duration_ms"]) + } + + attrs, ok := got["attributes"].(map[string]any) + if !ok || attrs["name"] != "tool.call" { + t.Fatalf("expected attributes to be preserved, got %#v", got["attributes"]) + } + + event, ok := got["event"].(map[string]any) + if !ok || event["type"] != "span.end" { + t.Fatalf("expected later event metadata to win, got %#v", got["event"]) + } +} + +func TestMergeSpanEvent_PreservesStartPayloadDetails(t *testing.T) { + existing := &SpanRow{ + Name: "tool.call", + Kind: "tool", + Status: "success", + Payload: json.RawMessage(`{ + "attributes":{"name":"tool.call","span_kind":"tool"}, + "payload":{"input":{"command":"ls"}} + }`), + } + + mergeSpanEvent(existing, SpanRow{ + Status: "success", + Payload: json.RawMessage(`{ + "payload":{"result_preview":"done","duration_ms":12} + }`), + }) + + var got map[string]any + if err := json.Unmarshal(existing.Payload, &got); err != nil { + t.Fatalf("unmarshal merged span payload: %v", err) + } + + payload := got["payload"].(map[string]any) + if _, ok := payload["input"].(map[string]any); !ok { + t.Fatal("expected input to remain after merge") + } + if payload["result_preview"] != "done" { + t.Fatalf("expected result_preview after merge, got %#v", payload["result_preview"]) + } +} + +func TestFindRunIndexForSpan_MatchesContainingRunWindow(t *testing.T) { + start := time.Date(2026, 3, 23, 10, 0, 0, 0, time.UTC) + run1End := start.Add(2 * time.Minute) + run2Start := start.Add(3 * time.Minute) + run2End := start.Add(7 * time.Minute) + + runs := []RunRow{ + { + RunID: "run-1", + StartedAt: start, + EndedAt: &run1End, + }, + { + RunID: "run-2", + StartedAt: run2Start, + EndedAt: &run2End, + }, + } + + idx := findRunIndexForSpan(runs, start.Add(4*time.Minute)) + if idx != 1 { + t.Fatalf("expected span to attach to run-2, got index %d", idx) + } +} + +func TestFindRunIndexForSpan_MatchesOpenRun(t *testing.T) { + start := time.Date(2026, 3, 23, 10, 0, 0, 0, time.UTC) + run1End := start.Add(2 * time.Minute) + run2Start := start.Add(3 * time.Minute) + + runs := []RunRow{ + { + RunID: "run-1", + StartedAt: start, + EndedAt: &run1End, + }, + { + RunID: "run-2", + StartedAt: run2Start, + }, + } + + idx := findRunIndexForSpan(runs, start.Add(5*time.Minute)) + if idx != 1 { + t.Fatalf("expected span to attach to open run-2, got index %d", idx) + } +} diff --git a/internal/store/postgres/sessions.go b/internal/store/postgres/sessions.go index 053e07f..37b02da 100644 --- a/internal/store/postgres/sessions.go +++ b/internal/store/postgres/sessions.go @@ -12,6 +12,7 @@ type SessionRow struct { StartedAt time.Time `json:"started_at"` EndedAt *time.Time `json:"ended_at,omitempty"` Framework string `json:"framework"` + ClientID string `json:"client_id,omitempty"` Host string `json:"host"` RunCount int `json:"run_count"` } @@ -33,8 +34,10 @@ func (d *DB) ListSessions(ctx context.Context, f SessionsFilter) ([]SessionRow, f.Limit = 200 } - // Build query dynamically - var conditions []string + // Build query dynamically using a CTE so cursor compares against + // the grouped started_at rather than individual event timestamps. + var innerConditions []string + var outerConditions []string var args []any argN := 1 @@ -43,50 +46,63 @@ func (d *DB) ListSessions(ctx context.Context, f SessionsFilter) ([]SessionRow, t := time.Now().Add(-24 * time.Hour) f.From = &t } - conditions = append(conditions, fmt.Sprintf("ts >= $%d", argN)) + innerConditions = append(innerConditions, fmt.Sprintf("ts >= $%d", argN)) args = append(args, *f.From) argN++ if f.To != nil { - conditions = append(conditions, fmt.Sprintf("ts <= $%d", argN)) + innerConditions = append(innerConditions, fmt.Sprintf("ts <= $%d", argN)) args = append(args, *f.To) argN++ } if f.Framework != "" { - conditions = append(conditions, fmt.Sprintf("source_framework = $%d", argN)) + innerConditions = append(innerConditions, fmt.Sprintf("source_framework = $%d", argN)) args = append(args, f.Framework) argN++ } + // Host filter applies to an aggregate, so it goes in the outer WHERE if f.Host != "" { - conditions = append(conditions, fmt.Sprintf("payload->'event'->'source'->>'host' = $%d", argN)) + outerConditions = append(outerConditions, fmt.Sprintf("host = $%d", argN)) args = append(args, f.Host) argN++ } + // Cursor compares against grouped started_at, so it goes in the outer WHERE if f.Cursor != nil { - conditions = append(conditions, fmt.Sprintf("ts < $%d", argN)) + outerConditions = append(outerConditions, fmt.Sprintf("started_at < $%d", argN)) args = append(args, *f.Cursor) argN++ } - where := strings.Join(conditions, " AND ") + innerWhere := strings.Join(innerConditions, " AND ") + + outerWhere := "" + if len(outerConditions) > 0 { + outerWhere = "WHERE " + strings.Join(outerConditions, " AND ") + } query := fmt.Sprintf(` - SELECT - session_id, - MIN(ts) as started_at, - MAX(ts) as ended_at, - MAX(source_framework) as framework, - MAX(payload->'event'->'source'->>'host') as host, - COUNT(DISTINCT run_id) as run_count - FROM events - WHERE session_id IS NOT NULL AND %s - GROUP BY session_id + WITH session_groups AS ( + SELECT + session_id, + MIN(ts) as started_at, + MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at, + MAX(source_framework) as framework, + MAX(client_id) as client_id, + MAX(payload->'event'->'source'->>'host') as host, + COUNT(DISTINCT run_id) as run_count + FROM events + WHERE session_id IS NOT NULL AND %s + GROUP BY session_id + ) + SELECT session_id, started_at, ended_at, framework, client_id, host, run_count + FROM session_groups + %s ORDER BY started_at DESC LIMIT $%d - `, where, argN) + `, innerWhere, outerWhere, argN) args = append(args, f.Limit+1) // fetch one extra to detect next page rows, err := d.sql.QueryContext(ctx, query, args...) @@ -98,10 +114,14 @@ func (d *DB) ListSessions(ctx context.Context, f SessionsFilter) ([]SessionRow, var out []SessionRow for rows.Next() { var r SessionRow + var clientID *string var host *string - if err := rows.Scan(&r.SessionID, &r.StartedAt, &r.EndedAt, &r.Framework, &host, &r.RunCount); err != nil { + if err := rows.Scan(&r.SessionID, &r.StartedAt, &r.EndedAt, &r.Framework, &clientID, &host, &r.RunCount); err != nil { return nil, nil, err } + if clientID != nil { + r.ClientID = *clientID + } if host != nil { r.Host = *host } diff --git a/internal/store/postgres/stats.go b/internal/store/postgres/stats.go index 375e78c..78af03a 100644 --- a/internal/store/postgres/stats.go +++ b/internal/store/postgres/stats.go @@ -16,14 +16,22 @@ type Summary struct { RunsToday int `json:"runs_today"` ToolCallsToday int `json:"tool_calls_today"` ErrorsToday int `json:"errors_today"` + TokensToday int64 `json:"tokens_today"` + CostToday float64 `json:"cost_today"` + AvgDurationMS float64 `json:"avg_duration_ms"` ByFramework map[string]FrameworkStats `json:"by_framework"` } type TimeseriesBucket struct { - TS time.Time `json:"ts"` - Runs int `json:"runs"` - Tools int `json:"tools"` - Errors int `json:"errors"` + TS time.Time `json:"ts"` + Runs int `json:"runs"` + Tools int `json:"tools"` + Errors int `json:"errors"` + Tokens int64 `json:"tokens"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + Cost float64 `json:"cost"` + AvgDurationMS float64 `json:"avg_duration_ms"` } type TimeseriesResult struct { @@ -36,21 +44,23 @@ func (d *DB) GetSummary(ctx context.Context) (*Summary, error) { now := time.Now() midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) - // Active sessions: sessions with a session.start but no session.end (ever) + // Active sessions: sessions with a session.start but no session.end (last 7 days) activeQ := ` - SELECT COUNT(DISTINCT session_id) - FROM events - WHERE type = 'session.start' - AND session_id IS NOT NULL - AND session_id NOT IN ( - SELECT DISTINCT session_id - FROM events - WHERE type = 'session.end' - AND session_id IS NOT NULL + SELECT COUNT(DISTINCT e.session_id) + FROM events e + WHERE e.type = 'session.start' + AND e.session_id IS NOT NULL + AND e.ts >= $1 + AND NOT EXISTS ( + SELECT 1 + FROM events e2 + WHERE e2.type = 'session.end' + AND e2.session_id = e.session_id ) ` var activeSessions int - if err := d.sql.QueryRowContext(ctx, activeQ).Scan(&activeSessions); err != nil { + activeSessionsSince := time.Now().Add(-7 * 24 * time.Hour) + if err := d.sql.QueryRowContext(ctx, activeQ, activeSessionsSince).Scan(&activeSessions); err != nil { return nil, err } @@ -64,6 +74,7 @@ func (d *DB) GetSummary(ctx context.Context) (*Summary, error) { COUNT(*) FILTER (WHERE type = 'error') AS errors FROM events WHERE ts >= $1 + AND type IN ('run.start', 'span.end', 'error') GROUP BY source_framework ` rows, err := d.sql.QueryContext(ctx, fwQ, midnight) @@ -90,11 +101,30 @@ func (d *DB) GetSummary(ctx context.Context) (*Summary, error) { return nil, err } + // Usage stats for today (tokens, cost, avg latency) + usageQ := ` + SELECT + COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint), 0), + COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8), 0), + COALESCE(AVG((payload->'payload'->>'duration_ms')::float8), 0) + FROM events + WHERE type = 'run.end' + AND ts >= $1 + ` + var tokensToday int64 + var costToday, avgDurationMS float64 + if err := d.sql.QueryRowContext(ctx, usageQ, midnight).Scan(&tokensToday, &costToday, &avgDurationMS); err != nil { + return nil, err + } + return &Summary{ ActiveSessions: activeSessions, RunsToday: totalRuns, ToolCallsToday: totalTools, ErrorsToday: totalErrors, + TokensToday: tokensToday, + CostToday: costToday, + AvgDurationMS: avgDurationMS, ByFramework: byFramework, }, nil } @@ -104,6 +134,11 @@ type TopTool struct { Count int `json:"count"` } +type TopModel struct { + Name string `json:"name"` + Count int `json:"count"` +} + func (d *DB) GetTopTools(ctx context.Context, limit int) ([]TopTool, error) { if limit <= 0 { limit = 10 @@ -141,6 +176,43 @@ func (d *DB) GetTopTools(ctx context.Context, limit int) ([]TopTool, error) { return out, rows.Err() } +func (d *DB) GetTopModels(ctx context.Context, limit int) ([]TopModel, error) { + if limit <= 0 { + limit = 10 + } + now := time.Now() + midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + + q := ` + SELECT + payload->'payload'->>'model' AS model_name, + COUNT(*) AS cnt + FROM events + WHERE type = 'run.end' + AND payload->'payload'->>'model' IS NOT NULL + AND payload->'payload'->>'model' <> '' + AND ts >= $1 + GROUP BY model_name + ORDER BY cnt DESC + LIMIT $2 + ` + rows, err := d.sql.QueryContext(ctx, q, midnight, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []TopModel + for rows.Next() { + var m TopModel + if err := rows.Scan(&m.Name, &m.Count); err != nil { + return nil, err + } + out = append(out, m) + } + return out, rows.Err() +} + func bucketForWindow(window string) string { switch window { case "1h": @@ -191,9 +263,20 @@ func (d *DB) GetTimeseries(ctx context.Context, window string) (*TimeseriesResul COUNT(*) FILTER (WHERE type = 'run.start') AS runs, COUNT(*) FILTER (WHERE type = 'span.end' AND payload->'attributes'->>'span_kind' = 'tool') AS tools, - COUNT(*) FILTER (WHERE type = 'error') AS errors + COUNT(*) FILTER (WHERE type = 'error') AS errors, + COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint) + FILTER (WHERE type = 'run.end'), 0) AS tokens, + COALESCE(SUM((payload->'payload'->'usage'->>'input_tokens')::bigint) + FILTER (WHERE type = 'run.end'), 0) AS input_tokens, + COALESCE(SUM((payload->'payload'->'usage'->>'output_tokens')::bigint) + FILTER (WHERE type = 'run.end'), 0) AS output_tokens, + COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8) + FILTER (WHERE type = 'run.end'), 0) AS cost, + COALESCE(AVG((payload->'payload'->>'duration_ms')::float8) + FILTER (WHERE type = 'run.end'), 0) AS avg_duration_ms FROM events WHERE ts >= $2 + AND type IN ('run.start', 'run.end', 'span.end', 'error') GROUP BY bucket_ts ORDER BY bucket_ts ASC ` @@ -207,7 +290,8 @@ func (d *DB) GetTimeseries(ctx context.Context, window string) (*TimeseriesResul var series []TimeseriesBucket for rows.Next() { var b TimeseriesBucket - if err := rows.Scan(&b.TS, &b.Runs, &b.Tools, &b.Errors); err != nil { + if err := rows.Scan(&b.TS, &b.Runs, &b.Tools, &b.Errors, + &b.Tokens, &b.InputTokens, &b.OutputTokens, &b.Cost, &b.AvgDurationMS); err != nil { return nil, err } series = append(series, b)