#!/usr/bin/env python3
"""Infrastructure collector for K8s and workstation health."""

import os
import subprocess
from pathlib import Path
from typing import Optional

# Traffic-light icons keyed by status string.
_STATUS_ICONS = {
    "green": "🟢",
    "yellow": "🟡",
    "red": "🔴",
    "unknown": "⚪",
}
_FALLBACK_ICON = "⚪"

_KUBECTL_TIMEOUT_S = 15
_LOCAL_TIMEOUT_S = 5

_DISK_WARN_PCT = 80
_MEM_WARN_PCT = 90
_LOAD_PER_CPU_LIMIT = 2
_DEFAULT_CPU_COUNT = 4

# Emits one "<node-name>=<Ready condition status>" token per node. The status
# of the Ready condition ("True"/"False"/"Unknown") is what tells a healthy
# node from an unhealthy one; the condition *type* is "Ready" either way.
_NODE_READY_JSONPATH = (
    "jsonpath={range .items[*]}{.metadata.name}="
    '{.status.conditions[?(@.type=="Ready")].status} {end}'
)

# Failures a single probe may hit: missing binary, non-zero exit or timeout,
# and unparseable output.
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError, IndexError)


def _run(cmd: list, timeout: int, check: bool = False) -> subprocess.CompletedProcess:
    """Run *cmd* capturing text output; with ``check`` a non-zero exit raises."""
    return subprocess.run(
        cmd, capture_output=True, text=True, timeout=timeout, check=check
    )


def _all_nodes_ready(jsonpath_output: str) -> bool:
    """Return True when every ``name=status`` token reports status ``True``.

    An empty listing (no nodes) and a node without a Ready condition
    (token ``name=``) both count as not ready.
    """
    tokens = jsonpath_output.split()
    return bool(tokens) and all(
        token.rpartition("=")[2] == "True" for token in tokens
    )


def check_k8s_health() -> dict:
    """Check Kubernetes cluster health via ``kubectl``.

    Returns:
        A dict with ``status`` set to ``green`` (all nodes ready, no problem
        pods), ``yellow`` (nodes ready but some pods are neither Running nor
        Succeeded), ``red`` (at least one node not ready, or no nodes) or
        ``unknown`` (kubectl failed, timed out or could not be started).
        ``green``/``yellow``/``red`` carry a ``message``; ``unknown`` carries
        an ``error``. Never raises for kubectl failures.
    """
    try:
        node_result = _run(
            ["kubectl", "get", "nodes", "-o", _NODE_READY_JSONPATH],
            _KUBECTL_TIMEOUT_S,
        )
        if node_result.returncode != 0:
            return {"status": "unknown", "error": "kubectl failed"}

        # Node trouble decides the result on its own; skip the pod query.
        if not _all_nodes_ready(node_result.stdout):
            return {"status": "red", "message": "Node(s) not ready"}

        pod_result = _run(
            [
                "kubectl", "get", "pods", "-A",
                "--field-selector=status.phase!=Running,status.phase!=Succeeded",
                "-o", "jsonpath={.items[*].metadata.name}",
            ],
            _KUBECTL_TIMEOUT_S,
        )
        # A failed query prints nothing, which would otherwise look like
        # "no problem pods" and be reported as green.
        if pod_result.returncode != 0:
            return {"status": "unknown", "error": "kubectl failed"}

        problem_pods = pod_result.stdout.split()
        if problem_pods:
            return {
                "status": "yellow",
                "message": f"{len(problem_pods)} pods not running",
            }
        return {"status": "green", "message": "All nodes ready, no problem pods"}
    except subprocess.TimeoutExpired:
        return {"status": "unknown", "error": "timeout"}
    except (OSError, subprocess.SubprocessError) as exc:
        return {"status": "unknown", "error": str(exc)}


def _disk_issue() -> Optional[str]:
    """Return ``"disk N%"`` when ``/`` is more than 80% full, else None."""
    output = _run(["df", "-h", "/"], _LOCAL_TIMEOUT_S, check=True).stdout
    # Second line is the data row; fifth column is the use percentage.
    fields = output.strip().splitlines()[1].split()
    usage = int(fields[4].rstrip("%"))
    return f"disk {usage}%" if usage > _DISK_WARN_PCT else None


def _memory_issue() -> Optional[str]:
    """Return ``"mem N%"`` when used/total from ``free -m`` exceeds 90%."""
    output = _run(["free", "-m"], _LOCAL_TIMEOUT_S, check=True).stdout
    # Second line is the memory row: label, total, used, ...
    fields = output.strip().splitlines()[1].split()
    total, used = int(fields[1]), int(fields[2])
    pct = (used / total) * 100 if total > 0 else 0
    return f"mem {pct:.0f}%" if pct > _MEM_WARN_PCT else None


def _cpu_count() -> int:
    """Return the number of CPUs usable by this process (fallback: 4)."""
    try:
        return len(os.sched_getaffinity(0))
    except (AttributeError, OSError):
        # sched_getaffinity is not available on every platform.
        return os.cpu_count() or _DEFAULT_CPU_COUNT


def _load_issue() -> Optional[str]:
    """Return ``"load X.Y"`` when the 1-minute load exceeds 2x the CPU count."""
    load_1m = float(Path("/proc/loadavg").read_text().split()[0])
    if load_1m > _cpu_count() * _LOAD_PER_CPU_LIMIT:
        return f"load {load_1m:.1f}"
    return None


def check_workstation_health() -> dict:
    """Check local workstation health (disk, memory, load average).

    Each probe runs independently: a probe whose tool is missing, fails,
    times out or prints unparseable output is skipped so the remaining
    probes still contribute.

    Returns:
        ``{"status": "green", "message": "OK"}`` when no probe reports an
        issue, ``yellow`` with a message for exactly one issue, ``red`` for
        two or more, and ``{"status": "unknown", "error": ...}`` when no
        probe could be evaluated at all.
    """
    probes = (
        ("disk", _disk_issue),
        ("mem", _memory_issue),
        ("load", _load_issue),
    )
    issues = []
    failures = []
    for label, probe in probes:
        try:
            issue = probe()
        except _PROBE_ERRORS as exc:
            failures.append(f"{label}: {exc}")
            continue
        if issue:
            issues.append(issue)

    # Nothing was measured, so "OK" would be a false green.
    if len(failures) == len(probes):
        return {"status": "unknown", "error": "; ".join(failures)}

    if not issues:
        return {"status": "green", "message": "OK"}
    status = "red" if len(issues) > 1 else "yellow"
    return {"status": status, "message": ", ".join(issues)}


def format_status(k8s: dict, workstation: dict) -> str:
    """Format infrastructure status with traffic lights.

    Args:
        k8s: Result dict for the cluster (``status`` plus ``message`` or
            ``error``).
        workstation: Result dict for the workstation, same shape.

    Returns:
        A ``"K8s: <icon> | Workstation: <icon>"`` line. For every section
        that is not green and has a non-empty detail (``error`` takes
        precedence over ``message``) a second line listing those details is
        appended.
    """
    sections = (
        ("K8s", "K8s", k8s),
        ("Workstation", "WS", workstation),
    )
    lights = []
    details = []
    for title, short_name, result in sections:
        icon = _STATUS_ICONS.get(result.get("status", "unknown"), _FALLBACK_ICON)
        lights.append(f"{title}: {icon}")

        detail = result.get("error", result.get("message", ""))
        # Details are shown only for sections that are not green.
        if result.get("status") != "green" and detail:
            details.append(f"{short_name}: {detail}")

    summary = " | ".join(lights)
    if details:
        summary += f"\n └ {'; '.join(details)}"
    return summary


def collect(config: dict) -> dict:
    """Main collector entry point.

    Args:
        config: Collector configuration. The optional ``infra`` mapping may
            set ``check_k8s`` and ``check_workstation`` (both default to
            True); a missing or ``None`` ``infra`` entry means defaults.

    Returns:
        A section dict with ``section``, ``icon``, ``content`` (formatted
        text), ``raw`` (the per-check result dicts), ``status`` (overall
        traffic light) and ``error`` (always None).
    """
    infra_config = config.get("infra") or {}

    k8s_result = {"status": "unknown", "message": "disabled"}
    ws_result = {"status": "unknown", "message": "disabled"}

    if infra_config.get("check_k8s", True):
        k8s_result = check_k8s_health()
    if infra_config.get("check_workstation", True):
        ws_result = check_workstation_health()

    # Worst status wins; "unknown" (including disabled checks) is treated
    # as yellow.
    statuses = {k8s_result.get("status"), ws_result.get("status")}
    if "red" in statuses:
        overall = "red"
    elif statuses & {"yellow", "unknown"}:
        overall = "yellow"
    else:
        overall = "green"

    return {
        "section": "Infrastructure",
        "icon": "🖥",
        "content": format_status(k8s_result, ws_result),
        "raw": {"k8s": k8s_result, "workstation": ws_result},
        "status": overall,
        "error": None,
    }


def main() -> None:
    """Run the collector with both checks enabled and print the section."""
    config = {"infra": {"check_k8s": True, "check_workstation": True}}
    result = collect(config)
    print(f"## {result['icon']} {result['section']}")
    print(result["content"])


if __name__ == "__main__":
    main()