Add a complete TUI application for monitoring Kubernetes clusters and host systems. Core features: a collector framework with concurrent scheduling; host collectors (disk, memory, load, network); Kubernetes collectors (pods, nodes, workloads, events) backed by informers; issue deduplication, state management, and resolve-after logic; a Bubble Tea TUI with a table view, details pane, and filtering; and JSON export. UX improvements: a help overlay with keybindings; priority/category filters with visual indicators; direct priority jump (0/1/2/3); bulk acknowledge (Shift+A); clipboard copy (y); theme toggle (T); age-format toggle (d); wide-title toggle (t); Vi-style navigation (j/k); Home/End jump (g/G); and rollup drill-down in the details pane. Robustness: a grace period for unreachable clusters, rollups for high-volume issues, flap suppression, and RBAC error handling. Files: all core application code, with tests for the host collectors and the engine, store, model, and export packages.
213 lines
6.1 KiB
Go
213 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
bubbletea "github.com/charmbracelet/bubbletea"
|
|
|
|
"tower/internal/collectors"
|
|
"tower/internal/collectors/host"
|
|
collectorsk8s "tower/internal/collectors/k8s"
|
|
"tower/internal/engine"
|
|
"tower/internal/export"
|
|
"tower/internal/model"
|
|
"tower/internal/store"
|
|
"tower/internal/ui"
|
|
)
|
|
|
|
// Tunables for the store, engine, and collectors wired up in main.
const (
	// defaultRefreshInterval is the refresh cadence handed to engine.New.
	defaultRefreshInterval = time.Second
	// defaultResolveAfter is the resolve-after window handed to store.New.
	defaultResolveAfter = 30 * time.Second
	// collectorTimeoutFast is the Timeout configured for each host collector.
	collectorTimeoutFast = 250 * time.Millisecond
	// collectorTimeoutK8sList is the Timeout configured for the Kubernetes collector.
	collectorTimeoutK8sList = 2 * time.Second
	// k8sUnreachableGraceDefault is the grace window newK8sConnectivityCollector
	// passes to newUnreachableTracker.
	k8sUnreachableGraceDefault = 10 * time.Second
)
|
|
|
|
func main() {
|
|
var exportPath string
|
|
flag.StringVar(&exportPath, "export", "", "write issues JSON snapshot to this path and exit")
|
|
flag.Parse()
|
|
|
|
if exportPath != "" {
|
|
if err := validateExportPath(exportPath); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer cancel()
|
|
|
|
st := store.New(defaultResolveAfter)
|
|
|
|
configs := []engine.CollectorConfig{
|
|
{Collector: host.NewDiskCollector(), Timeout: collectorTimeoutFast},
|
|
{Collector: host.NewMemCollector(), Timeout: collectorTimeoutFast},
|
|
{Collector: host.NewLoadCollector(), Timeout: collectorTimeoutFast},
|
|
{Collector: host.NewNetCollector(), Timeout: collectorTimeoutFast},
|
|
}
|
|
|
|
// If kubeconfig is present, register the full Kubernetes collector (informers
|
|
// with polling fallback, rules, rollups, and unreachable grace).
|
|
if kubeconfigExists() {
|
|
configs = append(configs, engine.CollectorConfig{Collector: collectorsk8s.NewCollector(), Timeout: collectorTimeoutK8sList})
|
|
}
|
|
|
|
eng := engine.New(st, configs, defaultRefreshInterval)
|
|
eng.Start(ctx)
|
|
defer eng.Stop()
|
|
|
|
if exportPath != "" {
|
|
// Give collectors a brief moment to run their initial collection.
|
|
select {
|
|
case <-time.After(200 * time.Millisecond):
|
|
case <-ctx.Done():
|
|
os.Exit(1)
|
|
}
|
|
|
|
snap := st.Snapshot(time.Now())
|
|
if err := export.WriteIssues(exportPath, snap); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Run Bubble Tea UI.
|
|
m := ui.New("", eng.Snapshots(), eng.RefreshNow, st.Acknowledge, st.Unacknowledge, export.WriteIssues)
|
|
p := bubbletea.NewProgram(m, bubbletea.WithAltScreen())
|
|
if _, err := p.Run(); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// kubeconfigExists reports whether a kubeconfig file is available.
//
// When KUBECONFIG is set it is treated, as kubectl/client-go do, as a list of
// paths separated by os.PathListSeparator (":" on Unix, ";" on Windows). The
// result is true if any listed path exists. Empty entries are skipped, and the
// home-directory default is not consulted in this case.
//
// When KUBECONFIG is unset or empty, ~/.kube/config is checked instead. Only
// existence (os.Stat succeeding) is tested; the file is not parsed.
func kubeconfigExists() bool {
	if env := os.Getenv("KUBECONFIG"); env != "" {
		// Stat-ing the raw value would fail for multi-entry lists such as
		// "a:b", even when every listed file exists.
		for _, p := range filepath.SplitList(env) {
			if p == "" {
				continue
			}
			if _, err := os.Stat(p); err == nil {
				return true
			}
		}
		return false
	}

	home, err := os.UserHomeDir()
	if err != nil {
		return false
	}
	_, err = os.Stat(filepath.Join(home, ".kube", "config"))
	return err == nil
}
|
|
|
|
// validateExportPath rejects export destinations that are absolute or that
// lexically escape the working directory via "..".
//
// The path is cleaned first. After filepath.Clean, any ".." elements left in a
// relative path can only form a leading run, so it suffices to check for ".."
// itself or a "../" prefix. Names that merely contain two dots, such as
// "report../out.json" or "..hidden/out.json", are accepted.
//
// It returns nil when path is acceptable, otherwise an error quoting the
// original path. The check is purely lexical; symlinks are not resolved.
func validateExportPath(path string) error {
	cleanPath := filepath.Clean(path)

	const parentDir = ".."
	if cleanPath == parentDir || strings.HasPrefix(cleanPath, parentDir+string(filepath.Separator)) {
		return fmt.Errorf("path traversal not allowed in export path: %s", path)
	}

	if filepath.IsAbs(cleanPath) {
		return fmt.Errorf("absolute paths not allowed in export path: %s", path)
	}

	return nil
}
|
|
|
|
// unreachableTracker remembers when the current streak of connectivity
// failures began and the most recent failure error, so callers can hold off
// alerting until the failure has persisted for at least grace. A success
// resets the streak.
//
// It backs k8sConnectivityCollector, a minimal Kubernetes collector that only
// validates connectivity/auth and emits a P0 issue after the grace window.
// That collector intentionally returns a nil error on connectivity issues so
// the Engine does not "freeze" last-known issues. It does not use informers and
// runs at a low cadence. Full cluster state collection is implemented
// elsewhere (main registers collectorsk8s.NewCollector instead).
//
//nolint:unused // only reachable via newK8sConnectivityCollector, which main does not currently register
type unreachableTracker struct {
	grace          time.Duration // minimum failure duration before shouldEmit reports true
	firstFailureAt time.Time     // start of the current failure streak; zero when healthy
	lastErr        error         // most recent failure; nil when healthy
}
|
|
|
|
func newUnreachableTracker(grace time.Duration) *unreachableTracker {
|
|
if grace <= 0 {
|
|
grace = 10 * time.Second
|
|
}
|
|
return &unreachableTracker{grace: grace}
|
|
}
|
|
|
|
func (t *unreachableTracker) observeSuccess() {
|
|
t.firstFailureAt = time.Time{}
|
|
t.lastErr = nil
|
|
}
|
|
|
|
func (t *unreachableTracker) observeFailure(now time.Time, err error) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
t.lastErr = err
|
|
if t.firstFailureAt.IsZero() {
|
|
t.firstFailureAt = now
|
|
}
|
|
}
|
|
|
|
func (t *unreachableTracker) shouldEmit(now time.Time) bool {
|
|
return t.lastErr != nil && !t.firstFailureAt.IsZero() && now.Sub(t.firstFailureAt) >= t.grace
|
|
}
|
|
|
|
type k8sConnectivityCollector struct {
|
|
tracker *unreachableTracker
|
|
}
|
|
|
|
func newK8sConnectivityCollector() collectors.Collector {
|
|
return &k8sConnectivityCollector{tracker: newUnreachableTracker(k8sUnreachableGraceDefault)}
|
|
}
|
|
|
|
// Name returns this collector's identifier, "k8s:connectivity".
func (c *k8sConnectivityCollector) Name() string {
	const collectorName = "k8s:connectivity"
	return collectorName
}
|
|
|
|
// Interval reports this collector's run cadence: a low-frequency 5 seconds.
func (c *k8sConnectivityCollector) Interval() time.Duration {
	const every = 5 * time.Second
	return every
}
|
|
|
|
func (c *k8sConnectivityCollector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) {
|
|
now := time.Now()
|
|
cs, _, err := collectorsk8s.ClientFromCurrentContext()
|
|
if err != nil {
|
|
c.tracker.observeFailure(now, err)
|
|
return c.issuesForFailure(now, err), collectors.Status{Health: collectors.HealthDegraded, Message: "kubeconfig/client error"}, nil
|
|
}
|
|
|
|
// Short ping to validate reachability.
|
|
pingErr := collectorsk8s.Ping(ctx, cs)
|
|
if pingErr == nil {
|
|
c.tracker.observeSuccess()
|
|
return nil, collectors.OKStatus(), nil
|
|
}
|
|
|
|
c.tracker.observeFailure(now, pingErr)
|
|
return c.issuesForFailure(now, pingErr), collectors.Status{Health: collectors.HealthDegraded, Message: "k8s ping failed"}, nil
|
|
}
|
|
|
|
func (c *k8sConnectivityCollector) issuesForFailure(now time.Time, err error) []model.Issue {
|
|
if c.tracker.shouldEmit(now) {
|
|
return []model.Issue{model.Issue{
|
|
ID: "k8s:cluster:unreachable",
|
|
Category: model.CategoryKubernetes,
|
|
Priority: model.PriorityP0,
|
|
Title: "Kubernetes cluster unreachable / auth failed",
|
|
Details: fmt.Sprintf("Kubernetes API unreachable or credentials invalid. Last error: %v", err),
|
|
Evidence: map[string]string{"reason": "Unreachable"},
|
|
SuggestedFix: "kubectl cluster-info\nkubectl get nodes",
|
|
}}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Keep otherwise-unused constants referenced.
|
|
var _ = []any{collectors.HealthOK, collectorTimeoutFast, collectorTimeoutK8sList}
|