package postgres import ( "context" "encoding/json" "time" ) type RunRow struct { RunID string `json:"run_id"` SessionID string `json:"session_id"` StartedAt time.Time `json:"started_at"` EndedAt *time.Time `json:"ended_at,omitempty"` Status string `json:"status"` SpanCount int `json:"span_count"` ToolCount int `json:"tool_count"` Model string `json:"model,omitempty"` Spans []SpanRow `json:"spans,omitempty"` } type SessionDetail struct { SessionID string `json:"session_id"` StartedAt time.Time `json:"started_at"` EndedAt *time.Time `json:"ended_at,omitempty"` Framework string `json:"framework"` Host string `json:"host"` } func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*SessionDetail, []RunRow, error) { // Get session info sessionQuery := ` SELECT session_id, MIN(ts) as started_at, MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at, COALESCE((ARRAY_AGG(source_framework ORDER BY CASE WHEN type = 'session.start' THEN 0 ELSE 1 END, ts) FILTER (WHERE source_framework IS NOT NULL))[1], 'unknown') as framework, MAX(payload->'event'->'source'->>'host') as host FROM events WHERE session_id = $1 GROUP BY session_id ` var session SessionDetail var host *string err := d.sql.QueryRowContext(ctx, sessionQuery, sessionID).Scan( &session.SessionID, &session.StartedAt, &session.EndedAt, &session.Framework, &host, ) if err != nil { return nil, nil, err } if host != nil { session.Host = *host } // Get runs runsQuery := ` SELECT run_id, session_id, MIN(ts) as started_at, MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at, CASE WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error' ELSE 'success' END as status, COUNT(DISTINCT span_id) as span_count, COUNT(DISTINCT CASE WHEN payload->'attributes'->>'span_kind' = 'tool' THEN span_id END) as tool_count, COALESCE( MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END), MAX(CASE WHEN type = 'metric.snapshot' THEN payload->'payload'->'metrics'->>'model' END), '' ) as model FROM events WHERE session_id = $1 AND run_id IS NOT NULL GROUP BY run_id, session_id ORDER BY started_at ASC ` rows, err := d.sql.QueryContext(ctx, runsQuery, sessionID) if err != nil { return nil, nil, err } defer rows.Close() var runs []RunRow for rows.Next() { var r RunRow if err := rows.Scan(&r.RunID, &r.SessionID, &r.StartedAt, &r.EndedAt, &r.Status, &r.SpanCount, &r.ToolCount, &r.Model); err != nil { return nil, nil, err } runs = append(runs, r) } if err := rows.Err(); err != nil { return nil, nil, err } runs, err = d.attachSpansToRuns(ctx, sessionID, runs) if err != nil { return nil, nil, err } return &session, runs, nil } type SpanRow struct { RunID string `json:"run_id,omitempty"` SpanID string `json:"span_id"` Name string `json:"name"` Kind string `json:"kind"` StartedAt time.Time `json:"started_at"` Duration *int64 `json:"duration_ms,omitempty"` Status string `json:"status"` Payload json.RawMessage `json:"payload"` } type RunDetail struct { RunID string `json:"run_id"` SessionID string `json:"session_id"` StartedAt time.Time `json:"started_at"` EndedAt *time.Time `json:"ended_at,omitempty"` Status string `json:"status"` } func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []SpanRow, error) { // Get run info runQuery := ` SELECT run_id, session_id, MIN(ts) as started_at, MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at, CASE WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error' ELSE 'success' END as status FROM events WHERE run_id = $1 GROUP BY run_id, session_id ` var run RunDetail err := d.sql.QueryRowContext(ctx, runQuery, runID).Scan( &run.RunID, &run.SessionID, &run.StartedAt, &run.EndedAt, &run.Status, ) if err != nil { return nil, nil, err } // Get spans directly linked to this run. rs := &runSpans{} directSpans, err := d.listSpansForRun(ctx, runID) if err != nil { return nil, nil, err } for _, s := range directSpans { addSpanToRunSpans(rs, s) } // Backfill orphaned spans (have session_id but no run_id) that fall // within this run's time window, matching the same logic used by // attachSpansToRuns in the session detail view. if run.SessionID != "" { // Find the upper bound: the next run's start time, or this run's // ended_at, whichever is more precise. This prevents capturing // spans that belong to subsequent runs. var upperBound *time.Time if run.EndedAt != nil { upperBound = run.EndedAt } var nextRunStart *time.Time _ = d.sql.QueryRowContext(ctx, ` SELECT MIN(ts) FROM events WHERE session_id = $1 AND run_id != $2 AND type = 'run.start' AND ts > $3 `, run.SessionID, runID, run.StartedAt).Scan(&nextRunStart) if nextRunStart != nil && (upperBound == nil || nextRunStart.Before(*upperBound)) { upperBound = nextRunStart } rows, err := d.sql.QueryContext(ctx, ` SELECT run_id, span_id, COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, ts as started_at, (payload->'payload'->>'duration_ms')::bigint as duration_ms, CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, payload FROM events WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '') AND ts >= $2 AND ($3::timestamptz IS NULL OR ts < $3) ORDER BY ts ASC `, run.SessionID, run.StartedAt, upperBound) if err != nil { return nil, nil, err } defer rows.Close() for rows.Next() { var s SpanRow var dbRunID *string if err := rows.Scan(&dbRunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { return nil, nil, err } s.RunID = runID addSpanToRunSpans(rs, s) } if err := rows.Err(); err != nil { return nil, nil, err } } spans := make([]SpanRow, 0, len(rs.order)) for _, spanID := range rs.order { spans = append(spans, *rs.byID[spanID]) } return &run, spans, nil } func mergeSpanEvent(existing *SpanRow, s SpanRow) { if existing.Name == "" && s.Name != "" { existing.Name = s.Name } if existing.Kind == "" || existing.Kind == "unknown" { existing.Kind = s.Kind } if s.Duration != nil { existing.Duration = s.Duration } if s.Status == "error" { existing.Status = "error" } existing.Payload = mergeEnvelopeJSON(existing.Payload, s.Payload) } func mergeEnvelopeJSON(existing, next json.RawMessage) json.RawMessage { if len(existing) == 0 { return next } if len(next) == 0 { return existing } var dst map[string]any if err := json.Unmarshal(existing, &dst); err != nil { return next } var src map[string]any if err := json.Unmarshal(next, &src); err != nil { return existing } mergeJSONObjects(dst, src) merged, err := json.Marshal(dst) if err != nil { return next } return merged } func mergeJSONObjects(dst, src map[string]any) { for key, value := range src { srcMap, srcOK := value.(map[string]any) if !srcOK { dst[key] = value continue } dstMap, dstOK := dst[key].(map[string]any) if !dstOK { dst[key] = srcMap continue } mergeJSONObjects(dstMap, srcMap) } } type runSpans struct { byID map[string]*SpanRow order []string } func addSpanToRunSpans(rs *runSpans, s SpanRow) { if rs.byID == nil { rs.byID = make(map[string]*SpanRow) } existing := rs.byID[s.SpanID] if existing == nil { copy := s rs.byID[s.SpanID] = © rs.order = append(rs.order, s.SpanID) return } mergeSpanEvent(existing, s) } func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int { for i := len(runs) - 1; i >= 0; i-- { run := runs[i] if spanStartedAt.Before(run.StartedAt) { continue } if run.EndedAt == nil || !spanStartedAt.After(*run.EndedAt) { return i } } return -1 } func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, error) { rows, err := d.sql.QueryContext(ctx, ` SELECT run_id, span_id, COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, ts as started_at, (payload->'payload'->>'duration_ms')::bigint as duration_ms, CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, payload FROM events WHERE run_id = $1 AND span_id IS NOT NULL ORDER BY ts ASC `, runID) if err != nil { return nil, err } defer rows.Close() spansByID := make(map[string]*SpanRow) var spanOrder []string for rows.Next() { var s SpanRow if err := rows.Scan(&s.RunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { return nil, err } existing := spansByID[s.SpanID] if existing == nil { copy := s spansByID[s.SpanID] = © spanOrder = append(spanOrder, s.SpanID) continue } mergeSpanEvent(existing, s) } if err := rows.Err(); err != nil { return nil, err } spans := make([]SpanRow, 0, len(spanOrder)) for _, spanID := range spanOrder { spans = append(spans, *spansByID[spanID]) } return spans, nil } func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []RunRow) ([]RunRow, error) { if len(runs) == 0 { return runs, nil } spansByRun := make(map[string]*runSpans) // First attach spans directly from each run_id so the session view still // works even when some span events are missing session_id. for i := range runs { spans, err := d.listSpansForRun(ctx, runs[i].RunID) if err != nil { return nil, err } if len(spans) == 0 { continue } rs := spansByRun[runs[i].RunID] if rs == nil { rs = &runSpans{} spansByRun[runs[i].RunID] = rs } for _, span := range spans { addSpanToRunSpans(rs, span) } } // Then backfill spans that only have session_id. These are assigned to the // most likely run by timestamp. rows, err := d.sql.QueryContext(ctx, ` SELECT run_id, span_id, COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name, COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind, ts as started_at, (payload->'payload'->>'duration_ms')::bigint as duration_ms, CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status, payload FROM events WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '') ORDER BY ts ASC `, sessionID) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var s SpanRow var runID *string if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil { return nil, err } if s.RunID == "" { runIndex := findRunIndexForSpan(runs, s.StartedAt) if runIndex != -1 { s.RunID = runs[runIndex].RunID } else if runID != nil { s.RunID = *runID } } if s.RunID == "" { continue } rs := spansByRun[s.RunID] if rs == nil { rs = &runSpans{} spansByRun[s.RunID] = rs } addSpanToRunSpans(rs, s) } if err := rows.Err(); err != nil { return nil, err } for i := range runs { rs := spansByRun[runs[i].RunID] if rs == nil { continue } spans := make([]SpanRow, 0, len(rs.order)) for _, spanID := range rs.order { spans = append(spans, *rs.byID[spanID]) } runs[i].Spans = spans // Update span_count to reflect backfilled spans, not just // the SQL aggregate which misses orphaned (no run_id) spans. runs[i].SpanCount = len(spans) } return runs, nil }