feat: add summary and timeseries stats queries
This commit is contained in:
@@ -0,0 +1,184 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FrameworkStats struct {
|
||||
Runs int `json:"runs"`
|
||||
Tools int `json:"tools"`
|
||||
Errors int `json:"errors"`
|
||||
}
|
||||
|
||||
type Summary struct {
|
||||
ActiveSessions int `json:"active_sessions"`
|
||||
RunsToday int `json:"runs_today"`
|
||||
ToolCallsToday int `json:"tool_calls_today"`
|
||||
ErrorsToday int `json:"errors_today"`
|
||||
ByFramework map[string]FrameworkStats `json:"by_framework"`
|
||||
}
|
||||
|
||||
type TimeseriesBucket struct {
|
||||
TS time.Time `json:"ts"`
|
||||
Runs int `json:"runs"`
|
||||
Tools int `json:"tools"`
|
||||
Errors int `json:"errors"`
|
||||
}
|
||||
|
||||
type TimeseriesResult struct {
|
||||
Window string `json:"window"`
|
||||
Bucket string `json:"bucket"`
|
||||
Series []TimeseriesBucket `json:"series"`
|
||||
}
|
||||
|
||||
func (d *DB) GetSummary(ctx context.Context) (*Summary, error) {
|
||||
now := time.Now()
|
||||
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
||||
|
||||
// Active sessions: sessions that started today but have not ended today
|
||||
activeQ := `
|
||||
SELECT COUNT(DISTINCT session_id)
|
||||
FROM events
|
||||
WHERE type = 'session.start'
|
||||
AND ts >= $1
|
||||
AND session_id IS NOT NULL
|
||||
AND session_id NOT IN (
|
||||
SELECT DISTINCT session_id
|
||||
FROM events
|
||||
WHERE type = 'session.end'
|
||||
AND ts >= $1
|
||||
AND session_id IS NOT NULL
|
||||
)
|
||||
`
|
||||
var activeSessions int
|
||||
if err := d.sql.QueryRowContext(ctx, activeQ, midnight).Scan(&activeSessions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Per-framework aggregates for today
|
||||
fwQ := `
|
||||
SELECT
|
||||
COALESCE(source_framework, ''),
|
||||
COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
|
||||
COUNT(*) FILTER (WHERE type = 'span.end'
|
||||
AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
|
||||
COUNT(*) FILTER (WHERE type = 'error') AS errors
|
||||
FROM events
|
||||
WHERE ts >= $1
|
||||
GROUP BY source_framework
|
||||
`
|
||||
rows, err := d.sql.QueryContext(ctx, fwQ, midnight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
byFramework := make(map[string]FrameworkStats)
|
||||
var totalRuns, totalTools, totalErrors int
|
||||
|
||||
for rows.Next() {
|
||||
var fw string
|
||||
var fs FrameworkStats
|
||||
if err := rows.Scan(&fw, &fs.Runs, &fs.Tools, &fs.Errors); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
byFramework[fw] = fs
|
||||
totalRuns += fs.Runs
|
||||
totalTools += fs.Tools
|
||||
totalErrors += fs.Errors
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Summary{
|
||||
ActiveSessions: activeSessions,
|
||||
RunsToday: totalRuns,
|
||||
ToolCallsToday: totalTools,
|
||||
ErrorsToday: totalErrors,
|
||||
ByFramework: byFramework,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func bucketForWindow(window string) string {
|
||||
switch window {
|
||||
case "1h":
|
||||
return "1 minute"
|
||||
case "6h":
|
||||
return "5 minutes"
|
||||
case "7d":
|
||||
return "1 hour"
|
||||
default: // "24h"
|
||||
return "15 minutes"
|
||||
}
|
||||
}
|
||||
|
||||
func durationForWindow(window string) time.Duration {
|
||||
switch window {
|
||||
case "1h":
|
||||
return 1 * time.Hour
|
||||
case "6h":
|
||||
return 6 * time.Hour
|
||||
case "7d":
|
||||
return 7 * 24 * time.Hour
|
||||
default: // "24h"
|
||||
return 24 * time.Hour
|
||||
}
|
||||
}
|
||||
|
||||
func bucketLabelForWindow(window string) string {
|
||||
switch window {
|
||||
case "1h":
|
||||
return "1m"
|
||||
case "6h":
|
||||
return "5m"
|
||||
case "7d":
|
||||
return "1h"
|
||||
default: // "24h"
|
||||
return "15m"
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DB) GetTimeseries(ctx context.Context, window string) (*TimeseriesResult, error) {
|
||||
bucket := bucketForWindow(window)
|
||||
dur := durationForWindow(window)
|
||||
since := time.Now().Add(-dur)
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
date_bin($1::interval, ts, '2000-01-01'::timestamptz) AS bucket_ts,
|
||||
COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
|
||||
COUNT(*) FILTER (WHERE type = 'span.end'
|
||||
AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
|
||||
COUNT(*) FILTER (WHERE type = 'error') AS errors
|
||||
FROM events
|
||||
WHERE ts >= $2
|
||||
GROUP BY bucket_ts
|
||||
ORDER BY bucket_ts ASC
|
||||
`
|
||||
|
||||
rows, err := d.sql.QueryContext(ctx, q, bucket, since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var series []TimeseriesBucket
|
||||
for rows.Next() {
|
||||
var b TimeseriesBucket
|
||||
if err := rows.Scan(&b.TS, &b.Runs, &b.Tools, &b.Errors); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
series = append(series, b)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &TimeseriesResult{
|
||||
Window: window,
|
||||
Bucket: bucketLabelForWindow(window),
|
||||
Series: series,
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user