fix(store): backfill spans in run detail
This commit is contained in:
+126
-26
@@ -144,10 +144,75 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
spans, err := d.listSpansForRun(ctx, runID)
|
// Get spans directly linked to this run.
|
||||||
|
rs := &runSpans{}
|
||||||
|
directSpans, err := d.listSpansForRun(ctx, runID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
for _, s := range directSpans {
|
||||||
|
addSpanToRunSpans(rs, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backfill orphaned spans (have session_id but no run_id) that fall
|
||||||
|
// within this run's time window, matching the same logic used by
|
||||||
|
// attachSpansToRuns in the session detail view.
|
||||||
|
if run.SessionID != "" {
|
||||||
|
// Find the upper bound: the next run's start time, or this run's
|
||||||
|
// ended_at, whichever is more precise. This prevents capturing
|
||||||
|
// spans that belong to subsequent runs.
|
||||||
|
var upperBound *time.Time
|
||||||
|
if run.EndedAt != nil {
|
||||||
|
upperBound = run.EndedAt
|
||||||
|
}
|
||||||
|
var nextRunStart *time.Time
|
||||||
|
_ = d.sql.QueryRowContext(ctx, `
|
||||||
|
SELECT MIN(ts)
|
||||||
|
FROM events
|
||||||
|
WHERE session_id = $1 AND run_id != $2 AND type = 'run.start' AND ts > $3
|
||||||
|
`, run.SessionID, runID, run.StartedAt).Scan(&nextRunStart)
|
||||||
|
if nextRunStart != nil && (upperBound == nil || nextRunStart.Before(*upperBound)) {
|
||||||
|
upperBound = nextRunStart
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := d.sql.QueryContext(ctx, `
|
||||||
|
SELECT
|
||||||
|
run_id,
|
||||||
|
span_id,
|
||||||
|
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
||||||
|
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
||||||
|
ts as started_at,
|
||||||
|
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
||||||
|
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
||||||
|
payload
|
||||||
|
FROM events
|
||||||
|
WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '')
|
||||||
|
AND ts >= $2 AND ($3::timestamptz IS NULL OR ts < $3)
|
||||||
|
ORDER BY ts ASC
|
||||||
|
`, run.SessionID, run.StartedAt, upperBound)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var s SpanRow
|
||||||
|
var dbRunID *string
|
||||||
|
if err := rows.Scan(&dbRunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
s.RunID = runID
|
||||||
|
addSpanToRunSpans(rs, s)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spans := make([]SpanRow, 0, len(rs.order))
|
||||||
|
for _, spanID := range rs.order {
|
||||||
|
spans = append(spans, *rs.byID[spanID])
|
||||||
|
}
|
||||||
|
|
||||||
return &run, spans, nil
|
return &run, spans, nil
|
||||||
}
|
}
|
||||||
@@ -213,6 +278,27 @@ func mergeJSONObjects(dst, src map[string]any) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type runSpans struct {
|
||||||
|
byID map[string]*SpanRow
|
||||||
|
order []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func addSpanToRunSpans(rs *runSpans, s SpanRow) {
|
||||||
|
if rs.byID == nil {
|
||||||
|
rs.byID = make(map[string]*SpanRow)
|
||||||
|
}
|
||||||
|
|
||||||
|
existing := rs.byID[s.SpanID]
|
||||||
|
if existing == nil {
|
||||||
|
copy := s
|
||||||
|
rs.byID[s.SpanID] = ©
|
||||||
|
rs.order = append(rs.order, s.SpanID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mergeSpanEvent(existing, s)
|
||||||
|
}
|
||||||
|
|
||||||
func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int {
|
func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int {
|
||||||
for i := len(runs) - 1; i >= 0; i-- {
|
for i := len(runs) - 1; i >= 0; i-- {
|
||||||
run := runs[i]
|
run := runs[i]
|
||||||
@@ -284,6 +370,32 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run
|
|||||||
return runs, nil
|
return runs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
spansByRun := make(map[string]*runSpans)
|
||||||
|
|
||||||
|
// First attach spans directly from each run_id so the session view still
|
||||||
|
// works even when some span events are missing session_id.
|
||||||
|
for i := range runs {
|
||||||
|
spans, err := d.listSpansForRun(ctx, runs[i].RunID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(spans) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rs := spansByRun[runs[i].RunID]
|
||||||
|
if rs == nil {
|
||||||
|
rs = &runSpans{}
|
||||||
|
spansByRun[runs[i].RunID] = rs
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, span := range spans {
|
||||||
|
addSpanToRunSpans(rs, span)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then backfill spans that only have session_id. These are assigned to the
|
||||||
|
// most likely run by timestamp.
|
||||||
rows, err := d.sql.QueryContext(ctx, `
|
rows, err := d.sql.QueryContext(ctx, `
|
||||||
SELECT
|
SELECT
|
||||||
run_id,
|
run_id,
|
||||||
@@ -295,7 +407,7 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run
|
|||||||
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
||||||
payload
|
payload
|
||||||
FROM events
|
FROM events
|
||||||
WHERE session_id = $1 AND span_id IS NOT NULL
|
WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '')
|
||||||
ORDER BY ts ASC
|
ORDER BY ts ASC
|
||||||
`, sessionID)
|
`, sessionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -303,45 +415,30 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run
|
|||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
// Map of run_id -> (map of span_id -> *SpanRow) for merging
|
|
||||||
type runSpans struct {
|
|
||||||
byID map[string]*SpanRow
|
|
||||||
order []string
|
|
||||||
}
|
|
||||||
spansByRun := make(map[string]*runSpans)
|
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var s SpanRow
|
var s SpanRow
|
||||||
var runID *string
|
var runID *string
|
||||||
if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if runID != nil {
|
|
||||||
s.RunID = *runID
|
|
||||||
}
|
|
||||||
if s.RunID == "" {
|
if s.RunID == "" {
|
||||||
runIndex := findRunIndexForSpan(runs, s.StartedAt)
|
runIndex := findRunIndexForSpan(runs, s.StartedAt)
|
||||||
if runIndex == -1 {
|
if runIndex != -1 {
|
||||||
continue
|
|
||||||
}
|
|
||||||
s.RunID = runs[runIndex].RunID
|
s.RunID = runs[runIndex].RunID
|
||||||
|
} else if runID != nil {
|
||||||
|
s.RunID = *runID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if s.RunID == "" {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
rs := spansByRun[s.RunID]
|
rs := spansByRun[s.RunID]
|
||||||
if rs == nil {
|
if rs == nil {
|
||||||
rs = &runSpans{byID: make(map[string]*SpanRow)}
|
rs = &runSpans{}
|
||||||
spansByRun[s.RunID] = rs
|
spansByRun[s.RunID] = rs
|
||||||
}
|
}
|
||||||
|
addSpanToRunSpans(rs, s)
|
||||||
existing := rs.byID[s.SpanID]
|
|
||||||
if existing == nil {
|
|
||||||
copy := s
|
|
||||||
rs.byID[s.SpanID] = ©
|
|
||||||
rs.order = append(rs.order, s.SpanID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
mergeSpanEvent(existing, s)
|
|
||||||
}
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -357,6 +454,9 @@ func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []Run
|
|||||||
spans = append(spans, *rs.byID[spanID])
|
spans = append(spans, *rs.byID[spanID])
|
||||||
}
|
}
|
||||||
runs[i].Spans = spans
|
runs[i].Spans = spans
|
||||||
|
// Update span_count to reflect backfilled spans, not just
|
||||||
|
// the SQL aggregate which misses orphaned (no run_id) spans.
|
||||||
|
runs[i].SpanCount = len(spans)
|
||||||
}
|
}
|
||||||
return runs, nil
|
return runs, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,3 +128,58 @@ func TestFindRunIndexForSpan_MatchesOpenRun(t *testing.T) {
|
|||||||
t.Fatalf("expected span to attach to open run-2, got index %d", idx)
|
t.Fatalf("expected span to attach to open run-2, got index %d", idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddSpanToRunSpans_MergesDuplicateSpanIDs(t *testing.T) {
|
||||||
|
rs := &runSpans{}
|
||||||
|
|
||||||
|
addSpanToRunSpans(rs, SpanRow{
|
||||||
|
SpanID: "span-1",
|
||||||
|
Name: "tool.call",
|
||||||
|
Kind: "tool",
|
||||||
|
Payload: json.RawMessage(`{
|
||||||
|
"payload":{"input":{"command":"ls"}}
|
||||||
|
}`),
|
||||||
|
})
|
||||||
|
|
||||||
|
addSpanToRunSpans(rs, SpanRow{
|
||||||
|
SpanID: "span-1",
|
||||||
|
Status: "error",
|
||||||
|
Duration: func() *int64 { v := int64(42); return &v }(),
|
||||||
|
Payload: json.RawMessage(`{
|
||||||
|
"payload":{"result_preview":"failed"}
|
||||||
|
}`),
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(rs.order) != 1 {
|
||||||
|
t.Fatalf("expected one span order entry, got %d", len(rs.order))
|
||||||
|
}
|
||||||
|
|
||||||
|
span := rs.byID["span-1"]
|
||||||
|
if span == nil {
|
||||||
|
t.Fatal("expected merged span to be present")
|
||||||
|
}
|
||||||
|
if span.Name != "tool.call" {
|
||||||
|
t.Fatalf("expected name to be preserved, got %q", span.Name)
|
||||||
|
}
|
||||||
|
if span.Status != "error" {
|
||||||
|
t.Fatalf("expected error status to win, got %q", span.Status)
|
||||||
|
}
|
||||||
|
if span.Duration == nil || *span.Duration != 42 {
|
||||||
|
t.Fatalf("expected duration to be merged, got %#v", span.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
var got map[string]any
|
||||||
|
if err := json.Unmarshal(span.Payload, &got); err != nil {
|
||||||
|
t.Fatalf("unmarshal merged payload: %v", err)
|
||||||
|
}
|
||||||
|
payload, ok := got["payload"].(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected merged payload object")
|
||||||
|
}
|
||||||
|
if _, ok := payload["input"].(map[string]any); !ok {
|
||||||
|
t.Fatal("expected input to remain after merge")
|
||||||
|
}
|
||||||
|
if payload["result_preview"] != "failed" {
|
||||||
|
t.Fatalf("expected result_preview to be merged, got %#v", payload["result_preview"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user