Add comprehensive morning report skill with collectors for calendar, email, tasks, infrastructure status, news, stocks, and weather. Add stock lookup skill for quote queries.
188 lines
6.0 KiB
Python
Executable File
188 lines
6.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Infrastructure collector for K8s and workstation health."""
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
|
|
def check_k8s_health() -> dict:
    """Check Kubernetes cluster health by shelling out to ``kubectl``.

    Two queries are made: one for the ``Ready`` condition of every node and,
    if all nodes are ready, one for pods that are neither ``Running`` nor
    ``Succeeded``.

    Returns:
        A dict with a ``status`` key and either a ``message`` or an ``error``:

        * ``green``   - every node is Ready and no problem pods were found
        * ``yellow``  - every node is Ready but some pods are not running
        * ``red``     - no nodes were returned, or at least one is not Ready
        * ``unknown`` - kubectl failed, timed out, or could not be executed
    """
    # One "<node name>\t<Ready status>" line per node. The *status* of the
    # Ready condition ("True"/"False"/"Unknown") is what signals health; the
    # condition *type* is the literal string "Ready" even on a broken node.
    node_query = (
        r'jsonpath={range .items[*]}{.metadata.name}{"\t"}'
        r'{.status.conditions[?(@.type=="Ready")].status}{"\n"}{end}'
    )

    try:
        nodes = subprocess.run(
            ["kubectl", "get", "nodes", "-o", node_query],
            capture_output=True,
            text=True,
            timeout=15,
        )
        if nodes.returncode != 0:
            return {"status": "unknown", "error": "kubectl failed"}

        node_lines = [line for line in nodes.stdout.splitlines() if line.strip()]
        # A node without a Ready condition yields an empty status field and is
        # therefore counted as not ready.
        not_ready = [
            line.partition("\t")[0]
            for line in node_lines
            if line.partition("\t")[2].strip() != "True"
        ]
        # An empty node list is treated as unhealthy, not vacuously healthy.
        if not node_lines or not_ready:
            return {"status": "red", "message": "Node(s) not ready"}

        pods = subprocess.run(
            [
                "kubectl", "get", "pods", "-A",
                "--field-selector=status.phase!=Running,status.phase!=Succeeded",
                "-o", "jsonpath={.items[*].metadata.name}",
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
        # A failed query prints nothing on stdout; without this check it would
        # be indistinguishable from "no problem pods" and reported as green.
        if pods.returncode != 0:
            return {"status": "unknown", "error": "kubectl failed"}

        problem_pods = pods.stdout.split()
        if problem_pods:
            return {"status": "yellow", "message": f"{len(problem_pods)} pods not running"}
        return {"status": "green", "message": "All nodes ready, no problem pods"}

    except subprocess.TimeoutExpired:
        return {"status": "unknown", "error": "timeout"}
    except Exception as e:
        # Collector boundary: a missing kubectl binary (or any other failure)
        # must degrade to "unknown" rather than abort the whole report.
        return {"status": "unknown", "error": str(e)}
|
|
|
|
|
|
def _disk_issue(path: str = "/"):
    """Return ``"disk N%"`` if *path* is more than 80% full, otherwise ``None``."""
    # -P (POSIX format) keeps each filesystem on a single line, so the
    # capacity column is reliably the fifth field even for long device names.
    result = subprocess.run(
        ["df", "-hP", path],
        capture_output=True,
        text=True,
        timeout=5,
    )
    if result.returncode != 0:
        return None
    lines = result.stdout.strip().split("\n")
    if len(lines) < 2:
        return None
    fields = lines[1].split()
    if len(fields) < 5:
        return None
    usage = int(fields[4].rstrip("%"))
    return f"disk {usage}%" if usage > 80 else None


def _memory_issue():
    """Return ``"mem N%"`` if more than 90% of memory is used, otherwise ``None``."""
    result = subprocess.run(
        ["free", "-m"],
        capture_output=True,
        text=True,
        timeout=5,
    )
    if result.returncode != 0:
        return None
    lines = result.stdout.strip().split("\n")
    if len(lines) < 2:
        return None
    # Second line of `free` output: label, total, used, ...
    fields = lines[1].split()
    if len(fields) < 3:
        return None
    total = int(fields[1])
    used = int(fields[2])
    pct = (used / total) * 100 if total > 0 else 0
    return f"mem {pct:.0f}%" if pct > 90 else None


def _load_issue():
    """Return ``"load X.Y"`` if the 1-minute load exceeds 2x the CPU count, otherwise ``None``."""
    import os  # local import keeps this block self-contained

    try:
        load_1m = os.getloadavg()[0]
    except (AttributeError, OSError):
        # Load average unavailable on this platform: skip the check.
        return None
    try:
        # CPUs this process may actually run on, where the platform exposes it.
        cpus = len(os.sched_getaffinity(0))
    except (AttributeError, OSError):
        cpus = os.cpu_count() or 4
    return f"load {load_1m:.1f}" if load_1m > cpus * 2 else None


def check_workstation_health() -> dict:
    """Check local workstation health (root disk usage, memory, load average).

    Returns:
        A dict with a ``status`` key and either a ``message`` or an ``error``:

        * ``green``   - no probe reported a problem (message ``"OK"``)
        * ``yellow``  - exactly one probe reported a problem
        * ``red``     - two or more probes reported a problem
        * ``unknown`` - a probe raised (missing tool, timeout, unparsable output)
    """
    try:
        findings = (_disk_issue(), _memory_issue(), _load_issue())
    except Exception as e:
        # Collector boundary: never let a broken probe abort the whole report.
        return {"status": "unknown", "error": str(e)}

    issues = [finding for finding in findings if finding]
    if not issues:
        return {"status": "green", "message": "OK"}
    # Severity is driven purely by how many probes complained.
    status = "yellow" if len(issues) == 1 else "red"
    return {"status": status, "message": ", ".join(issues)}
|
|
|
|
|
|
def format_status(k8s: dict, workstation: dict) -> str:
    """Render the two health reports as a traffic-light summary.

    Args:
        k8s: Result dict for the cluster; reads ``status`` and ``error``/``message``.
        workstation: Result dict for the workstation, same keys.

    Returns:
        A headline of the form ``"K8s: <icon> | Workstation: <icon>"``. When a
        report is not green and carries a non-empty detail, a second line
        listing those details is appended.
    """
    icons = {
        "green": "🟢",
        "yellow": "🟡",
        "red": "🔴",
        "unknown": "⚪",
    }

    def icon_for(report: dict) -> str:
        # Missing or unrecognised statuses fall back to the "unknown" icon.
        return icons.get(report.get("status", "unknown"), "⚪")

    def detail_for(report: dict):
        # An error, when present, takes precedence over the message.
        return report.get("error", report.get("message", ""))

    headline = " | ".join(
        [f"K8s: {icon_for(k8s)}", f"Workstation: {icon_for(workstation)}"]
    )

    # Details are only worth showing for reports that are not green.
    notes = []
    for label, report in (("K8s", k8s), ("WS", workstation)):
        detail = detail_for(report)
        if report.get("status") != "green" and detail:
            notes.append(f"{label}: {detail}")

    if not notes:
        return headline
    return headline + "\n └ " + "; ".join(notes)
|
|
|
|
|
|
def collect(config: dict) -> dict:
    """Run the enabled infrastructure checks and build the report section.

    Args:
        config: Collector configuration. The optional ``infra`` mapping may set
            ``check_k8s`` and ``check_workstation``; both default to enabled.

    Returns:
        A section dict with ``section``, ``icon``, ``content`` (formatted
        text), ``raw`` (the per-check result dicts), ``status`` (overall
        traffic light) and ``error`` (always ``None``).
    """
    settings = config.get("infra", {})

    # A check that is switched off is reported as "unknown" / "disabled".
    if settings.get("check_k8s", True):
        k8s_report = check_k8s_health()
    else:
        k8s_report = {"status": "unknown", "message": "disabled"}

    if settings.get("check_workstation", True):
        ws_report = check_workstation_health()
    else:
        ws_report = {"status": "unknown", "message": "disabled"}

    content = format_status(k8s_report, ws_report)

    # Worst status wins; "unknown" is surfaced as a warning rather than green.
    seen = (k8s_report.get("status"), ws_report.get("status"))
    if "red" in seen:
        overall = "red"
    elif any(level in seen for level in ("yellow", "unknown")):
        overall = "yellow"
    else:
        overall = "green"

    return {
        "section": "Infrastructure",
        "icon": "🖥",
        "content": content,
        "raw": {"k8s": k8s_report, "workstation": ws_report},
        "status": overall,
        "error": None,
    }
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: execute both checks and print the section as markdown.
    report = collect({"infra": {"check_k8s": True, "check_workstation": True}})
    print(f"## {report['icon']} {report['section']}")
    print(report["content"])
|