From 41b7165800b242e5141e03d3a65b217d7b6bdfa6 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Tue, 21 Apr 2026 13:07:09 -0700 Subject: [PATCH] fix(store): backfill spans in run detail --- internal/store/postgres/runs.go | 166 +++++++++++++++++++++------ internal/store/postgres/runs_test.go | 55 +++++++++ 2 files changed, 188 insertions(+), 33 deletions(-) diff --git a/internal/store/postgres/runs.go b/internal/store/postgres/runs.go index cd3bf69..5b49832 100644 --- a/internal/store/postgres/runs.go +++ b/internal/store/postgres/runs.go @@ -144,10 +144,75 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S return nil, nil, err } - spans, err := d.listSpansForRun(ctx, runID) + // Get spans directly linked to this run. + rs := &runSpans{} + directSpans, err := d.listSpansForRun(ctx, runID) if err != nil { return nil, nil, err } + for _, s := range directSpans { + addSpanToRunSpans(rs, s) + } + + // Backfill orphaned spans (have session_id but no run_id) that fall + // within this run's time window, matching the same logic used by + // attachSpansToRuns in the session detail view. + if run.SessionID != "" { + // Find the upper bound: the next run's start time, or this run's + // ended_at, whichever is more precise. This prevents capturing + // spans that belong to subsequent runs. 
+ var upperBound *time.Time + if run.EndedAt != nil { + upperBound = run.EndedAt + } + var nextRunStart *time.Time + _ = d.sql.QueryRowContext(ctx, ` + SELECT MIN(ts) + FROM events + WHERE session_id = $1 AND run_id != $2 AND type = 'run.start' AND ts > $3 + `, run.SessionID, runID, run.StartedAt).Scan(&nextRunStart) + if nextRunStart != nil && (upperBound == nil || nextRunStart.Before(*upperBound)) { + upperBound = nextRunStart + } + + rows, err := d.sql.QueryContext(ctx, ` + SELECT + run_id, + span_id, + COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, + COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, + ts as started_at, + (payload->'payload'->>'duration_ms')::bigint as duration_ms, + CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, + payload + FROM events + WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '') + AND ts >= $2 AND ($3::timestamptz IS NULL OR ts < $3) + ORDER BY ts ASC + `, run.SessionID, run.StartedAt, upperBound) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + for rows.Next() { + var s SpanRow + var dbRunID *string + if err := rows.Scan(&dbRunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { + return nil, nil, err + } + s.RunID = runID + addSpanToRunSpans(rs, s) + } + if err := rows.Err(); err != nil { + return nil, nil, err + } + } + + spans := make([]SpanRow, 0, len(rs.order)) + for _, spanID := range rs.order { + spans = append(spans, *rs.byID[spanID]) + } return &run, spans, nil } @@ -213,6 +278,27 @@ func mergeJSONObjects(dst, src map[string]any) { } } +type runSpans struct { + byID map[string]*SpanRow + order []string +} + +func addSpanToRunSpans(rs *runSpans, s SpanRow) { + if rs.byID == nil { + rs.byID = make(map[string]*SpanRow) + } + + existing := rs.byID[s.SpanID] + if existing == nil { + copy := s + rs.byID[s.SpanID] = &copy
+ rs.order = append(rs.order, s.SpanID) + return + } + + mergeSpanEvent(existing, s) +} + func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int { for i := len(runs) - 1; i >= 0; i-- { run := runs[i] @@ -284,18 +370,44 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run return runs, nil } + spansByRun := make(map[string]*runSpans) + + // First attach spans directly from each run_id so the session view still + // works even when some span events are missing session_id. + for i := range runs { + spans, err := d.listSpansForRun(ctx, runs[i].RunID) + if err != nil { + return nil, err + } + if len(spans) == 0 { + continue + } + + rs := spansByRun[runs[i].RunID] + if rs == nil { + rs = &runSpans{} + spansByRun[runs[i].RunID] = rs + } + + for _, span := range spans { + addSpanToRunSpans(rs, span) + } + } + + // Then backfill spans that only have session_id. These are assigned to the + // most likely run by timestamp. rows, err := d.sql.QueryContext(ctx, ` SELECT run_id, - span_id, - COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, - COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, - ts as started_at, - (payload->'payload'->>'duration_ms')::bigint as duration_ms, - CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, - payload + span_id, + COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, + COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, + ts as started_at, + (payload->'payload'->>'duration_ms')::bigint as duration_ms, + CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, + payload FROM events - WHERE session_id = $1 AND span_id IS NOT NULL + WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '') ORDER BY ts ASC `, sessionID) if err != nil { @@ -303,45 +415,30 @@ func (d *DB)
attachSpansToRuns(ctx context.Context, sessionID string, runs []Run } defer rows.Close() - // Map of run_id -> (map of span_id -> *SpanRow) for merging - type runSpans struct { - byID map[string]*SpanRow - order []string - } - spansByRun := make(map[string]*runSpans) - for rows.Next() { var s SpanRow var runID *string if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { return nil, err } - if runID != nil { - s.RunID = *runID - } if s.RunID == "" { runIndex := findRunIndexForSpan(runs, s.StartedAt) - if runIndex == -1 { - continue + if runIndex != -1 { + s.RunID = runs[runIndex].RunID + } else if runID != nil { + s.RunID = *runID } - s.RunID = runs[runIndex].RunID + } + if s.RunID == "" { + continue } rs := spansByRun[s.RunID] if rs == nil { - rs = &runSpans{byID: make(map[string]*SpanRow)} + rs = &runSpans{} spansByRun[s.RunID] = rs } - - existing := rs.byID[s.SpanID] - if existing == nil { - copy := s - rs.byID[s.SpanID] = &copy - rs.order = append(rs.order, s.SpanID) - continue - } - - mergeSpanEvent(existing, s) + addSpanToRunSpans(rs, s) } if err := rows.Err(); err != nil { return nil, err @@ -357,6 +454,9 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run spans = append(spans, *rs.byID[spanID]) } runs[i].Spans = spans + // Update span_count to reflect backfilled spans, not just + // the SQL aggregate which misses orphaned (no run_id) spans.
+ runs[i].SpanCount = len(spans) } return runs, nil } diff --git a/internal/store/postgres/runs_test.go b/internal/store/postgres/runs_test.go index 37a33f9..7a0d120 100644 --- a/internal/store/postgres/runs_test.go +++ b/internal/store/postgres/runs_test.go @@ -128,3 +128,58 @@ func TestFindRunIndexForSpan_MatchesOpenRun(t *testing.T) { t.Fatalf("expected span to attach to open run-2, got index %d", idx) } } + +func TestAddSpanToRunSpans_MergesDuplicateSpanIDs(t *testing.T) { + rs := &runSpans{} + + addSpanToRunSpans(rs, SpanRow{ + SpanID: "span-1", + Name: "tool.call", + Kind: "tool", + Payload: json.RawMessage(`{ + "payload":{"input":{"command":"ls"}} + }`), + }) + + addSpanToRunSpans(rs, SpanRow{ + SpanID: "span-1", + Status: "error", + Duration: func() *int64 { v := int64(42); return &v }(), + Payload: json.RawMessage(`{ + "payload":{"result_preview":"failed"} + }`), + }) + + if len(rs.order) != 1 { + t.Fatalf("expected one span order entry, got %d", len(rs.order)) + } + + span := rs.byID["span-1"] + if span == nil { + t.Fatal("expected merged span to be present") + } + if span.Name != "tool.call" { + t.Fatalf("expected name to be preserved, got %q", span.Name) + } + if span.Status != "error" { + t.Fatalf("expected error status to win, got %q", span.Status) + } + if span.Duration == nil || *span.Duration != 42 { + t.Fatalf("expected duration to be merged, got %#v", span.Duration) + } + + var got map[string]any + if err := json.Unmarshal(span.Payload, &got); err != nil { + t.Fatalf("unmarshal merged payload: %v", err) + } + payload, ok := got["payload"].(map[string]any) + if !ok { + t.Fatal("expected merged payload object") + } + if _, ok := payload["input"].(map[string]any); !ok { + t.Fatal("expected input to remain after merge") + } + if payload["result_preview"] != "failed" { + t.Fatalf("expected result_preview to be merged, got %#v", payload["result_preview"]) + } +}