Add complete TUI application for monitoring Kubernetes clusters and host systems.

Core features:
- Collector framework with concurrent scheduling
- Host collectors: disk, memory, load, network
- Kubernetes collectors: pods, nodes, workloads, events with informers
- Issue deduplication, state management, and resolve-after logic
- Bubble Tea TUI with table view, details pane, and filtering
- JSON export functionality

UX improvements:
- Help overlay with keybindings
- Priority/category filters with visual indicators
- Direct priority jump (0/1/2/3)
- Bulk acknowledge (Shift+A)
- Clipboard copy (y)
- Theme toggle (T)
- Age format toggle (d)
- Wide title toggle (t)
- Vi-style navigation (j/k)
- Home/End jump (g/G)
- Rollup drill-down in details

Robustness:
- Grace period for unreachable clusters
- Rollups for high-volume issues
- Flap suppression
- RBAC error handling

Files: all core application code, with tests for the host collectors, engine, store, model, and export packages.
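For orientation before the test listing, here is a minimal sketch of how a collector plugs into the engine. It is inferred from the `fakeCollector` shape and the engine wiring exercised in the tests below; the package name, issue contents, and 30-second interval are illustrative, not taken from the real host collectors.

```go
package example

import (
	"context"
	"time"

	"tower/internal/collectors"
	"tower/internal/model"
)

// diskCollector is an illustrative collector, not the real host disk collector.
// It satisfies the same Name/Interval/Collect shape the engine tests exercise
// via fakeCollector.
type diskCollector struct{}

func (diskCollector) Name() string            { return "host-disk" }
func (diskCollector) Interval() time.Duration { return 30 * time.Second }

func (diskCollector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) {
	var st collectors.Status
	// A real collector would inspect the host here and honor ctx cancellation
	// (the engine enforces a per-collector timeout, as the tests show).
	return []model.Issue{{
		ID:       "disk-root-usage",
		Priority: model.PriorityP1,
		Title:    "root volume usage above threshold",
		LastSeen: time.Now(),
	}}, st, nil
}

// Wiring follows the pattern in the tests (from outside the engine package the
// identifiers would be qualified, e.g. engine.New):
//
//	e := New(store, []CollectorConfig{{Collector: diskCollector{}, Timeout: 5 * time.Second}}, 0)
//	e.Start(ctx)
//	snap := <-e.Snapshots() // merged issues plus per-collector health
//	e.RefreshNow()          // force an immediate collect
//	e.Stop()
```

The engine scheduling and timeout tests follow.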
```go
package engine

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"tower/internal/collectors"
	"tower/internal/model"
)

type fakeStore struct {
	mu sync.Mutex

	upsertCalls int
	lastNow     time.Time
	lastIssues  []model.Issue
}

func (s *fakeStore) Upsert(now time.Time, issues []model.Issue) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.upsertCalls++
	s.lastNow = now
	// Deep-ish copy: slice copy is enough for our tests.
	s.lastIssues = append([]model.Issue(nil), issues...)
}

func (s *fakeStore) Snapshot(now time.Time) []model.Issue {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]model.Issue(nil), s.lastIssues...)
}

func (s *fakeStore) UpsertCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.upsertCalls
}

type fakeCollector struct {
	name     string
	interval time.Duration

	// delay simulates work. If ctx is canceled/timeout hits, Collect returns ctx.Err().
	delay time.Duration

	issuesFn func(call int64) []model.Issue

	calls  atomic.Int64
	callCh chan time.Time
}

func (c *fakeCollector) Name() string { return c.name }

func (c *fakeCollector) Interval() time.Duration {
	return c.interval
}

func (c *fakeCollector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) {
	call := c.calls.Add(1)
	if c.callCh != nil {
		select {
		case c.callCh <- time.Now():
		default:
		}
	}

	if c.delay > 0 {
		t := time.NewTimer(c.delay)
		defer t.Stop()
		select {
		case <-ctx.Done():
			var st collectors.Status
			return nil, st, ctx.Err()
		case <-t.C:
		}
	}

	var st collectors.Status
	if c.issuesFn != nil {
		return c.issuesFn(call), st, nil
	}
	return nil, st, nil
}

func recvSnapshot(t *testing.T, ch <-chan Snapshot, within time.Duration) Snapshot {
	t.Helper()
	select {
	case s := <-ch:
		return s
	case <-time.After(within):
		t.Fatalf("timed out waiting for snapshot")
		return Snapshot{}
	}
}

func TestEngine_UpsertAndSnapshotsEmitted(t *testing.T) {
	st := &fakeStore{}
	c := &fakeCollector{
		name:     "c1",
		interval: 100 * time.Millisecond,
		issuesFn: func(call int64) []model.Issue {
			return []model.Issue{{
				ID:       "id-1",
				Priority: model.PriorityP1,
				Title:    "hello",
				LastSeen: time.Now(),
			}}
		},
	}

	e := New(st, []CollectorConfig{{Collector: c, Timeout: 200 * time.Millisecond}}, 0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	defer e.Stop()

	e.Start(ctx)

	snap := recvSnapshot(t, e.Snapshots(), 300*time.Millisecond)
	if st.UpsertCount() < 1 {
		t.Fatalf("expected store.Upsert to be called")
	}
	if len(snap.Issues) != 1 || snap.Issues[0].ID != "id-1" {
		t.Fatalf("expected snapshot to contain issue id-1; got %+v", snap.Issues)
	}
	if _, ok := snap.Collectors["c1"]; !ok {
		t.Fatalf("expected collector health entry for c1")
	}
}

func TestEngine_CollectorTimeoutCancelsLongCollect(t *testing.T) {
	st := &fakeStore{}
	c := &fakeCollector{
		name:     "slow",
		interval: time.Hour,
		delay:    200 * time.Millisecond,
	}

	e := New(st, []CollectorConfig{{Collector: c, Timeout: 20 * time.Millisecond}}, 0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	defer e.Stop()

	e.Start(ctx)

	snap := recvSnapshot(t, e.Snapshots(), 400*time.Millisecond)
	ch, ok := snap.Collectors["slow"]
	if !ok {
		t.Fatalf("expected collector health entry for slow")
	}
	if ch.LastError == nil {
		t.Fatalf("expected LastError to be set")
	}
	if !errors.Is(ch.LastError, context.DeadlineExceeded) {
		t.Fatalf("expected context deadline exceeded; got %v", ch.LastError)
	}
	if st.UpsertCount() < 1 {
		t.Fatalf("expected store.Upsert to be called")
	}
}

func TestEngine_RefreshNowTriggersImmediateCollect(t *testing.T) {
	st := &fakeStore{}
	callCh := make(chan time.Time, 10)
	c := &fakeCollector{
		name:     "r",
		interval: 200 * time.Millisecond,
		callCh:   callCh,
	}

	e := New(st, []CollectorConfig{{Collector: c, Timeout: time.Second}}, 0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	defer e.Stop()

	e.Start(ctx)

	// First collect happens immediately.
	select {
	case <-callCh:
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timed out waiting for initial collect")
	}

	// Trigger refresh; should happen well before the 200ms interval.
	time.Sleep(10 * time.Millisecond)
	e.RefreshNow()

	select {
	case <-callCh:
		// ok
	case <-time.After(120 * time.Millisecond):
		t.Fatalf("expected RefreshNow to trigger a collect quickly")
	}
}

func TestEngine_MultipleCollectorsRunOnIntervals(t *testing.T) {
	st := &fakeStore{}
	fast := &fakeCollector{name: "fast", interval: 30 * time.Millisecond}
	slow := &fakeCollector{name: "slow", interval: 80 * time.Millisecond}

	e := New(st, []CollectorConfig{{Collector: fast, Timeout: time.Second}, {Collector: slow, Timeout: time.Second}}, 0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	e.Start(ctx)
	// Let it run a bit.
	time.Sleep(220 * time.Millisecond)
	e.Stop()

	fastCalls := fast.calls.Load()
	slowCalls := slow.calls.Load()

	// Includes initial collect.
	if fastCalls < 4 {
		t.Fatalf("expected fast collector to be called multiple times; got %d", fastCalls)
	}
	if slowCalls < 2 {
		t.Fatalf("expected slow collector to be called multiple times; got %d", slowCalls)
	}
}
```
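Note that these tests rely on short real-time intervals (tens of milliseconds), so they can be sensitive to scheduler jitter on loaded CI machines. They are still worth running with the race detector, e.g. `go test -race ./...` from the module root (the engine package path is assumed to be `tower/internal/engine`, matching the other imports).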