package k8s

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	"tower/internal/collectors"
	"tower/internal/model"
)

// Collector is the ControlTower Kubernetes collector.
//
// It uses client-go informers (LIST+WATCH with local caches) against the user's
// kubeconfig current context, across all namespaces.
//
// Degradation behavior:
//   - If WATCH fails repeatedly, it falls back to polling LIST and emits a P1
//     "degraded to polling" issue.
//   - While in polling mode, it periodically attempts to recover back to watches.
//   - If the cluster is unreachable, it emits a P0 only after 10s of continuous failure.
//   - If RBAC forbids list/watch for a resource, it emits a single P2 issue per
//     inaccessible resource and continues for accessible resources.
//
// Noise control:
//   - Rollups group by (namespace, reason, kind) when group size >= 20.
//   - Issues are capped at 200 after rollups.
//
// Instantiate with NewCollector().
type Collector struct {
	interval              time.Duration
	unreachableGrace      time.Duration
	pendingGrace          time.Duration
	workloadGrace         time.Duration
	crashLoopThresh       int
	rollupThreshold       int
	maxIssues             int
	watchFailureThreshold int
	watchFailureWindow    time.Duration
	pollRecoverEvery      time.Duration

	mu     sync.Mutex
	syncWG sync.WaitGroup

	client    kubernetes.Interface
	factory   informers.SharedInformerFactory
	stopCh    chan struct{}
	started   bool
	syncedFns []cache.InformerSynced

	podsLister        corelisters.PodLister
	nodesLister       corelisters.NodeLister
	eventsLister      corelisters.EventLister
	deployLister      appslisters.DeploymentLister
	statefulSetLister appslisters.StatefulSetLister
	daemonSetLister   appslisters.DaemonSetLister

	// polling indicates we have degraded from informers to list polling.
	polling                bool
	pollSince              time.Time
	lastPollRecoverAttempt time.Time
	watchFailWindowStart   time.Time
	watchFailCount         int

	// rbacDenied is keyed by resource name ("pods", "nodes", ...).
	rbacDenied map[string]error

	unreach     *unreachableTracker
	lastSuccess time.Time
}

// NewCollector returns a Kubernetes collector with default thresholds and grace periods.
func NewCollector() *Collector {
	c := &Collector{
		interval:              2 * time.Second,
		unreachableGrace:      10 * time.Second,
		pendingGrace:          120 * time.Second,
		workloadGrace:         180 * time.Second,
		crashLoopThresh:       5,
		rollupThreshold:       20,
		maxIssues:             200,
		watchFailureThreshold: 5,
		watchFailureWindow:    30 * time.Second,
		pollRecoverEvery:      30 * time.Second,
		rbacDenied:            map[string]error{},
	}
	c.unreach = newUnreachableTracker(c.unreachableGrace)
	return c
}

var _ collectors.Collector = (*Collector)(nil)

// Name implements collectors.Collector.
func (c *Collector) Name() string { return "k8s" }

// Interval implements collectors.Collector.
func (c *Collector) Interval() time.Duration {
	if c.interval <= 0 {
		return 2 * time.Second
	}
	return c.interval
}

// Collect gathers Kubernetes issues for one tick, preferring informer caches and
// degrading to LIST polling or grace-period statuses as described on Collector.
func (c *Collector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) {
	now := time.Now()
	if err := ctx.Err(); err != nil {
		return nil, collectors.Status{Health: collectors.HealthError, Message: "canceled"}, err
	}

	// If kubeconfig doesn't exist, treat Kubernetes as "disabled".
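	// In that case the collector reports a degraded status with no issues and no
	// error, so the rest of ControlTower keeps running without Kubernetes data.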
	if !kubeconfigExists() {
		return nil, collectors.Status{Health: collectors.HealthDegraded, Message: "kubeconfig not found"}, nil
	}

	if err := c.ensureClient(); err != nil {
		c.unreach.observeFailure(now, err)
		if c.unreach.shouldEmit(now) {
			iss := stampIssueTimes(now, unreachableIssue(err))
			return []model.Issue{iss}, collectors.Status{Health: collectors.HealthError, Message: "unreachable"}, nil
		}
		return nil, collectors.Status{Health: collectors.HealthError, Message: "k8s client init failed (grace)"}, nil
	}

	// Connectivity/auth check with grace.
	if err := Ping(ctx, c.client); err != nil {
		c.unreach.observeFailure(now, err)
		if c.unreach.shouldEmit(now) {
			iss := stampIssueTimes(now, unreachableIssue(err))
			return []model.Issue{iss}, collectors.Status{Health: collectors.HealthError, Message: "unreachable"}, nil
		}
		return nil, collectors.Status{Health: collectors.HealthError, Message: "k8s unreachable (grace)"}, nil
	}
	c.unreach.observeSuccess()
	c.lastSuccess = now

	// Prefer informers unless currently degraded to polling.
	if c.isPolling() {
		c.maybeRecoverInformers(ctx, now)
	}
	if !c.isPolling() {
		_ = c.ensureInformers(ctx)
	}

	issues := make([]model.Issue, 0, 64)
	issues = append(issues, c.rbacIssues()...)

	st := collectors.Status{Health: collectors.HealthOK, LastSuccess: c.lastSuccess}

	if c.isPolling() {
		st.Health = collectors.HealthDegraded
		st.Message = "degraded to polling"
		issues = append(issues, stampIssueTimes(now, pollingDegradedIssue()))
		issues = append(issues, c.collectByPolling(ctx, now)...)
	} else {
		// If caches aren't ready, use polling for this tick only.
		if !c.cachesSyncedQuick(ctx) {
			st.Health = collectors.HealthDegraded
			st.Message = "waiting for informer cache; used list"
			issues = append(issues, c.collectByPolling(ctx, now)...)
		} else {
			issues = append(issues, c.collectFromCaches(now)...)
			if len(c.snapshotRBACDenied()) > 0 {
				st.Health = collectors.HealthDegraded
				st.Message = "partial RBAC access"
			}
		}
	}

	// Set timestamps, roll up, and cap.
	for i := range issues {
		issues[i] = stampIssueTimes(now, issues[i])
	}
	issues = Rollup(issues, c.rollupThreshold, 5)
	model.SortIssuesDefault(issues)
	issues = CapIssues(issues, c.maxIssues)

	return issues, st, nil
}

// ensureClient lazily builds the Kubernetes clientset from the current kubeconfig context.
func (c *Collector) ensureClient() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.client != nil {
		return nil
	}
	cs, _, err := ClientFromCurrentContext()
	if err != nil {
		return err
	}
	c.client = cs
	return nil
}

// kubeconfigExists reports whether a kubeconfig file is present, honoring KUBECONFIG
// (which may list multiple paths) before falling back to the default location.
func kubeconfigExists() bool {
	if p := os.Getenv("KUBECONFIG"); p != "" {
		for _, fp := range filepath.SplitList(p) {
			if fp == "" {
				continue
			}
			if _, err := os.Stat(fp); err == nil {
				return true
			}
		}
		return false
	}
	p := defaultKubeconfigPath()
	if p == "" {
		return false
	}
	_, err := os.Stat(p)
	return err == nil
}

// ensureInformers starts shared informers for the resources the current context can
// access. It is a no-op while informers are already running or while degraded to polling.
func (c *Collector) ensureInformers(ctx context.Context) error {
	c.mu.Lock()
	if c.started || c.polling {
		c.mu.Unlock()
		return nil
	}
	client := c.client
	c.mu.Unlock()
	if client == nil {
		return fmt.Errorf("nil kubernetes client")
	}

	// RBAC preflight before we even construct informers (so we can skip forbidden ones).
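	// Forbidden resources are recorded in rbacDenied, surfaced as P2 issues via
	// rbacIssues, and skipped below so their informers are never constructed.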
	c.preflightRBAC(ctx, client)

	factory := informers.NewSharedInformerFactory(client, 0)

	var (
		podsInf  cache.SharedIndexInformer
		nodesInf cache.SharedIndexInformer
		evsInf   cache.SharedIndexInformer
		depInf   cache.SharedIndexInformer
		stsInf   cache.SharedIndexInformer
		dsInf    cache.SharedIndexInformer
	)

	if !c.isRBACDenied("pods") {
		i := factory.Core().V1().Pods()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("pods", err) })
		c.mu.Lock()
		c.podsLister = i.Lister()
		c.mu.Unlock()
		podsInf = i.Informer()
	}
	if !c.isRBACDenied("nodes") {
		i := factory.Core().V1().Nodes()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("nodes", err) })
		c.mu.Lock()
		c.nodesLister = i.Lister()
		c.mu.Unlock()
		nodesInf = i.Informer()
	}
	if !c.isRBACDenied("events") {
		i := factory.Core().V1().Events()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("events", err) })
		c.mu.Lock()
		c.eventsLister = i.Lister()
		c.mu.Unlock()
		evsInf = i.Informer()
	}
	if !c.isRBACDenied("deployments") {
		i := factory.Apps().V1().Deployments()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("deployments", err) })
		c.mu.Lock()
		c.deployLister = i.Lister()
		c.mu.Unlock()
		depInf = i.Informer()
	}
	if !c.isRBACDenied("statefulsets") {
		i := factory.Apps().V1().StatefulSets()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("statefulsets", err) })
		c.mu.Lock()
		c.statefulSetLister = i.Lister()
		c.mu.Unlock()
		stsInf = i.Informer()
	}
	if !c.isRBACDenied("daemonsets") {
		i := factory.Apps().V1().DaemonSets()
		i.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) { c.recordWatchError("daemonsets", err) })
		c.mu.Lock()
		c.daemonSetLister = i.Lister()
		c.mu.Unlock()
		dsInf = i.Informer()
	}

	synced := make([]cache.InformerSynced, 0, 6)
	if podsInf != nil {
		synced = append(synced, podsInf.HasSynced)
	}
	if nodesInf != nil {
		synced = append(synced, nodesInf.HasSynced)
	}
	if evsInf != nil {
		synced = append(synced, evsInf.HasSynced)
	}
	if depInf != nil {
		synced = append(synced, depInf.HasSynced)
	}
	if stsInf != nil {
		synced = append(synced, stsInf.HasSynced)
	}
	if dsInf != nil {
		synced = append(synced, dsInf.HasSynced)
	}

	stopCh := make(chan struct{})
	c.mu.Lock()
	c.factory = factory
	c.stopCh = stopCh
	c.started = true
	c.syncedFns = synced
	c.mu.Unlock()

	factory.Start(stopCh)

	c.syncWG.Add(1)
	go func() {
		defer c.syncWG.Done()
		syncCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if ok := cache.WaitForCacheSync(syncCtx.Done(), synced...); !ok {
			fmt.Printf("k8s: informer cache sync failed or timed out\n")
		}
	}()

	return nil
}

func (c *Collector) maybeRecoverInformers(ctx context.Context, now time.Time) {
	c.mu.Lock()
	interval := c.pollRecoverEvery
	last := c.lastPollRecoverAttempt
	c.mu.Unlock()
	if interval <= 0 {
		interval = 30 * time.Second
	}
	if !last.IsZero() && now.Sub(last) < interval {
		return
	}
	c.mu.Lock()
	c.lastPollRecoverAttempt = now
	c.mu.Unlock()

	// Only attempt if connectivity is OK (already pinged successfully in Collect).
	// Reset watch failure counters and exit polling; subsequent Collect will ensureInformers.
	c.mu.Lock()
	c.polling = false
	c.pollSince = time.Time{}
	c.watchFailWindowStart = time.Time{}
	c.watchFailCount = 0
	c.mu.Unlock()

	_ = c.ensureInformers(ctx)
}

// preflightRBAC issues a cheap LIST (Limit: 1) per resource and records any that the
// current context is forbidden from accessing.
func (c *Collector) preflightRBAC(ctx context.Context, client kubernetes.Interface) {
	shortCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	probe := func(resource string, f func(context.Context) error) {
		if err := f(shortCtx); err != nil {
			if apierrors.IsForbidden(err) {
				c.noteRBAC(resource, err)
			}
		}
	}

	probe("nodes", func(ctx context.Context) error {
		_, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("pods", func(ctx context.Context) error {
		_, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("deployments", func(ctx context.Context) error {
		_, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("statefulsets", func(ctx context.Context) error {
		_, err := client.AppsV1().StatefulSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("daemonsets", func(ctx context.Context) error {
		_, err := client.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
	probe("events", func(ctx context.Context) error {
		_, err := client.CoreV1().Events(metav1.NamespaceAll).List(ctx, metav1.ListOptions{Limit: 1})
		return err
	})
}

// noteRBAC records the first forbidden error seen for a resource.
func (c *Collector) noteRBAC(resource string, err error) {
	if err == nil || !apierrors.IsForbidden(err) {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.rbacDenied[resource]; ok {
		return
	}
	c.rbacDenied[resource] = err
}

func (c *Collector) isRBACDenied(resource string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.rbacDenied[resource]
	return ok
}

func (c *Collector) snapshotRBACDenied() map[string]error {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		out[k] = v
	}
	return out
}

// recordWatchError counts non-forbidden watch failures within watchFailureWindow and,
// once watchFailureThreshold is reached, stops the informers and degrades to polling.
// Forbidden errors are routed to the RBAC tracking instead.
func (c *Collector) recordWatchError(resource string, err error) {
	if err == nil {
		return
	}
	if apierrors.IsForbidden(err) {
		c.noteRBAC(resource, err)
		return
	}
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.polling {
		return
	}
	if c.watchFailWindowStart.IsZero() || now.Sub(c.watchFailWindowStart) > c.watchFailureWindow {
		c.watchFailWindowStart = now
		c.watchFailCount = 0
	}
	c.watchFailCount++
	if c.watchFailCount >= c.watchFailureThreshold {
		c.polling = true
		c.pollSince = now
		if c.stopCh != nil {
			close(c.stopCh)
			c.stopCh = nil
		}
		c.started = false
		c.factory = nil
		c.syncedFns = nil
		c.syncWG.Wait()
	}
}

// cachesSyncedQuick reports whether all informer caches are synced, waiting at most 200ms.
func (c *Collector) cachesSyncedQuick(ctx context.Context) bool {
	c.mu.Lock()
	synced := append([]cache.InformerSynced(nil), c.syncedFns...)
	c.mu.Unlock()
	if len(synced) == 0 {
		return false
	}
	syncCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()
	return cache.WaitForCacheSync(syncCtx.Done(), synced...)
}

// collectFromCaches builds issues from the informer listers, skipping any lister that
// is nil or whose resource is RBAC-denied.
func (c *Collector) collectFromCaches(now time.Time) []model.Issue {
	c.mu.Lock()
	podsLister := c.podsLister
	nodesLister := c.nodesLister
	eventsLister := c.eventsLister
	deployLister := c.deployLister
	stsLister := c.statefulSetLister
	dsLister := c.daemonSetLister
	denied := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		denied[k] = v
	}
	c.mu.Unlock()

	issues := make([]model.Issue, 0, 64)
	sel := labels.Everything()

	if _, ok := denied["nodes"]; !ok && nodesLister != nil {
		if list, err := nodesLister.List(sel); err == nil {
			nodes := make([]*corev1.Node, 0, len(list))
			for i := range list {
				nodes = append(nodes, list[i])
			}
			issues = append(issues, IssuesFromNodes(nodes)...)
		}
	}
	if _, ok := denied["pods"]; !ok && podsLister != nil {
		if list, err := podsLister.List(sel); err == nil {
			pods := make([]*corev1.Pod, 0, len(list))
			for i := range list {
				pods = append(pods, list[i])
			}
			issues = append(issues, IssuesFromPods(pods, now, c.pendingGrace, c.crashLoopThresh)...)
		}
	}
	if _, ok := denied["deployments"]; !ok && deployLister != nil {
		if list, err := deployLister.List(sel); err == nil {
			deps := make([]*appsv1.Deployment, 0, len(list))
			for i := range list {
				deps = append(deps, list[i])
			}
			issues = append(issues, IssuesFromDeployments(deps, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["statefulsets"]; !ok && stsLister != nil {
		if list, err := stsLister.List(sel); err == nil {
			sts := make([]*appsv1.StatefulSet, 0, len(list))
			for i := range list {
				sts = append(sts, list[i])
			}
			issues = append(issues, IssuesFromStatefulSets(sts, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["daemonsets"]; !ok && dsLister != nil {
		if list, err := dsLister.List(sel); err == nil {
			dss := make([]*appsv1.DaemonSet, 0, len(list))
			for i := range list {
				dss = append(dss, list[i])
			}
			issues = append(issues, IssuesFromDaemonSets(dss, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["events"]; !ok && eventsLister != nil {
		if list, err := eventsLister.List(sel); err == nil {
			es := make([]*corev1.Event, 0, len(list))
			for i := range list {
				es = append(es, list[i])
			}
			issues = append(issues, IssuesFromEvents(es, now)...)
		}
	}

	return issues
}

// collectByPolling builds issues from direct LIST calls; it is used while degraded to
// polling and as a fallback for ticks where the informer caches have not synced yet.
func (c *Collector) collectByPolling(ctx context.Context, now time.Time) []model.Issue {
	c.mu.Lock()
	client := c.client
	denied := make(map[string]error, len(c.rbacDenied))
	for k, v := range c.rbacDenied {
		denied[k] = v
	}
	c.mu.Unlock()
	if client == nil {
		return nil
	}

	issues := make([]model.Issue, 0, 64)

	if _, ok := denied["nodes"]; !ok {
		if nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("nodes", err)
		} else {
			list := make([]*corev1.Node, 0, len(nodes.Items))
			for i := range nodes.Items {
				list = append(list, &nodes.Items[i])
			}
			issues = append(issues, IssuesFromNodes(list)...)
		}
	}
	if _, ok := denied["pods"]; !ok {
		if pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("pods", err)
		} else {
			list := make([]*corev1.Pod, 0, len(pods.Items))
			for i := range pods.Items {
				list = append(list, &pods.Items[i])
			}
			issues = append(issues, IssuesFromPods(list, now, c.pendingGrace, c.crashLoopThresh)...)
		}
	}
	if _, ok := denied["deployments"]; !ok {
		if deps, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("deployments", err)
		} else {
			list := make([]*appsv1.Deployment, 0, len(deps.Items))
			for i := range deps.Items {
				list = append(list, &deps.Items[i])
			}
			issues = append(issues, IssuesFromDeployments(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["statefulsets"]; !ok {
		if sts, err := client.AppsV1().StatefulSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("statefulsets", err)
		} else {
			list := make([]*appsv1.StatefulSet, 0, len(sts.Items))
			for i := range sts.Items {
				list = append(list, &sts.Items[i])
			}
			issues = append(issues, IssuesFromStatefulSets(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["daemonsets"]; !ok {
		if dss, err := client.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("daemonsets", err)
		} else {
			list := make([]*appsv1.DaemonSet, 0, len(dss.Items))
			for i := range dss.Items {
				list = append(list, &dss.Items[i])
			}
			issues = append(issues, IssuesFromDaemonSets(list, now, c.workloadGrace)...)
		}
	}
	if _, ok := denied["events"]; !ok {
		if evs, err := client.CoreV1().Events(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
			c.noteRBAC("events", err)
		} else {
			list := make([]*corev1.Event, 0, len(evs.Items))
			for i := range evs.Items {
				list = append(list, &evs.Items[i])
			}
			issues = append(issues, IssuesFromEvents(list, now)...)
		}
	}

	return issues
}

func (c *Collector) rbacIssues() []model.Issue {
	denied := c.snapshotRBACDenied()
	keys := make([]string, 0, len(denied))
	for k := range denied {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out := make([]model.Issue, 0, len(keys))
	for _, res := range keys {
		err := denied[res]
		out = append(out, model.Issue{
			ID:       fmt.Sprintf("k8s:rbac:%s", res),
			Category: model.CategoryKubernetes,
			Priority: model.PriorityP2,
			Title:    fmt.Sprintf("Insufficient RBAC: list/watch %s", res),
			Details:  fmt.Sprintf("Current context cannot access %s (forbidden). %s", res, sanitizeError(err)),
			Evidence: map[string]string{
				"kind":      "Cluster",
				"reason":    "RBAC",
				"namespace": "",
				"resource":  res,
			},
			SuggestedFix: fmt.Sprintf("kubectl auth can-i list %s --all-namespaces", res),
		})
	}
	return out
}

func pollingDegradedIssue() model.Issue {
	return model.Issue{
		ID:       "k8s:cluster:polling",
		Category: model.CategoryKubernetes,
		Priority: model.PriorityP1,
		Title:    "Kubernetes degraded: polling (watch failing)",
		Details:  "Kubernetes watches have failed repeatedly; collector switched to LIST polling. Data may be less real-time and API load is higher.",
		Evidence: map[string]string{
			"kind":      "Cluster",
			"reason":    "DegradedPolling",
			"namespace": "",
		},
		SuggestedFix: "Check API server / network stability and RBAC; ensure watch endpoints are reachable.",
	}
}

func stampIssueTimes(now time.Time, iss model.Issue) model.Issue {
	iss.LastSeen = now
	if iss.FirstSeen.IsZero() {
		iss.FirstSeen = now
	}
	return iss
}

func (c *Collector) isPolling() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.polling
}
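// exampleCollectLoop is an illustrative, hypothetical sketch of how a caller might
// drive this collector; it is not used by the package. The ControlTower runner that
// actually invokes Collect is assumed to live elsewhere (it only needs the
// collectors.Collector interface), and the ticker loop and Printf logging here are
// placeholder assumptions, not part of that contract.
func exampleCollectLoop(ctx context.Context, c *Collector) {
	ticker := time.NewTicker(c.Interval())
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Collect reports degraded states (unreachable, polling, partial RBAC)
			// through the returned Status and issues; err is non-nil only when the
			// context has been canceled.
			issues, status, err := c.Collect(ctx)
			if err != nil {
				fmt.Printf("k8s collector stopped: %v\n", err)
				return
			}
			fmt.Printf("k8s collector: health=%v issues=%d (%s)\n", status.Health, len(issues), status.Message)
		}
	}
}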