Files
porthole/internal/collectors/k8s/informers.go
OpenCode Test 1421b4659e feat: implement ControlTower TUI for cluster and host monitoring
Add a complete TUI application for monitoring Kubernetes clusters and host
systems.

Core features:
- Collector framework with concurrent scheduling
- Host collectors: disk, memory, load, network
- Kubernetes collectors: pods, nodes, workloads, events with informers
- Issue deduplication, state management, and resolve-after logic
- Bubble Tea TUI with table view, details pane, and filtering
- JSON export functionality

UX improvements:
- Help overlay with keybindings
- Priority/category filters with visual indicators
- Direct priority jump (0/1/2/3)
- Bulk acknowledge (Shift+A)
- Clipboard copy (y)
- Theme toggle (T)
- Age format toggle (d)
- Wide title toggle (t)
- Vi-style navigation (j/k)
- Home/End jump (g/G)
- Rollup drill-down in details

Robustness:
- Grace period for unreachable clusters
- Rollups for high-volume issues
- Flap suppression
- RBAC error handling

Files: All core application code with tests for host collectors,
engine, store, model, and export packages.
2025-12-24 13:29:51 -08:00


package k8s

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	"tower/internal/collectors"
	"tower/internal/model"
)

// Collector is the ControlTower Kubernetes collector.
//
// It uses client-go informers (LIST+WATCH with local caches) against the user's
// kubeconfig current context, across all namespaces.
//
// Degradation behavior:
//   - If WATCH fails repeatedly, it falls back to polling LIST and emits a P1
//     "degraded to polling" issue.
//   - While in polling mode, it periodically attempts to recover back to watches.
//   - If the cluster is unreachable, it emits a P0 only after 10s continuous failure.
//   - If RBAC forbids list/watch for a resource, it emits a single P2 issue per
//     inaccessible resource and continues for accessible resources.
//
// Noise control:
//   - Rollups group by (namespace, reason, kind) when group size >= 20.
//   - Cap max issues to 200 after rollups.
//
// Instantiate with NewCollector().
type Collector struct {
	interval              time.Duration
	unreachableGrace      time.Duration
	pendingGrace          time.Duration
	workloadGrace         time.Duration
	crashLoopThresh       int
	rollupThreshold       int
	maxIssues             int
	watchFailureThreshold int
	watchFailureWindow    time.Duration
	pollRecoverEvery      time.Duration

	mu     sync.Mutex
	syncWG sync.WaitGroup

	client    kubernetes.Interface
	factory   informers.SharedInformerFactory
	stopCh    chan struct{}
	started   bool
	syncedFns []cache.InformerSynced

	podsLister        corelisters.PodLister
	nodesLister       corelisters.NodeLister
	eventsLister      corelisters.EventLister
	deployLister      appslisters.DeploymentLister
	statefulSetLister appslisters.StatefulSetLister
	daemonSetLister   appslisters.DaemonSetLister

	// polling indicates we have degraded from informers to list polling.
	polling                bool
	pollSince              time.Time
	lastPollRecoverAttempt time.Time
	watchFailWindowStart   time.Time
	watchFailCount         int

	// rbacDenied is keyed by resource name ("pods", "nodes", ...).
	rbacDenied map[string]error

	unreach     *unreachableTracker
	lastSuccess time.Time
}

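// NewCollector returns a Collector with its default tuning: a 2s collection
// interval, 10s unreachable grace, rollup threshold of 20, and a 200-issue cap.
//
// Minimal usage sketch (for illustration only; in the application the collector
// engine is assumed to schedule Collect on each Interval() tick):
//
//	c := NewCollector()
//	issues, status, err := c.Collect(context.Background())
//	_, _, _ = issues, status, err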
func NewCollector() *Collector {
	c := &Collector{
		interval:              2 * time.Second,
		unreachableGrace:      10 * time.Second,
		pendingGrace:          120 * time.Second,
		workloadGrace:         180 * time.Second,
		crashLoopThresh:       5,
		rollupThreshold:       20,
		maxIssues:             200,
		watchFailureThreshold: 5,
		watchFailureWindow:    30 * time.Second,
		pollRecoverEvery:      30 * time.Second,
		rbacDenied:            map[string]error{},
	}
	c.unreach = newUnreachableTracker(c.unreachableGrace)
	return c
}

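// Compile-time assertion that *Collector satisfies the collectors.Collector interface.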
var _ collectors.Collector = (*Collector)(nil)

func (c *Collector) Name() string { return "k8s" }

func (c *Collector) Interval() time.Duration {
	if c.interval <= 0 {
		return 2 * time.Second
	}
	return c.interval
}

func (c *Collector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) {
	now := time.Now()
	if err := ctx.Err(); err != nil {
		return nil, collectors.Status{Health: collectors.HealthError, Message: "canceled"}, err
	}

	// If kubeconfig doesn't exist, treat Kubernetes as "disabled".
	if !kubeconfigExists() {
		return nil, collectors.Status{Health: collectors.HealthDegraded, Message: "kubeconfig not found"}, nil
	}

	if err := c.ensureClient(); err != nil {
		c.unreach.observeFailure(now, err)
		if c.unreach.shouldEmit(now) {
			iss := stampIssueTimes(now, unreachableIssue(err))
			return []model.Issue{iss}, collectors.Status{Health: collectors.HealthError, Message: "unreachable"}, nil
		}
		return nil, collectors.Status{Health: collectors.HealthError, Message: "k8s client init failed (grace)"}, nil
	}

	// Connectivity/auth check with grace.
	if err := Ping(ctx, c.client); err != nil {
		c.unreach.observeFailure(now, err)
		if c.unreach.shouldEmit(now) {
			iss := stampIssueTimes(now, unreachableIssue(err))
			return []model.Issue{iss}, collectors.Status{Health: collectors.HealthError, Message: "unreachable"}, nil
		}
		return nil, collectors.Status{Health: collectors.HealthError, Message: "k8s unreachable (grace)"}, nil
	}
	c.unreach.observeSuccess()
	c.lastSuccess = now

	// Prefer informers unless currently degraded to polling.
	if c.isPolling() {
		c.maybeRecoverInformers(ctx, now)
	}
	if !c.isPolling() {
		_ = c.ensureInformers(ctx)
	}

	issues := make([]model.Issue, 0, 64)
	issues = append(issues, c.rbacIssues()...)

	st := collectors.Status{Health: collectors.HealthOK, LastSuccess: c.lastSuccess}
	if c.isPolling() {
		st.Health = collectors.HealthDegraded
		st.Message = "degraded to polling"
		issues = append(issues, stampIssueTimes(now, pollingDegradedIssue()))
		issues = append(issues, c.collectByPolling(ctx, now)...)
	} else {
		// If caches aren't ready, use polling for this tick only.
		if !c.cachesSyncedQuick(ctx) {
			st.Health = collectors.HealthDegraded
			st.Message = "waiting for informer cache; used list"
			issues = append(issues, c.collectByPolling(ctx, now)...)
		} else {
			issues = append(issues, c.collectFromCaches(now)...)
			if len(c.snapshotRBACDenied()) > 0 {
				st.Health = collectors.HealthDegraded
				st.Message = "partial RBAC access"
			}
		}
	}

	// Set timestamps, roll up and cap.
	for i := range issues {
		issues[i] = stampIssueTimes(now, issues[i])
	}
	issues = Rollup(issues, c.rollupThreshold, 5)
	model.SortIssuesDefault(issues)
	issues = CapIssues(issues, c.maxIssues)
	return issues, st, nil
}

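// ensureClient lazily builds the Kubernetes clientset from the current
// kubeconfig context; it is a no-op once the client exists.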
func (c *Collector) ensureClient() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.client != nil {
		return nil
	}
	cs, _, err := ClientFromCurrentContext()
	if err != nil {
		return err
	}
	c.client = cs
	return nil
}

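// kubeconfigExists reports whether any kubeconfig file is present, checking the
// $KUBECONFIG path list first and then the default location.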
func kubeconfigExists() bool {
	if p := os.Getenv("KUBECONFIG"); p != "" {
		for _, fp := range filepath.SplitList(p) {
			if fp == "" {
				continue
			}
			if _, err := os.Stat(fp); err == nil {
				return true
			}
		}
		return false
	}
	p := defaultKubeconfigPath()
	if p == "" {
		return false
	}
	_, err := os.Stat(p)
	return err == nil
}

func (c *Collector) ensureInformers(ctx context.Context) error {
	c.mu.Lock()
	if c.started || c.polling {
		c.mu.Unlock()
		return nil
	}
	client := c.client
	c.mu.Unlock()
	if client == nil {
		return fmt.Errorf("nil kubernetes client")
	}

	// RBAC preflight before we even construct informers (so we can skip forbidden ones).
	c.preflightRBAC(ctx, client)

	factory := informers.NewSharedInformerFactory(client, 0)
	var (
		podsInf  cache.SharedIndexInformer
		nodesInf cache.SharedIndexInformer
		evsInf   cache.SharedIndexInformer
		depInf   cache.SharedIndexInformer
		stsInf   cache.SharedIndexInformer
		dsInf    cache.SharedIndexInformer
	)
	if !c.isRBACDenied("pods") {
		i := factory.Core().V1().Pods()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("pods", err) })
		c.mu.Lock()
		c.podsLister = i.Lister()
		c.mu.Unlock()
		podsInf = i.Informer()
	}
	if !c.isRBACDenied("nodes") {
		i := factory.Core().V1().Nodes()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("nodes", err) })
		c.mu.Lock()
		c.nodesLister = i.Lister()
		c.mu.Unlock()
		nodesInf = i.Informer()
	}
	if !c.isRBACDenied("events") {
		i := factory.Core().V1().Events()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("events", err) })
		c.mu.Lock()
		c.eventsLister = i.Lister()
		c.mu.Unlock()
		evsInf = i.Informer()
	}
	if !c.isRBACDenied("deployments") {
		i := factory.Apps().V1().Deployments()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("deployments", err) })
		c.mu.Lock()
		c.deployLister = i.Lister()
		c.mu.Unlock()
		depInf = i.Informer()
	}
	if !c.isRBACDenied("statefulsets") {
		i := factory.Apps().V1().StatefulSets()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("statefulsets", err) })
		c.mu.Lock()
		c.statefulSetLister = i.Lister()
		c.mu.Unlock()
		stsInf = i.Informer()
	}
	if !c.isRBACDenied("daemonsets") {
		i := factory.Apps().V1().DaemonSets()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("daemonsets", err) })
		c.mu.Lock()
		c.daemonSetLister = i.Lister()
		c.mu.Unlock()
		dsInf = i.Informer()
	}

	synced := make([]cache.InformerSynced, 0, 6)
	if podsInf != nil {
		synced = append(synced, podsInf.HasSynced)
	}
	if nodesInf != nil {
		synced = append(synced, nodesInf.HasSynced)
	}
	if evsInf != nil {
		synced = append(synced, evsInf.HasSynced)
	}
	if depInf != nil {
		synced = append(synced, depInf.HasSynced)
	}
	if stsInf != nil {
		synced = append(synced, stsInf.HasSynced)
	}
	if dsInf != nil {
		synced = append(synced, dsInf.HasSynced)
	}

	stopCh := make(chan struct{})
	c.mu.Lock()
	c.factory = factory
	c.stopCh = stopCh
	c.started = true
	c.syncedFns = synced
	c.mu.Unlock()

	factory.Start(stopCh)
	c.syncWG.Add(1)
	go func() {
		defer c.syncWG.Done()
		syncCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if ok := cache.WaitForCacheSync(syncCtx.Done(), synced...); !ok {
			fmt.Printf("k8s: informer cache sync failed or timed out\n")
		}
	}()
	return nil
}

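// maybeRecoverInformers rate-limits recovery attempts (pollRecoverEvery) and, when
// due, clears the polling/watch-failure state and restarts the informers.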
func (c *Collector) maybeRecoverInformers(ctx context.Context, now time.Time) {
	c.mu.Lock()
	interval := c.pollRecoverEvery
	last := c.lastPollRecoverAttempt
	c.mu.Unlock()
	if interval <= 0 {
		interval = 30 * time.Second
	}
	if !last.IsZero() && now.Sub(last) < interval {
		return
	}
	c.mu.Lock()
	c.lastPollRecoverAttempt = now
	c.mu.Unlock()

	// Only attempt if connectivity is OK (already pinged successfully in Collect).
	// Reset watch failure counters and exit polling; subsequent Collect will ensureInformers.
	c.mu.Lock()
	c.polling = false
	c.pollSince = time.Time{}
	c.watchFailWindowStart = time.Time{}
	c.watchFailCount = 0
	c.mu.Unlock()
	_ = c.ensureInformers(ctx)
}

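// preflightRBAC issues cheap LIST probes (Limit: 1, 2s timeout) for each resource
// and records any forbidden responses so informers for those resources are skipped.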
func (c *Collector) preflightRBAC(ctx context.Context, client kubernetes.Interface) {
	shortCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	probe := func(resource string, f func(context.Context) error) {
		if err := f(shortCtx); err != nil {
			if apierrors.IsForbidden(err) {
				c.noteRBAC(resource, err)
			}
		}
	}
	probe("nodes", func(ctx context.Context) error {
		_, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("pods", func(ctx context.Context) error {
		_, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("deployments", func(ctx context.Context) error {
		_, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("statefulsets", func(ctx context.Context) error {
		_, err := client.AppsV1().StatefulSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("daemonsets", func(ctx context.Context) error {
		_, err := client.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("events", func(ctx context.Context) error {
		_, err := client.CoreV1().Events(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
}

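// noteRBAC records the first forbidden error observed for a resource;
// non-forbidden errors are ignored.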
func (c *Collector) noteRBAC(resource string, err error) {
	if err == nil || !apierrors.IsForbidden(err) {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.rbacDenied[resource]; ok {
		return
	}
	c.rbacDenied[resource] = err
}

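// isRBACDenied reports whether list/watch on the named resource has been recorded as forbidden.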
func (c *Collector) isRBACDenied(resource string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.rbacDenied[resource]
	return ok
}

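// snapshotRBACDenied returns a copy of the denied-resource map so callers can iterate without holding the lock.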
func (c *Collector) snapshotRBACDenied() map[string]error {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		out[k] = v
	}
	return out
}

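// recordWatchError is installed as the informer watch error handler. Forbidden
// errors become RBAC denials; other failures are counted within watchFailureWindow,
// and once watchFailureThreshold is reached the informers are stopped and the
// collector degrades to LIST polling.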
func (c *Collector) recordWatchError(resource string, err error) {
	if err == nil {
		return
	}
	if apierrors.IsForbidden(err) {
		c.noteRBAC(resource, err)
		return
	}
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.polling {
		return
	}
	if c.watchFailWindowStart.IsZero() || now.Sub(c.watchFailWindowStart) > c.watchFailureWindow {
		c.watchFailWindowStart = now
		c.watchFailCount = 0
	}
	c.watchFailCount++
	if c.watchFailCount >= c.watchFailureThreshold {
		c.polling = true
		c.pollSince = now
		if c.stopCh != nil {
			close(c.stopCh)
			c.stopCh = nil
		}
		c.started = false
		c.factory = nil
		c.syncedFns = nil
		c.syncWG.Wait()
	}
}

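// cachesSyncedQuick waits briefly (200ms) for the informer caches; callers fall
// back to a direct LIST for the current tick when it returns false.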
func (c *Collector) cachesSyncedQuick(ctx context.Context) bool {
	c.mu.Lock()
	synced := append([]cache.InformerSynced(nil), c.syncedFns...)
	c.mu.Unlock()
	if len(synced) == 0 {
		return false
	}
	syncCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()
	return cache.WaitForCacheSync(syncCtx.Done(), synced...)
}

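// collectFromCaches derives issues from the informer listers, skipping resources
// that are RBAC-denied or whose lister was never created.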
func (c *Collector) collectFromCaches(now time.Time) []model.Issue {
	c.mu.Lock()
	podsLister := c.podsLister
	nodesLister := c.nodesLister
	eventsLister := c.eventsLister
	deployLister := c.deployLister
	stsLister := c.statefulSetLister
	dsLister := c.daemonSetLister
	denied := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		denied[k] = v
	}
	c.mu.Unlock()

	issues := make([]model.Issue, 0, 64)
	sel := labels.Everything()
	if _, ok := denied["nodes"]; !ok && nodesLister != nil {
		if list, err := nodesLister.List(sel); err == nil {
			nodes := make([]*corev1.Node, 0, len(list))
			for i := range list {
				nodes = append(nodes, list[i])
			}
			issues = append(issues, IssuesFromNodes(nodes)...)
		}
	}
	if _, ok := denied["pods"]; !ok && podsLister != nil {
		if list, err := podsLister.List(sel); err == nil {
			pods := make([]*corev1.Pod, 0, len(list))
			for i := range list {
				pods = append(pods, list[i])
			}
			issues = append(issues, IssuesFromPods(pods, now, c.pendingGrace, c.crashLoopThresh)...)
		}
	}
	if _, ok := denied["deployments"]; !ok && deployLister != nil {
		if list, err := deployLister.List(sel); err == nil {
			deps := make([]*appsv1.Deployment, 0, len(list))
			for i := range list {
				deps = append(deps, list[i])
			}
			issues = append(issues, IssuesFromDeployments(deps, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["statefulsets"]; !ok && stsLister != nil {
		if list, err := stsLister.List(sel); err == nil {
			sts := make([]*appsv1.StatefulSet, 0, len(list))
			for i := range list {
				sts = append(sts, list[i])
			}
			issues = append(issues, IssuesFromStatefulSets(sts, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["daemonsets"]; !ok && dsLister != nil {
		if list, err := dsLister.List(sel); err == nil {
			dss := make([]*appsv1.DaemonSet, 0, len(list))
			for i := range list {
				dss = append(dss, list[i])
			}
			issues = append(issues, IssuesFromDaemonSets(dss, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["events"]; !ok && eventsLister != nil {
		if list, err := eventsLister.List(sel); err == nil {
			es := make([]*corev1.Event, 0, len(list))
			for i := range list {
				es = append(es, list[i])
			}
			issues = append(issues, IssuesFromEvents(es, now)...)
		}
	}
	return issues
}

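// collectByPolling lists each resource directly from the API server (used while
// degraded or before caches sync) and records any new RBAC denials it encounters.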
func (c *Collector) collectByPolling(ctx context.Context, now time.Time) []model.Issue {
	c.mu.Lock()
	client := c.client
	denied := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		denied[k] = v
	}
	c.mu.Unlock()
	if client == nil {
		return nil
	}

	issues := make([]model.Issue, 0, 64)
	if _, ok := denied["nodes"]; !ok {
		if nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("nodes", err)
		} else {
			list := make([]*corev1.Node, 0, len(nodes.Items))
			for i := range nodes.Items {
				list = append(list, &nodes.Items[i])
			}
			issues = append(issues, IssuesFromNodes(list)...)
		}
	}
	if _, ok := denied["pods"]; !ok {
		if pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("pods", err)
		} else {
			list := make([]*corev1.Pod, 0, len(pods.Items))
			for i := range pods.Items {
				list = append(list, &pods.Items[i])
			}
			issues = append(issues, IssuesFromPods(list, now, c.pendingGrace, c.crashLoopThresh)...)
		}
	}
	if _, ok := denied["deployments"]; !ok {
		if deps, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("deployments", err)
		} else {
			list := make([]*appsv1.Deployment, 0, len(deps.Items))
			for i := range deps.Items {
				list = append(list, &deps.Items[i])
			}
			issues = append(issues, IssuesFromDeployments(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["statefulsets"]; !ok {
		if sts, err := client.AppsV1().StatefulSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("statefulsets", err)
		} else {
			list := make([]*appsv1.StatefulSet, 0, len(sts.Items))
			for i := range sts.Items {
				list = append(list, &sts.Items[i])
			}
			issues = append(issues, IssuesFromStatefulSets(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["daemonsets"]; !ok {
		if dss, err := client.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("daemonsets", err)
		} else {
			list := make([]*appsv1.DaemonSet, 0, len(dss.Items))
			for i := range dss.Items {
				list = append(list, &dss.Items[i])
			}
			issues = append(issues, IssuesFromDaemonSets(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["events"]; !ok {
		if evs, err := client.CoreV1().Events(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("events", err)
		} else {
			list := make([]*corev1.Event, 0, len(evs.Items))
			for i := range evs.Items {
				list = append(list, &evs.Items[i])
			}
			issues = append(issues, IssuesFromEvents(list, now)...)
		}
	}
	return issues
}

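// rbacIssues emits one stable P2 issue per RBAC-denied resource, sorted by
// resource name for deterministic output.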
func (c *Collector) rbacIssues() []model.Issue {
	denied := c.snapshotRBACDenied()
	keys := make([]string, 0, len(denied))
	for k := range denied {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]model.Issue, 0, len(keys))
	for _, res := range keys {
		err := denied[res]
		out = append(out, model.Issue{
			ID:       fmt.Sprintf("k8s:rbac:%s", res),
			Category: model.CategoryKubernetes,
			Priority: model.PriorityP2,
			Title:    fmt.Sprintf("Insufficient RBAC: list/watch %s", res),
			Details:  fmt.Sprintf("Current context cannot access %s (forbidden). %s", res, sanitizeError(err)),
			Evidence: map[string]string{
				"kind":      "Cluster",
				"reason":    "RBAC",
				"namespace": "",
				"resource":  res,
			},
			SuggestedFix: fmt.Sprintf("kubectl auth can-i list %s --all-namespaces", res),
		})
	}
	return out
}

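// pollingDegradedIssue is the P1 issue raised while the collector is running in
// LIST-polling mode.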
func pollingDegradedIssue() model.Issue {
	return model.Issue{
		ID:       "k8s:cluster:polling",
		Category: model.CategoryKubernetes,
		Priority: model.PriorityP1,
		Title:    "Kubernetes degraded: polling (watch failing)",
		Details:  "Kubernetes watches have failed repeatedly; collector switched to LIST polling. Data may be less real-time and API load is higher.",
		Evidence: map[string]string{
			"kind":      "Cluster",
			"reason":    "DegradedPolling",
			"namespace": "",
		},
		SuggestedFix: "Check API server / network stability and RBAC; ensure watch endpoints are reachable.",
	}
}

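// stampIssueTimes sets LastSeen to now and FirstSeen to now if it is unset.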
func stampIssueTimes(now time.Time, iss model.Issue) model.Issue {
	iss.LastSeen = now
	if iss.FirstSeen.IsZero() {
		iss.FirstSeen = now
	}
	return iss
}

func (c *Collector) isPolling() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.polling
}