Files
porthole/internal/engine/engine.go
OpenCode Test 1421b4659e feat: implement ControlTower TUI for cluster and host monitoring
Add complete TUI application for monitoring Kubernetes clusters and host
systems. Features include:

Core features:
- Collector framework with concurrent scheduling
- Host collectors: disk, memory, load, network
- Kubernetes collectors: pods, nodes, workloads, events with informers
- Issue deduplication, state management, and resolve-after logic
- Bubble Tea TUI with table view, details pane, and filtering
- JSON export functionality

UX improvements:
- Help overlay with keybindings
- Priority/category filters with visual indicators
- Direct priority jump (0/1/2/3)
- Bulk acknowledge (Shift+A)
- Clipboard copy (y)
- Theme toggle (T)
- Age format toggle (d)
- Wide title toggle (t)
- Vi-style navigation (j/k)
- Home/End jump (g/G)
- Rollup drill-down in details

Robustness:
- Grace period for unreachable clusters
- Rollups for high-volume issues
- Flap suppression
- RBAC error handling

Files: All core application code with tests for host collectors,
engine, store, model, and export packages.
2025-12-24 13:29:51 -08:00

310 lines
7.1 KiB
Go

package engine
import (
"context"
"sync"
"time"
"tower/internal/collectors"
"tower/internal/model"
)
// IssueStore is the Engine's dependency on the issue store.
//
// The concrete implementation lives in internal/store. We depend on an interface
// here to keep the Engine testable.
//
// NOTE: The store is responsible for dedupe + lifecycle (resolve-after, ack, etc.).
// The Engine simply merges outputs from collectors and passes them into Upsert.
//
// Engine calls Snapshot() to publish UI snapshots.
//
// This interface must be satisfied by internal/store.IssueStore.
// (Do not add persistence here.)
type IssueStore interface {
Upsert(now time.Time, issues []model.Issue)
Snapshot(now time.Time) []model.Issue
}
// CollectorConfig wires a collector into the Engine.
// Timeout applies per Collect() invocation.
// Interval comes from the collector itself.
//
// If Timeout <= 0, no per-collector timeout is applied.
type CollectorConfig struct {
Collector collectors.Collector
Timeout time.Duration
}
// CollectorHealth tracks the current health of a collector.
//
// Status is the last status returned by the collector.
// LastError is the last error returned by the collector (if any).
type CollectorHealth struct {
Status collectors.Status
LastError error
LastRun time.Time
LastOK time.Time
LastRunDur time.Duration
}
// Snapshot is the Engine's UI-facing view.
//
// Issues are sorted using the default sort order (Priority desc, then recency desc).
// Collectors is keyed by collector name.
type Snapshot struct {
At time.Time
Issues []model.Issue
Collectors map[string]CollectorHealth
}
type collectResult struct {
name string
at time.Time
duration time.Duration
issues []model.Issue
status collectors.Status
err error
}
type collectorRunner struct {
cfg CollectorConfig
refreshCh chan struct{}
}
// Engine runs collectors on their own schedules, merges issues, and updates the store.
// It publishes snapshots for the UI.
//
// Lifecycle:
//
// e := New(...)
// e.Start(ctx)
// defer e.Stop()
//
// Snapshots are emitted:
// - after any store update (collector completion)
// - periodically at refreshInterval (if > 0)
//
// RefreshNow() forces all collectors to run immediately.
type Engine struct {
store IssueStore
refreshInterval time.Duration
snapshots chan Snapshot
results chan collectResult
mu sync.Mutex
latestIssuesByCollector map[string][]model.Issue
health map[string]CollectorHealth
collectors []collectorRunner
cancel context.CancelFunc
wg sync.WaitGroup
startOnce sync.Once
stopOnce sync.Once
}
// New constructs an Engine.
//
// refreshInterval governs periodic snapshot emission. If refreshInterval <= 0,
// snapshots are only emitted when collectors finish.
func New(st IssueStore, cs []CollectorConfig, refreshInterval time.Duration) *Engine {
runners := make([]collectorRunner, 0, len(cs))
for _, c := range cs {
runners = append(runners, collectorRunner{
cfg: c,
refreshCh: make(chan struct{}, 1),
})
}
return &Engine{
store: st,
refreshInterval: refreshInterval,
snapshots: make(chan Snapshot, 32),
results: make(chan collectResult, 64),
latestIssuesByCollector: map[string][]model.Issue{},
health: map[string]CollectorHealth{},
collectors: runners,
}
}
// Start begins background collection. It is safe to call Start once.
func (e *Engine) Start(parent context.Context) {
e.startOnce.Do(func() {
ctx, cancel := context.WithCancel(parent)
e.cancel = cancel
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.runAggregator(ctx)
}()
for i := range e.collectors {
r := &e.collectors[i]
e.wg.Add(1)
go func(r *collectorRunner) {
defer e.wg.Done()
e.runCollector(ctx, r)
}(r)
}
})
}
// Stop stops the Engine and closes the snapshots channel.
func (e *Engine) Stop() {
e.stopOnce.Do(func() {
if e.cancel != nil {
e.cancel()
}
e.wg.Wait()
close(e.snapshots)
})
}
// Snapshots returns a receive-only channel of snapshots.
func (e *Engine) Snapshots() <-chan Snapshot { return e.snapshots }
// RefreshNow forces all collectors to run immediately.
//
// This is non-blocking; if a collector already has a refresh queued, it will not
// queue additional refresh signals.
func (e *Engine) RefreshNow() {
for i := range e.collectors {
ch := e.collectors[i].refreshCh
select {
case ch <- struct{}{}:
default:
}
}
}
func (e *Engine) runCollector(ctx context.Context, r *collectorRunner) {
name := r.cfg.Collector.Name()
interval := r.cfg.Collector.Interval()
if interval <= 0 {
interval = time.Second
}
doCollect := func() {
start := time.Now()
collectCtx := ctx
cancel := func() {}
if r.cfg.Timeout > 0 {
collectCtx, cancel = context.WithTimeout(ctx, r.cfg.Timeout)
}
defer cancel()
issues, st, err := r.cfg.Collector.Collect(collectCtx)
finish := time.Now()
dur := finish.Sub(start)
// Copy issues slice to avoid data races when collectors reuse underlying storage.
copied := make([]model.Issue, len(issues))
copy(copied, issues)
res := collectResult{
name: name,
at: finish,
duration: dur,
issues: copied,
status: st,
err: err,
}
select {
case e.results <- res:
case <-ctx.Done():
return
}
}
// Collect immediately on start so the UI isn't empty for the first interval.
doCollect()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
doCollect()
case <-r.refreshCh:
doCollect()
}
}
}
func (e *Engine) runAggregator(ctx context.Context) {
var ticker *time.Ticker
var tick <-chan time.Time
if e.refreshInterval > 0 {
ticker = time.NewTicker(e.refreshInterval)
defer ticker.Stop()
tick = ticker.C
}
emitSnapshot := func(at time.Time) {
issues := e.store.Snapshot(at)
// Ensure deterministic default sort for the UI.
model.SortIssuesDefault(issues)
// Copy collector health map.
e.mu.Lock()
h := make(map[string]CollectorHealth, len(e.health))
for k, v := range e.health {
h[k] = v
}
e.mu.Unlock()
snap := Snapshot{At: at, Issues: issues, Collectors: h}
// Non-blocking publish; drop if UI is behind.
select {
case e.snapshots <- snap:
default:
}
}
for {
select {
case <-ctx.Done():
return
case <-tick:
emitSnapshot(time.Now())
case res := <-e.results:
e.mu.Lock()
// On collector errors, keep the last known issues for that collector.
// This prevents transient errors/timeouts from making issues disappear.
if res.err == nil {
e.latestIssuesByCollector[res.name] = res.issues
}
ch := e.health[res.name]
ch.Status = res.status
ch.LastRun = res.at
ch.LastRunDur = res.duration
ch.LastError = res.err
if res.err == nil {
ch.LastOK = res.at
}
e.health[res.name] = ch
merged := make([]model.Issue, 0, 64)
for _, issues := range e.latestIssuesByCollector {
merged = append(merged, issues...)
}
e.mu.Unlock()
e.store.Upsert(res.at, merged)
emitSnapshot(res.at)
}
}
}