feat: add sessions list query
Supports filtering by time range, framework, and host. Returns paginated results with cursor-based pagination. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,118 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SessionRow struct {
|
||||
SessionID string `json:"session_id"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
EndedAt *time.Time `json:"ended_at,omitempty"`
|
||||
Framework string `json:"framework"`
|
||||
Host string `json:"host"`
|
||||
RunCount int `json:"run_count"`
|
||||
}
|
||||
|
||||
type SessionsFilter struct {
|
||||
From *time.Time
|
||||
To *time.Time
|
||||
Framework string
|
||||
Host string
|
||||
Limit int
|
||||
Cursor *time.Time // cursor is the last started_at seen
|
||||
}
|
||||
|
||||
func (d *DB) ListSessions(ctx context.Context, f SessionsFilter) ([]SessionRow, *time.Time, error) {
|
||||
if f.Limit <= 0 {
|
||||
f.Limit = 50
|
||||
}
|
||||
if f.Limit > 200 {
|
||||
f.Limit = 200
|
||||
}
|
||||
|
||||
// Build query dynamically
|
||||
var conditions []string
|
||||
var args []any
|
||||
argN := 1
|
||||
|
||||
// Default time range: last 24h
|
||||
if f.From == nil {
|
||||
t := time.Now().Add(-24 * time.Hour)
|
||||
f.From = &t
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("ts >= $%d", argN))
|
||||
args = append(args, *f.From)
|
||||
argN++
|
||||
|
||||
if f.To != nil {
|
||||
conditions = append(conditions, fmt.Sprintf("ts <= $%d", argN))
|
||||
args = append(args, *f.To)
|
||||
argN++
|
||||
}
|
||||
|
||||
if f.Framework != "" {
|
||||
conditions = append(conditions, fmt.Sprintf("source_framework = $%d", argN))
|
||||
args = append(args, f.Framework)
|
||||
argN++
|
||||
}
|
||||
|
||||
if f.Host != "" {
|
||||
conditions = append(conditions, fmt.Sprintf("payload->'event'->'source'->>'host' = $%d", argN))
|
||||
args = append(args, f.Host)
|
||||
argN++
|
||||
}
|
||||
|
||||
if f.Cursor != nil {
|
||||
conditions = append(conditions, fmt.Sprintf("ts < $%d", argN))
|
||||
args = append(args, *f.Cursor)
|
||||
argN++
|
||||
}
|
||||
|
||||
where := strings.Join(conditions, " AND ")
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
MAX(source_framework) as framework,
|
||||
MAX(payload->'event'->'source'->>'host') as host,
|
||||
COUNT(DISTINCT run_id) as run_count
|
||||
FROM events
|
||||
WHERE session_id IS NOT NULL AND %s
|
||||
GROUP BY session_id
|
||||
ORDER BY started_at DESC
|
||||
LIMIT $%d
|
||||
`, where, argN)
|
||||
args = append(args, f.Limit+1) // fetch one extra to detect next page
|
||||
|
||||
rows, err := d.sql.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var out []SessionRow
|
||||
for rows.Next() {
|
||||
var r SessionRow
|
||||
var host *string
|
||||
if err := rows.Scan(&r.SessionID, &r.StartedAt, &r.EndedAt, &r.Framework, &host, &r.RunCount); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if host != nil {
|
||||
r.Host = *host
|
||||
}
|
||||
out = append(out, r)
|
||||
}
|
||||
|
||||
var nextCursor *time.Time
|
||||
if len(out) > f.Limit {
|
||||
out = out[:f.Limit]
|
||||
nextCursor = &out[len(out)-1].StartedAt
|
||||
}
|
||||
|
||||
return out, nextCursor, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user