463 lines
12 KiB
Go
463 lines
12 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
)
|
|
|
|
type RunRow struct {
|
|
RunID string `json:"run_id"`
|
|
SessionID string `json:"session_id"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
EndedAt *time.Time `json:"ended_at,omitempty"`
|
|
Status string `json:"status"`
|
|
SpanCount int `json:"span_count"`
|
|
ToolCount int `json:"tool_count"`
|
|
Model string `json:"model,omitempty"`
|
|
Spans []SpanRow `json:"spans,omitempty"`
|
|
}
|
|
|
|
// SessionDetail is the session-level summary returned by GetSessionWithRuns,
// aggregated from all events that share a session id.
type SessionDetail struct {
	SessionID string `json:"session_id"`
	// StartedAt is the earliest event timestamp of the session.
	StartedAt time.Time `json:"started_at"`
	// EndedAt is the latest 'session.end' event timestamp; nil when the
	// session has no such event.
	EndedAt *time.Time `json:"ended_at,omitempty"`
	// Framework is the first non-null source_framework (session.start events
	// take precedence), or "unknown" when none is recorded.
	Framework string `json:"framework"`
	// Host is read from the event source metadata; empty when absent.
	Host string `json:"host"`
}
|
|
|
|
func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*SessionDetail, []RunRow, error) {
|
|
// Get session info
|
|
sessionQuery := `
|
|
SELECT
|
|
session_id,
|
|
MIN(ts) as started_at,
|
|
MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at,
|
|
COALESCE((ARRAY_AGG(source_framework ORDER BY CASE WHEN type = 'session.start' THEN 0 ELSE 1 END, ts) FILTER (WHERE source_framework IS NOT NULL))[1], 'unknown') as framework,
|
|
MAX(payload->'event'->'source'->>'host') as host
|
|
FROM events
|
|
WHERE session_id = $1
|
|
GROUP BY session_id
|
|
`
|
|
var session SessionDetail
|
|
var host *string
|
|
err := d.sql.QueryRowContext(ctx, sessionQuery, sessionID).Scan(
|
|
&session.SessionID, &session.StartedAt, &session.EndedAt, &session.Framework, &host,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if host != nil {
|
|
session.Host = *host
|
|
}
|
|
|
|
// Get runs
|
|
runsQuery := `
|
|
SELECT
|
|
run_id,
|
|
session_id,
|
|
MIN(ts) as started_at,
|
|
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
|
|
CASE
|
|
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
|
ELSE 'success'
|
|
END as status,
|
|
COUNT(DISTINCT span_id) as span_count,
|
|
COUNT(DISTINCT CASE WHEN payload->'attributes'->>'span_kind' = 'tool' THEN span_id END) as tool_count,
|
|
COALESCE(
|
|
MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END),
|
|
MAX(CASE WHEN type = 'metric.snapshot' THEN payload->'payload'->'metrics'->>'model' END),
|
|
''
|
|
) as model
|
|
FROM events
|
|
WHERE session_id = $1 AND run_id IS NOT NULL
|
|
GROUP BY run_id, session_id
|
|
ORDER BY started_at ASC
|
|
`
|
|
rows, err := d.sql.QueryContext(ctx, runsQuery, sessionID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var runs []RunRow
|
|
for rows.Next() {
|
|
var r RunRow
|
|
if err := rows.Scan(&r.RunID, &r.SessionID, &r.StartedAt, &r.EndedAt, &r.Status, &r.SpanCount, &r.ToolCount, &r.Model); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
runs = append(runs, r)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
runs, err = d.attachSpansToRuns(ctx, sessionID, runs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return &session, runs, nil
|
|
}
|
|
|
|
// SpanRow is one span, assembled from one or more events that share a span
// id. When several events exist they are folded together by mergeSpanEvent.
type SpanRow struct {
	RunID  string `json:"run_id,omitempty"`
	SpanID string `json:"span_id"`
	// Name is the span's attribute name, falling back to the envelope event
	// type and then to the event's type column.
	Name string `json:"name"`
	// Kind is the span_kind attribute, or "unknown" when it is not set.
	Kind string `json:"kind"`
	// StartedAt is the timestamp of the first event seen for the span.
	StartedAt time.Time `json:"started_at"`
	// Duration is the payload's duration_ms value; nil when no event carries
	// one.
	Duration *int64 `json:"duration_ms,omitempty"`
	// Status is "error" or "success".
	Status string `json:"status"`
	// Payload is the raw event envelope (merged across the span's events).
	Payload json.RawMessage `json:"payload"`
}
|
|
|
|
// RunDetail is the run-level summary returned by GetRunWithSpans, aggregated
// from all events that share a run id.
type RunDetail struct {
	RunID     string `json:"run_id"`
	SessionID string `json:"session_id"`
	// StartedAt is the earliest event timestamp of the run.
	StartedAt time.Time `json:"started_at"`
	// EndedAt is the latest 'run.end' event timestamp; nil when the run has
	// no such event.
	EndedAt *time.Time `json:"ended_at,omitempty"`
	// Status is "error" when any event of the run is an error, otherwise
	// "success".
	Status string `json:"status"`
}
|
|
|
|
func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []SpanRow, error) {
|
|
// Get run info
|
|
runQuery := `
|
|
SELECT
|
|
run_id,
|
|
session_id,
|
|
MIN(ts) as started_at,
|
|
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
|
|
CASE
|
|
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
|
ELSE 'success'
|
|
END as status
|
|
FROM events
|
|
WHERE run_id = $1
|
|
GROUP BY run_id, session_id
|
|
`
|
|
var run RunDetail
|
|
err := d.sql.QueryRowContext(ctx, runQuery, runID).Scan(
|
|
&run.RunID, &run.SessionID, &run.StartedAt, &run.EndedAt, &run.Status,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Get spans directly linked to this run.
|
|
rs := &runSpans{}
|
|
directSpans, err := d.listSpansForRun(ctx, runID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
for _, s := range directSpans {
|
|
addSpanToRunSpans(rs, s)
|
|
}
|
|
|
|
// Backfill orphaned spans (have session_id but no run_id) that fall
|
|
// within this run's time window, matching the same logic used by
|
|
// attachSpansToRuns in the session detail view.
|
|
if run.SessionID != "" {
|
|
// Find the upper bound: the next run's start time, or this run's
|
|
// ended_at, whichever is more precise. This prevents capturing
|
|
// spans that belong to subsequent runs.
|
|
var upperBound *time.Time
|
|
if run.EndedAt != nil {
|
|
upperBound = run.EndedAt
|
|
}
|
|
var nextRunStart *time.Time
|
|
_ = d.sql.QueryRowContext(ctx, `
|
|
SELECT MIN(ts)
|
|
FROM events
|
|
WHERE session_id = $1 AND run_id != $2 AND type = 'run.start' AND ts > $3
|
|
`, run.SessionID, runID, run.StartedAt).Scan(&nextRunStart)
|
|
if nextRunStart != nil && (upperBound == nil || nextRunStart.Before(*upperBound)) {
|
|
upperBound = nextRunStart
|
|
}
|
|
|
|
rows, err := d.sql.QueryContext(ctx, `
|
|
SELECT
|
|
run_id,
|
|
span_id,
|
|
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
|
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
|
ts as started_at,
|
|
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
|
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
|
payload
|
|
FROM events
|
|
WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '')
|
|
AND ts >= $2 AND ($3::timestamptz IS NULL OR ts < $3)
|
|
ORDER BY ts ASC
|
|
`, run.SessionID, run.StartedAt, upperBound)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var s SpanRow
|
|
var dbRunID *string
|
|
if err := rows.Scan(&dbRunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
s.RunID = runID
|
|
addSpanToRunSpans(rs, s)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
spans := make([]SpanRow, 0, len(rs.order))
|
|
for _, spanID := range rs.order {
|
|
spans = append(spans, *rs.byID[spanID])
|
|
}
|
|
|
|
return &run, spans, nil
|
|
}
|
|
|
|
func mergeSpanEvent(existing *SpanRow, s SpanRow) {
|
|
if existing.Name == "" && s.Name != "" {
|
|
existing.Name = s.Name
|
|
}
|
|
if existing.Kind == "" || existing.Kind == "unknown" {
|
|
existing.Kind = s.Kind
|
|
}
|
|
if s.Duration != nil {
|
|
existing.Duration = s.Duration
|
|
}
|
|
if s.Status == "error" {
|
|
existing.Status = "error"
|
|
}
|
|
existing.Payload = mergeEnvelopeJSON(existing.Payload, s.Payload)
|
|
}
|
|
|
|
// mergeEnvelopeJSON deep-merges the JSON object in next into the JSON object
// in existing and returns the re-encoded result.
//
// Fallbacks, in order:
//   - existing is empty                    -> next is returned as-is
//   - next is empty                        -> existing is returned as-is
//   - existing does not decode to an object -> next
//   - next does not decode to an object     -> existing
//   - existing is the JSON literal null     -> next
//   - re-encoding the merged object fails   -> next
func mergeEnvelopeJSON(existing, next json.RawMessage) json.RawMessage {
	if len(existing) == 0 {
		return next
	}
	if len(next) == 0 {
		return existing
	}

	var dst map[string]any
	if err := json.Unmarshal(existing, &dst); err != nil {
		return next
	}

	var src map[string]any
	if err := json.Unmarshal(next, &src); err != nil {
		return existing
	}

	// A JSON null decodes without error but leaves dst nil, and writing a
	// key into a nil map panics. Treat it like "nothing accumulated yet".
	if dst == nil {
		return next
	}

	mergeJSONObjects(dst, src)

	merged, err := json.Marshal(dst)
	if err != nil {
		return next
	}
	return merged
}

// mergeJSONObjects copies every key of src into dst. When both sides hold a
// JSON object under the same key the two objects are merged recursively; in
// every other case the value from src replaces whatever dst had. dst must be
// non-nil.
func mergeJSONObjects(dst, src map[string]any) {
	for key, incoming := range src {
		incomingObj, incomingIsObj := incoming.(map[string]any)
		currentObj, currentIsObj := dst[key].(map[string]any)
		if incomingIsObj && currentIsObj {
			mergeJSONObjects(currentObj, incomingObj)
			continue
		}
		dst[key] = incoming
	}
}
|
|
|
|
type runSpans struct {
|
|
byID map[string]*SpanRow
|
|
order []string
|
|
}
|
|
|
|
func addSpanToRunSpans(rs *runSpans, s SpanRow) {
|
|
if rs.byID == nil {
|
|
rs.byID = make(map[string]*SpanRow)
|
|
}
|
|
|
|
existing := rs.byID[s.SpanID]
|
|
if existing == nil {
|
|
copy := s
|
|
rs.byID[s.SpanID] = ©
|
|
rs.order = append(rs.order, s.SpanID)
|
|
return
|
|
}
|
|
|
|
mergeSpanEvent(existing, s)
|
|
}
|
|
|
|
func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int {
|
|
for i := len(runs) - 1; i >= 0; i-- {
|
|
run := runs[i]
|
|
if spanStartedAt.Before(run.StartedAt) {
|
|
continue
|
|
}
|
|
if run.EndedAt == nil || !spanStartedAt.After(*run.EndedAt) {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, error) {
|
|
rows, err := d.sql.QueryContext(ctx, `
|
|
SELECT
|
|
run_id,
|
|
span_id,
|
|
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
|
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
|
ts as started_at,
|
|
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
|
CASE
|
|
WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error'
|
|
ELSE 'success'
|
|
END as status,
|
|
payload
|
|
FROM events
|
|
WHERE run_id = $1 AND span_id IS NOT NULL
|
|
ORDER BY ts ASC
|
|
`, runID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
spansByID := make(map[string]*SpanRow)
|
|
var spanOrder []string
|
|
for rows.Next() {
|
|
var s SpanRow
|
|
if err := rows.Scan(&s.RunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
existing := spansByID[s.SpanID]
|
|
if existing == nil {
|
|
copy := s
|
|
spansByID[s.SpanID] = ©
|
|
spanOrder = append(spanOrder, s.SpanID)
|
|
continue
|
|
}
|
|
|
|
mergeSpanEvent(existing, s)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
spans := make([]SpanRow, 0, len(spanOrder))
|
|
for _, spanID := range spanOrder {
|
|
spans = append(spans, *spansByID[spanID])
|
|
}
|
|
return spans, nil
|
|
}
|
|
|
|
func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []RunRow) ([]RunRow, error) {
|
|
if len(runs) == 0 {
|
|
return runs, nil
|
|
}
|
|
|
|
spansByRun := make(map[string]*runSpans)
|
|
|
|
// First attach spans directly from each run_id so the session view still
|
|
// works even when some span events are missing session_id.
|
|
for i := range runs {
|
|
spans, err := d.listSpansForRun(ctx, runs[i].RunID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(spans) == 0 {
|
|
continue
|
|
}
|
|
|
|
rs := spansByRun[runs[i].RunID]
|
|
if rs == nil {
|
|
rs = &runSpans{}
|
|
spansByRun[runs[i].RunID] = rs
|
|
}
|
|
|
|
for _, span := range spans {
|
|
addSpanToRunSpans(rs, span)
|
|
}
|
|
}
|
|
|
|
// Then backfill spans that only have session_id. These are assigned to the
|
|
// most likely run by timestamp.
|
|
rows, err := d.sql.QueryContext(ctx, `
|
|
SELECT
|
|
run_id,
|
|
span_id,
|
|
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
|
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
|
ts as started_at,
|
|
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
|
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
|
payload
|
|
FROM events
|
|
WHERE session_id = $1 AND span_id IS NOT NULL AND (run_id IS NULL OR run_id = '')
|
|
ORDER BY ts ASC
|
|
`, sessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var s SpanRow
|
|
var runID *string
|
|
if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
|
return nil, err
|
|
}
|
|
if s.RunID == "" {
|
|
runIndex := findRunIndexForSpan(runs, s.StartedAt)
|
|
if runIndex != -1 {
|
|
s.RunID = runs[runIndex].RunID
|
|
} else if runID != nil {
|
|
s.RunID = *runID
|
|
}
|
|
}
|
|
if s.RunID == "" {
|
|
continue
|
|
}
|
|
|
|
rs := spansByRun[s.RunID]
|
|
if rs == nil {
|
|
rs = &runSpans{}
|
|
spansByRun[s.RunID] = rs
|
|
}
|
|
addSpanToRunSpans(rs, s)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := range runs {
|
|
rs := spansByRun[runs[i].RunID]
|
|
if rs == nil {
|
|
continue
|
|
}
|
|
spans := make([]SpanRow, 0, len(rs.order))
|
|
for _, spanID := range rs.order {
|
|
spans = append(spans, *rs.byID[spanID])
|
|
}
|
|
runs[i].Spans = spans
|
|
// Update span_count to reflect backfilled spans, not just
|
|
// the SQL aggregate which misses orphaned (no run_id) spans.
|
|
runs[i].SpanCount = len(spans)
|
|
}
|
|
return runs, nil
|
|
}
|