5014d89258
Tool spans already carry duration_ms and status, but the metrics layer only counted them. Expose that data: - GetTopTools now returns avg/p95 duration and error count per tool. - Timeseries buckets gain tool_avg_ms / tool_p95_ms (filtered percentile_cont over tool spans). - Dashboard Top Tools shows avg latency per tool; the Latency panel, previously always empty (it read run-level duration that is never emitted), now plots real tool-span latency (min/avg/p95). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
353 lines
9.8 KiB
Go
353 lines
9.8 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
// FrameworkStats holds the counters that GetSummary reports for a single
// source framework (the key of Summary.ByFramework).
//
// ActiveSessions counts sessions that have a session.start event, no
// session.end event, and at least one event in the last 15 minutes. Runs,
// Tools and Errors count today's run.start events, tool span.end events and
// error events respectively.
type FrameworkStats struct {
	ActiveSessions int `json:"active_sessions"`
	Runs           int `json:"runs"`
	Tools          int `json:"tools"`
	Errors         int `json:"errors"`
}
|
|
|
|
type Summary struct {
|
|
ActiveSessions int `json:"active_sessions"`
|
|
RunsToday int `json:"runs_today"`
|
|
ToolCallsToday int `json:"tool_calls_today"`
|
|
ErrorsToday int `json:"errors_today"`
|
|
TokensToday int64 `json:"tokens_today"`
|
|
CostToday float64 `json:"cost_today"`
|
|
AvgDurationMS float64 `json:"avg_duration_ms"`
|
|
ByFramework map[string]FrameworkStats `json:"by_framework"`
|
|
}
|
|
|
|
// TimeseriesBucket is one time bucket of the series returned by
// GetTimeseries.
//
// TS is the start of the bucket. Runs, Tools and Errors are event counts.
// Tokens, InputTokens, OutputTokens, Cost and AvgDurationMS are aggregated
// from run.end events; ToolAvgMS and ToolP95MS are the average and 95th
// percentile duration_ms of tool span.end events. Aggregates with no
// contributing events are reported as 0.
type TimeseriesBucket struct {
	TS            time.Time `json:"ts"`
	Runs          int       `json:"runs"`
	Tools         int       `json:"tools"`
	Errors        int       `json:"errors"`
	Tokens        int64     `json:"tokens"`
	InputTokens   int64     `json:"input_tokens"`
	OutputTokens  int64     `json:"output_tokens"`
	Cost          float64   `json:"cost"`
	AvgDurationMS float64   `json:"avg_duration_ms"`
	ToolAvgMS     float64   `json:"tool_avg_ms"`
	ToolP95MS     float64   `json:"tool_p95_ms"`
}
|
|
|
|
type TimeseriesResult struct {
|
|
Window string `json:"window"`
|
|
Bucket string `json:"bucket"`
|
|
Series []TimeseriesBucket `json:"series"`
|
|
}
|
|
|
|
func (d *DB) GetSummary(ctx context.Context) (*Summary, error) {
|
|
now := time.Now()
|
|
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
|
|
|
// Active sessions are open sessions with recent activity. Some hook sources can
|
|
// miss session.end, so "started but never ended" alone overstates live work.
|
|
activeQ := `
|
|
WITH session_groups AS (
|
|
SELECT
|
|
session_id,
|
|
COALESCE((ARRAY_AGG(source_framework ORDER BY CASE WHEN type = 'session.start' THEN 0 ELSE 1 END, ts) FILTER (WHERE source_framework IS NOT NULL))[1], 'unknown') AS framework,
|
|
MAX(ts) AS last_event_at,
|
|
BOOL_OR(type = 'session.start') AS has_start,
|
|
BOOL_OR(type = 'session.end') AS has_end
|
|
FROM events
|
|
WHERE session_id IS NOT NULL
|
|
GROUP BY session_id
|
|
)
|
|
SELECT framework, COUNT(*)
|
|
FROM session_groups
|
|
WHERE has_start
|
|
AND NOT has_end
|
|
AND last_event_at >= $1
|
|
GROUP BY framework
|
|
`
|
|
activeRows, err := d.sql.QueryContext(ctx, activeQ, now.Add(-15*time.Minute))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer activeRows.Close()
|
|
|
|
byFramework := make(map[string]FrameworkStats)
|
|
var activeSessions int
|
|
for activeRows.Next() {
|
|
var fw string
|
|
var count int
|
|
if err := activeRows.Scan(&fw, &count); err != nil {
|
|
return nil, err
|
|
}
|
|
fs := byFramework[fw]
|
|
fs.ActiveSessions = count
|
|
byFramework[fw] = fs
|
|
activeSessions += count
|
|
}
|
|
if err := activeRows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Per-framework aggregates for today
|
|
fwQ := `
|
|
SELECT
|
|
COALESCE(source_framework, 'unknown'),
|
|
COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
|
|
COUNT(*) FILTER (WHERE type = 'span.end'
|
|
AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
|
|
COUNT(*) FILTER (WHERE type = 'error') AS errors
|
|
FROM events
|
|
WHERE ts >= $1
|
|
AND type IN ('run.start', 'span.end', 'error')
|
|
GROUP BY source_framework
|
|
`
|
|
rows, err := d.sql.QueryContext(ctx, fwQ, midnight)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var totalRuns, totalTools, totalErrors int
|
|
|
|
for rows.Next() {
|
|
var fw string
|
|
var fs FrameworkStats
|
|
if err := rows.Scan(&fw, &fs.Runs, &fs.Tools, &fs.Errors); err != nil {
|
|
return nil, err
|
|
}
|
|
if existing, ok := byFramework[fw]; ok {
|
|
fs.ActiveSessions = existing.ActiveSessions
|
|
}
|
|
byFramework[fw] = fs
|
|
totalRuns += fs.Runs
|
|
totalTools += fs.Tools
|
|
totalErrors += fs.Errors
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Usage stats for today (tokens, cost, avg latency)
|
|
usageQ := `
|
|
SELECT
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint), 0),
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8), 0),
|
|
COALESCE(AVG((payload->'payload'->>'duration_ms')::float8), 0)
|
|
FROM events
|
|
WHERE type = 'run.end'
|
|
AND ts >= $1
|
|
`
|
|
var tokensToday int64
|
|
var costToday, avgDurationMS float64
|
|
if err := d.sql.QueryRowContext(ctx, usageQ, midnight).Scan(&tokensToday, &costToday, &avgDurationMS); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Summary{
|
|
ActiveSessions: activeSessions,
|
|
RunsToday: totalRuns,
|
|
ToolCallsToday: totalTools,
|
|
ErrorsToday: totalErrors,
|
|
TokensToday: tokensToday,
|
|
CostToday: costToday,
|
|
AvgDurationMS: avgDurationMS,
|
|
ByFramework: byFramework,
|
|
}, nil
|
|
}
|
|
|
|
// TopTool is one row of the GetTopTools ranking.
//
// Count is the number of tool span.end events recorded for the tool today,
// AvgMS and P95MS are the average and 95th percentile of their duration_ms
// (0 when no duration is available), and Errors is how many of those spans
// have status "error".
type TopTool struct {
	Name   string  `json:"name"`
	Count  int     `json:"count"`
	AvgMS  float64 `json:"avg_ms"`
	P95MS  float64 `json:"p95_ms"`
	Errors int     `json:"errors"`
}
|
|
|
|
// TopModel is one row of the GetTopModels ranking: a model name and the
// number of today's run.end events that reported it.
type TopModel struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}
|
|
|
|
func (d *DB) GetTopTools(ctx context.Context, limit int) ([]TopTool, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
now := time.Now()
|
|
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
|
|
|
q := `
|
|
SELECT
|
|
payload->'attributes'->>'name' AS tool_name,
|
|
COUNT(*) AS cnt,
|
|
COALESCE(AVG((payload->'payload'->>'duration_ms')::float8), 0) AS avg_ms,
|
|
COALESCE(percentile_cont(0.95) WITHIN GROUP (
|
|
ORDER BY (payload->'payload'->>'duration_ms')::float8), 0) AS p95_ms,
|
|
COUNT(*) FILTER (WHERE payload->'payload'->>'status' = 'error') AS errors
|
|
FROM events
|
|
WHERE type = 'span.end'
|
|
AND payload->'attributes'->>'span_kind' = 'tool'
|
|
AND payload->'attributes'->>'name' IS NOT NULL
|
|
AND ts >= $1
|
|
GROUP BY tool_name
|
|
ORDER BY cnt DESC
|
|
LIMIT $2
|
|
`
|
|
rows, err := d.sql.QueryContext(ctx, q, midnight, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []TopTool
|
|
for rows.Next() {
|
|
var t TopTool
|
|
if err := rows.Scan(&t.Name, &t.Count, &t.AvgMS, &t.P95MS, &t.Errors); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, t)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (d *DB) GetTopModels(ctx context.Context, limit int) ([]TopModel, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
now := time.Now()
|
|
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
|
|
|
q := `
|
|
SELECT
|
|
payload->'payload'->>'model' AS model_name,
|
|
COUNT(*) AS cnt
|
|
FROM events
|
|
WHERE type = 'run.end'
|
|
AND payload->'payload'->>'model' IS NOT NULL
|
|
AND payload->'payload'->>'model' <> ''
|
|
AND ts >= $1
|
|
GROUP BY model_name
|
|
ORDER BY cnt DESC
|
|
LIMIT $2
|
|
`
|
|
rows, err := d.sql.QueryContext(ctx, q, midnight, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []TopModel
|
|
for rows.Next() {
|
|
var m TopModel
|
|
if err := rows.Scan(&m.Name, &m.Count); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, m)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// bucketForWindow returns the bucket width for a time window as an interval
// string suitable for a PostgreSQL ::interval cast.
//
// Recognized windows are "1h", "6h" and "7d"; every other value, including
// "24h" and the empty string, gets the 24-hour bucket width of 15 minutes.
func bucketForWindow(window string) string {
	switch window {
	case "1h":
		return "1 minute"
	case "6h":
		return "5 minutes"
	case "7d":
		return "1 hour"
	default: // "24h"
		return "15 minutes"
	}
}
|
|
|
|
// durationForWindow returns how far back in time a window reaches.
//
// Recognized windows are "1h", "6h" and "7d"; every other value, including
// "24h" and the empty string, is treated as 24 hours.
func durationForWindow(window string) time.Duration {
	switch window {
	case "1h":
		return 1 * time.Hour
	case "6h":
		return 6 * time.Hour
	case "7d":
		return 7 * 24 * time.Hour
	default: // "24h"
		return 24 * time.Hour
	}
}
|
|
|
|
// bucketLabelForWindow returns the short label of the bucket width used for
// a time window, as reported in TimeseriesResult.Bucket.
//
// Recognized windows are "1h", "6h" and "7d"; every other value, including
// "24h" and the empty string, gets the 24-hour label "15m".
func bucketLabelForWindow(window string) string {
	switch window {
	case "1h":
		return "1m"
	case "6h":
		return "5m"
	case "7d":
		return "1h"
	default: // "24h"
		return "15m"
	}
}
|
|
|
|
func (d *DB) GetTimeseries(ctx context.Context, window string) (*TimeseriesResult, error) {
|
|
bucket := bucketForWindow(window)
|
|
dur := durationForWindow(window)
|
|
since := time.Now().Add(-dur)
|
|
|
|
q := `
|
|
SELECT
|
|
date_bin($1::interval, ts, '2000-01-01'::timestamptz) AS bucket_ts,
|
|
COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
|
|
COUNT(*) FILTER (WHERE type = 'span.end'
|
|
AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
|
|
COUNT(*) FILTER (WHERE type = 'error') AS errors,
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint)
|
|
FILTER (WHERE type = 'run.end'), 0) AS tokens,
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'input_tokens')::bigint)
|
|
FILTER (WHERE type = 'run.end'), 0) AS input_tokens,
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'output_tokens')::bigint)
|
|
FILTER (WHERE type = 'run.end'), 0) AS output_tokens,
|
|
COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8)
|
|
FILTER (WHERE type = 'run.end'), 0) AS cost,
|
|
COALESCE(AVG((payload->'payload'->>'duration_ms')::float8)
|
|
FILTER (WHERE type = 'run.end'), 0) AS avg_duration_ms,
|
|
COALESCE(AVG((payload->'payload'->>'duration_ms')::float8)
|
|
FILTER (WHERE type = 'span.end'
|
|
AND payload->'attributes'->>'span_kind' = 'tool'), 0) AS tool_avg_ms,
|
|
COALESCE(percentile_cont(0.95) WITHIN GROUP (
|
|
ORDER BY (payload->'payload'->>'duration_ms')::float8)
|
|
FILTER (WHERE type = 'span.end'
|
|
AND payload->'attributes'->>'span_kind' = 'tool'), 0) AS tool_p95_ms
|
|
FROM events
|
|
WHERE ts >= $2
|
|
AND type IN ('run.start', 'run.end', 'span.end', 'error')
|
|
GROUP BY bucket_ts
|
|
ORDER BY bucket_ts ASC
|
|
`
|
|
|
|
rows, err := d.sql.QueryContext(ctx, q, bucket, since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var series []TimeseriesBucket
|
|
for rows.Next() {
|
|
var b TimeseriesBucket
|
|
if err := rows.Scan(&b.TS, &b.Runs, &b.Tools, &b.Errors,
|
|
&b.Tokens, &b.InputTokens, &b.OutputTokens, &b.Cost, &b.AvgDurationMS,
|
|
&b.ToolAvgMS, &b.ToolP95MS); err != nil {
|
|
return nil, err
|
|
}
|
|
series = append(series, b)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &TimeseriesResult{
|
|
Window: window,
|
|
Bucket: bucketLabelForWindow(window),
|
|
Series: series,
|
|
}, nil
|
|
}
|