feat(query-api): add richer stats and retention
This commit is contained in:
+142
-28
@@ -32,7 +32,7 @@ func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*Session
|
||||
SELECT
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at,
|
||||
MAX(source_framework) as framework,
|
||||
MAX(payload->'event'->'source'->>'host') as host
|
||||
FROM events
|
||||
@@ -57,14 +57,18 @@ func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*Session
|
||||
run_id,
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
|
||||
CASE
|
||||
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
||||
ELSE 'success'
|
||||
END as status,
|
||||
COUNT(DISTINCT span_id) as span_count,
|
||||
COUNT(DISTINCT CASE WHEN payload->'attributes'->>'span_kind' = 'tool' THEN span_id END) as tool_count,
|
||||
COALESCE(MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END), '') as model
|
||||
COALESCE(
|
||||
MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END),
|
||||
MAX(CASE WHEN type = 'metric.snapshot' THEN payload->'payload'->'metrics'->>'model' END),
|
||||
''
|
||||
) as model
|
||||
FROM events
|
||||
WHERE session_id = $1 AND run_id IS NOT NULL
|
||||
GROUP BY run_id, session_id
|
||||
@@ -123,7 +127,7 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S
|
||||
run_id,
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
|
||||
CASE
|
||||
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
||||
ELSE 'success'
|
||||
@@ -148,6 +152,80 @@ func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []S
|
||||
return &run, spans, nil
|
||||
}
|
||||
|
||||
func mergeSpanEvent(existing *SpanRow, s SpanRow) {
|
||||
if existing.Name == "" && s.Name != "" {
|
||||
existing.Name = s.Name
|
||||
}
|
||||
if existing.Kind == "" || existing.Kind == "unknown" {
|
||||
existing.Kind = s.Kind
|
||||
}
|
||||
if s.Duration != nil {
|
||||
existing.Duration = s.Duration
|
||||
}
|
||||
if s.Status == "error" {
|
||||
existing.Status = "error"
|
||||
}
|
||||
existing.Payload = mergeEnvelopeJSON(existing.Payload, s.Payload)
|
||||
}
|
||||
|
||||
func mergeEnvelopeJSON(existing, next json.RawMessage) json.RawMessage {
|
||||
if len(existing) == 0 {
|
||||
return next
|
||||
}
|
||||
if len(next) == 0 {
|
||||
return existing
|
||||
}
|
||||
|
||||
var dst map[string]any
|
||||
if err := json.Unmarshal(existing, &dst); err != nil {
|
||||
return next
|
||||
}
|
||||
|
||||
var src map[string]any
|
||||
if err := json.Unmarshal(next, &src); err != nil {
|
||||
return existing
|
||||
}
|
||||
|
||||
mergeJSONObjects(dst, src)
|
||||
|
||||
merged, err := json.Marshal(dst)
|
||||
if err != nil {
|
||||
return next
|
||||
}
|
||||
return merged
|
||||
}
|
||||
|
||||
func mergeJSONObjects(dst, src map[string]any) {
|
||||
for key, value := range src {
|
||||
srcMap, srcOK := value.(map[string]any)
|
||||
if !srcOK {
|
||||
dst[key] = value
|
||||
continue
|
||||
}
|
||||
|
||||
dstMap, dstOK := dst[key].(map[string]any)
|
||||
if !dstOK {
|
||||
dst[key] = srcMap
|
||||
continue
|
||||
}
|
||||
|
||||
mergeJSONObjects(dstMap, srcMap)
|
||||
}
|
||||
}
|
||||
|
||||
func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int {
|
||||
for i := len(runs) - 1; i >= 0; i-- {
|
||||
run := runs[i]
|
||||
if spanStartedAt.Before(run.StartedAt) {
|
||||
continue
|
||||
}
|
||||
if run.EndedAt == nil || !spanStartedAt.After(*run.EndedAt) {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, error) {
|
||||
rows, err := d.sql.QueryContext(ctx, `
|
||||
SELECT
|
||||
@@ -187,19 +265,7 @@ func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, erro
|
||||
continue
|
||||
}
|
||||
|
||||
if existing.Name == "" && s.Name != "" {
|
||||
existing.Name = s.Name
|
||||
}
|
||||
if existing.Kind == "" || existing.Kind == "unknown" {
|
||||
existing.Kind = s.Kind
|
||||
}
|
||||
if s.Duration != nil {
|
||||
existing.Duration = s.Duration
|
||||
}
|
||||
if s.Status == "error" {
|
||||
existing.Status = "error"
|
||||
}
|
||||
existing.Payload = s.Payload
|
||||
mergeSpanEvent(existing, s)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
@@ -214,35 +280,83 @@ func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, erro
|
||||
}
|
||||
|
||||
func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []RunRow) ([]RunRow, error) {
|
||||
if len(runs) == 0 {
|
||||
return runs, nil
|
||||
}
|
||||
|
||||
rows, err := d.sql.QueryContext(ctx, `
|
||||
SELECT DISTINCT run_id
|
||||
SELECT
|
||||
run_id,
|
||||
span_id,
|
||||
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
||||
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
||||
ts as started_at,
|
||||
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
||||
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
|
||||
payload
|
||||
FROM events
|
||||
WHERE session_id = $1 AND run_id IS NOT NULL
|
||||
ORDER BY run_id
|
||||
WHERE session_id = $1 AND span_id IS NOT NULL
|
||||
ORDER BY ts ASC
|
||||
`, sessionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
spansByRun := make(map[string][]SpanRow)
|
||||
// Map of run_id -> (map of span_id -> *SpanRow) for merging
|
||||
type runSpans struct {
|
||||
byID map[string]*SpanRow
|
||||
order []string
|
||||
}
|
||||
spansByRun := make(map[string]*runSpans)
|
||||
|
||||
for rows.Next() {
|
||||
var runID string
|
||||
if err := rows.Scan(&runID); err != nil {
|
||||
var s SpanRow
|
||||
var runID *string
|
||||
if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spans, err := d.listSpansForRun(ctx, runID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if runID != nil {
|
||||
s.RunID = *runID
|
||||
}
|
||||
spansByRun[runID] = spans
|
||||
if s.RunID == "" {
|
||||
runIndex := findRunIndexForSpan(runs, s.StartedAt)
|
||||
if runIndex == -1 {
|
||||
continue
|
||||
}
|
||||
s.RunID = runs[runIndex].RunID
|
||||
}
|
||||
|
||||
rs := spansByRun[s.RunID]
|
||||
if rs == nil {
|
||||
rs = &runSpans{byID: make(map[string]*SpanRow)}
|
||||
spansByRun[s.RunID] = rs
|
||||
}
|
||||
|
||||
existing := rs.byID[s.SpanID]
|
||||
if existing == nil {
|
||||
copy := s
|
||||
rs.byID[s.SpanID] = ©
|
||||
rs.order = append(rs.order, s.SpanID)
|
||||
continue
|
||||
}
|
||||
|
||||
mergeSpanEvent(existing, s)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range runs {
|
||||
runs[i].Spans = spansByRun[runs[i].RunID]
|
||||
rs := spansByRun[runs[i].RunID]
|
||||
if rs == nil {
|
||||
continue
|
||||
}
|
||||
spans := make([]SpanRow, 0, len(rs.order))
|
||||
for _, spanID := range rs.order {
|
||||
spans = append(spans, *rs.byID[spanID])
|
||||
}
|
||||
runs[i].Spans = spans
|
||||
}
|
||||
return runs, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user