From 72434c8bc33a83e5e8f70ba2d60e675dbe88ebdf Mon Sep 17 00:00:00 2001 From: William Valentin Date: Sat, 6 Jun 2026 15:30:31 -0700 Subject: [PATCH] feat(npu): add advisory metrics to utilization digest Roll up confidence, recommendation, authority, fallback, and service-level metrics, including v1 authority-flag handling. --- docs/npu-utilization-digest.md | 2 +- scripts/npu-utilization-digest.py | 178 ++++++++++++++++++++++++++- tests/test_npu_utilization_digest.py | 78 +++++++++++- 3 files changed, 254 insertions(+), 4 deletions(-) diff --git a/docs/npu-utilization-digest.md b/docs/npu-utilization-digest.md index 195e1b7..86e51ee 100644 --- a/docs/npu-utilization-digest.md +++ b/docs/npu-utilization-digest.md @@ -33,7 +33,7 @@ scripts/npu-utilization-digest.py --format jsonl --no-write python -m pytest tests/test_npu_utilization_digest.py -q ``` -Output shape is intentionally small: service booleans, counts, average probe ms, sysfs deltas, proof flags, fallback warning counts, artifact path, and closed gates. `fallbacks` includes unavailable services, failed/missing proof, and skipped proof-capable smokes such as disabled Whisper/doc-triage probes or GenAI cold-load skips; intentionally health-only RAG/advisory rows are not fallbacks unless unavailable. It does not print raw embeddings, transcripts, OCR text, model completions, request headers, or full upstream JSON. +Output shape is intentionally small: service booleans, request counts by service, average probe ms, sysfs/NPU busy deltas by service, proof flags, fallback totals and per-service fallback counts, confidence distribution, escalation/suppression recommendation counts, authority-safe flag violation totals, artifact path, and closed gates. `fallbacks` includes unavailable services, failed/missing proof, and skipped proof-capable smokes such as disabled Whisper/doc-triage probes or GenAI cold-load skips; intentionally health-only RAG/advisory rows are not fallbacks unless unavailable. It does not print raw embeddings, transcripts, OCR text, model completions, request headers, or full upstream JSON. Covered rows: diff --git a/scripts/npu-utilization-digest.py b/scripts/npu-utilization-digest.py index 72942de..d7f7eb2 100755 --- a/scripts/npu-utilization-digest.py +++ b/scripts/npu-utilization-digest.py @@ -72,6 +72,10 @@ class ServiceRow: dry_run: bool | None = None suppress: int | None = None escalate: int | None = None + recommendation: str | None = None + confidence: float | None = None + confidence_bucket: str | None = None + authority_violations: int | None = None loaded: bool | None = None allowed_roots_count: int | None = None reason: str | None = None @@ -83,6 +87,136 @@ def compact_dict(obj: Any) -> dict[str, Any]: return {k: v for k, v in data.items() if v is not None and v != []} +AUTHORITY_SAFE_ACTIONS = { + "", "none", "log", "observe", "dry_run", "recommend", "suppress", "escalate", + "record_metric", "compare_with_expected_label", "include_in_digest", + "open_review_ticket_candidate", "recommend_human_review", +} +AUTHORITY_FLAG_KEYS = { + "advisory_post", + "atlas_routing", + "broad_private_scan", + "delivery_send", + "gateway_restart", + "live_routing", + "memory_write", + "outbound_send", + "private_root_scan", + "service_restart", + "tool_execution", + "vector_mutation", +} +AUTHORITY_FLAG_ALIASES = { + "can_route_atlas": "atlas_routing", + "can_write_memory": "memory_write", + "can_execute_tools": "tool_execution", + "can_restart_services": "service_restart", + "can_send_outbound": "outbound_send", + "can_scan_private_roots": "private_root_scan", + "can_mutate_vector_store": "vector_mutation", + "can_post_advisory_event": "advisory_post", + "can_change_gateway_config": "gateway_restart", + "may_route": "atlas_routing", + "may_write_memory": "memory_write", + "may_execute_tools": "tool_execution", + "may_restart_services": "service_restart", + "may_send_external": "outbound_send", + "may_process_private_dirs": "private_root_scan", + "may_mutate_vector_db": "vector_mutation", + "may_change_live_config": "gateway_restart", +} + + +def confidence_bucket(confidence: float | None) -> str | None: + if confidence is None: + return None + if confidence >= 0.8: + return "high" + if confidence >= 0.5: + return "medium" + return "low" + + +def coerce_confidence(value: Any) -> float | None: + if isinstance(value, bool): + return None + if isinstance(value, (int, float)): + return max(0.0, min(1.0, float(value))) + if isinstance(value, str): + try: + return max(0.0, min(1.0, float(value))) + except ValueError: + return None + return None + + +def extract_confidence(payload: dict[str, Any]) -> float | None: + direct = coerce_confidence(payload.get("confidence")) + if direct is not None: + return direct + raw_labels = payload.get("labels") + labels: dict[str, Any] = raw_labels if isinstance(raw_labels, dict) else {} + scores: list[float] = [] + for value in labels.values(): + if isinstance(value, dict): + for score_key in ("confidence", "score", "probability"): + if score_key in value: + score = coerce_confidence(value.get(score_key)) + break + score = None + else: + score = coerce_confidence(value) + if score is not None: + scores.append(score) + return max(scores) if scores else None + + +def extract_recommendation(payload: dict[str, Any]) -> str | None: + for key in ("recommendation", "classification", "input_class"): + value = payload.get(key) + if isinstance(value, str) and value: + return value[:48] + raw_action = payload.get("action") + action: dict[str, Any] = raw_action if isinstance(raw_action, dict) else {} + value = action.get("recommendation") or action.get("type") + return str(value)[:48] if value else None + + +def count_authority_violations(payload: dict[str, Any]) -> int: + """Count advisory response hints that would exceed read-only/dry-run authority. + + Supports both legacy compact payloads and `npu_advisory_decision_v1`. + Valid schema-safe allowed actions and object-shaped no-op actual actions must + not count as violations; any true live-authority flag must count. + """ + violations = 0 + raw_flags = payload.get("authority_flags") + flags: dict[str, Any] = raw_flags if isinstance(raw_flags, dict) else {} + for key, value in flags.items(): + canonical = AUTHORITY_FLAG_ALIASES.get(key, key) + if canonical in AUTHORITY_FLAG_KEYS and bool(value): + violations += 1 + + raw_allowed = payload.get("allowed_actions") + allowed: list[Any] = raw_allowed if isinstance(raw_allowed, list) else [] + for action in allowed: + if str(action).lower() not in AUTHORITY_SAFE_ACTIONS: + violations += 1 + + raw_actual = payload.get("actual_action") + if isinstance(raw_actual, dict): + performed = bool(raw_actual.get("performed")) + side_effects = raw_actual.get("side_effects") or [] + kind = str(raw_actual.get("kind") or "none").lower() + if performed or side_effects or kind not in AUTHORITY_SAFE_ACTIONS | {"recorded_metric", "dry_run_reported"}: + violations += 1 + else: + actual = str(raw_actual or "").lower() + if actual and actual not in AUTHORITY_SAFE_ACTIONS: + violations += 1 + return violations + + def read_busy(path: Path = BUSY_PATH) -> int | None: try: return int(path.read_text().strip()) @@ -234,6 +368,12 @@ def probe_classifier(timeout: float, busy_path: Path = BUSY_PATH, post_json: Cal action: dict[str, Any] = raw_action if isinstance(raw_action, dict) else {} row.escalate = int(bool(action.get("escalate") or labels.get("action_required") or labels.get("tool_needed"))) row.suppress = int(bool(action.get("suppress") or labels.get("no_op") or labels.get("duplicate"))) + row.recommendation = extract_recommendation(data) or ("escalate" if row.escalate else "suppress" if row.suppress else "log") + row.confidence = extract_confidence(data) + row.confidence_bucket = confidence_bucket(row.confidence) + row.authority_violations = count_authority_violations(data) + if row.authority_violations: + row.warnings.append("authority_violation") row.items = len(labels) apply_proof(row, delta) if not row.reachable: @@ -387,10 +527,28 @@ def build_summary(rows: list[ServiceRow], artifact_path: str | None, counter_del proof_ok = sum(1 for r in proof_rows if r.proof_ok) gates_closed = sum(1 for r in rows if str(r.gate).startswith("closed:")) fallbacks = sum(r.fallbacks for r in rows) + request_counts_by_service = {r.service: r.calls for r in rows if r.calls} + npu_busy_delta_us_by_service = {r.service: r.npu_delta_us for r in rows if r.npu_delta_us is not None} + fallbacks_by_service = {r.service: r.fallbacks for r in rows if r.fallbacks} + recommendation_counts = {"escalate": 0, "suppress": 0} + confidence_distribution: dict[str, int] = {"low": 0, "medium": 0, "high": 0, "unknown": 0} + authority_violations = 0 warnings: dict[str, int] = {} for row in rows: + recommendation = (row.recommendation or "").lower() + if recommendation in recommendation_counts: + recommendation_counts[recommendation] += 1 + else: + recommendation_counts["escalate"] += row.escalate or 0 + recommendation_counts["suppress"] += row.suppress or 0 + if row.confidence_bucket: + confidence_distribution[row.confidence_bucket] = confidence_distribution.get(row.confidence_bucket, 0) + 1 + elif row.recommendation or row.escalate is not None or row.suppress is not None: + confidence_distribution["unknown"] += 1 + authority_violations += row.authority_violations or 0 for warning in row.warnings: warnings[warning] = warnings.get(warning, 0) + 1 + confidence_distribution = {k: v for k, v in confidence_distribution.items() if v} return { "type": "summary", "timestamp": started_at, @@ -401,6 +559,12 @@ def build_summary(rows: list[ServiceRow], artifact_path: str | None, counter_del "proof_ok": proof_ok, "proof_total": len(proof_rows), "fallbacks": fallbacks, + "fallbacks_by_service": fallbacks_by_service, + "request_counts_by_service": request_counts_by_service, + "npu_busy_delta_us_by_service": npu_busy_delta_us_by_service, + "confidence_distribution": confidence_distribution, + "recommendation_counts": {k: v for k, v in recommendation_counts.items() if v}, + "authority_violations": authority_violations, "gates_closed": gates_closed, "warnings": warnings, "artifact": artifact_path, @@ -411,8 +575,14 @@ def render_text(summary: dict[str, Any], rows: list[ServiceRow]) -> str: lines = [ f"NPU utilization digest {summary['timestamp']}", f"counter={summary['counter']} delta_us={summary.get('delta_us')}", - f"services_ok={summary['services_ok']}/{summary['services_total']} proof_ok={summary['proof_ok']}/{summary['proof_total']} fallbacks={summary['fallbacks']} gates_closed={summary['gates_closed']}", + f"services_ok={summary['services_ok']}/{summary['services_total']} proof_ok={summary['proof_ok']}/{summary['proof_total']} fallbacks={summary['fallbacks']} authority_violations={summary['authority_violations']} gates_closed={summary['gates_closed']}", ] + rec_counts = summary.get("recommendation_counts") or {} + if rec_counts: + lines.append("recommendations: " + " ".join(f"{k}={v}" for k, v in sorted(rec_counts.items()))) + conf_dist = summary.get("confidence_distribution") or {} + if conf_dist: + lines.append("confidence: " + " ".join(f"{k}={v}" for k, v in sorted(conf_dist.items()))) for r in rows: parts = [f"- {r.service}:", f"ok={str(r.reachable).lower()}"] if r.calls: @@ -437,6 +607,12 @@ def render_text(summary: dict[str, Any], rows: list[ServiceRow]) -> str: parts.append(f"suppress={r.suppress}") if r.escalate is not None: parts.append(f"escalate={r.escalate}") + if r.recommendation is not None: + parts.append(f"recommendation={r.recommendation}") + if r.confidence_bucket is not None: + parts.append(f"confidence={r.confidence_bucket}") + if r.authority_violations is not None: + parts.append(f"authority_violations={r.authority_violations}") if r.loaded is not None: parts.append(f"loaded={str(r.loaded).lower()}") if r.allowed_roots_count is not None: diff --git a/tests/test_npu_utilization_digest.py b/tests/test_npu_utilization_digest.py index a284fbd..07679de 100644 --- a/tests/test_npu_utilization_digest.py +++ b/tests/test_npu_utilization_digest.py @@ -67,7 +67,15 @@ def test_classifier_dry_run_payload(tmp_path, monkeypatch): def fake_post(url, payload, timeout): seen.update(payload) busy.write_text("35") - return 200, {"labels": {"tool_needed": True, "duplicate": False}, "npu_busy_delta_us": 25} + return 200, { + "labels": {"tool_needed": True, "duplicate": False}, + "recommendation": "escalate", + "confidence": 0.84, + "authority_flags": {"tool_execution": False, "memory_write": False}, + "allowed_actions": ["log", "recommend"], + "actual_action": "dry_run", + "npu_busy_delta_us": 25, + } monkeypatch.setattr(digest, "health_row", fake_health) row = digest.probe_classifier(1, busy_path=busy, post_json=fake_post) @@ -75,6 +83,10 @@ def test_classifier_dry_run_payload(tmp_path, monkeypatch): assert seen["options"]["include_evidence"] is False assert row.escalate == 1 assert row.suppress == 0 + assert row.recommendation == "escalate" + assert row.confidence == 0.84 + assert row.confidence_bucket == "high" + assert row.authority_violations == 0 assert row.proof_ok is True @@ -145,15 +157,77 @@ def test_disabled_proof_smokes_count_as_fallbacks(monkeypatch): def test_jsonl_shape(tmp_path): - rows = [digest.ServiceRow(service="embeddings", reachable=True, probe_ran=True, proof_ok=True, npu_delta_us=1)] + rows = [digest.ServiceRow(service="embeddings", reachable=True, probe_ran=True, proof_ok=True, calls=1, npu_delta_us=1)] summary = digest.build_summary(rows, None, 1, "2026-06-05T14:20:00-07:00") path = digest.write_jsonl(summary, rows, tmp_path) lines = [json.loads(line) for line in path.read_text().splitlines()] assert lines[0]["type"] == "summary" + assert lines[0]["request_counts_by_service"] == {"embeddings": 1} + assert lines[0]["npu_busy_delta_us_by_service"] == {"embeddings": 1} assert lines[1]["type"] == "service" assert lines[1]["service"] == "embeddings" +def test_summary_observability_rollups_and_text(): + rows = [ + digest.ServiceRow(service="classifier", reachable=True, calls=1, npu_delta_us=25, fallbacks=0, escalate=1, suppress=0, recommendation="escalate", confidence=0.84, confidence_bucket="high", authority_violations=0), + digest.ServiceRow(service="doc_triage", reachable=True, calls=1, npu_delta_us=7, fallbacks=1, warnings=["no_positive_sysfs_delta"]), + digest.ServiceRow(service="advisory_gateway", reachable=True, gate="closed:advisory-post", authority_violations=1, warnings=["authority_violation"]), + ] + summary = digest.build_summary(rows, None, 32, "2026-06-05T14:20:00-07:00") + assert summary["request_counts_by_service"] == {"classifier": 1, "doc_triage": 1} + assert summary["npu_busy_delta_us_by_service"] == {"classifier": 25, "doc_triage": 7} + assert summary["fallbacks_by_service"] == {"doc_triage": 1} + assert summary["confidence_distribution"] == {"high": 1} + assert summary["recommendation_counts"] == {"escalate": 1} + assert summary["authority_violations"] == 1 + text = digest.render_text(summary, rows) + assert "authority_violations=1" in text + assert "recommendations: escalate=1" in text + assert "confidence: high=1" in text + + +def test_authority_violation_detection(): + assert digest.count_authority_violations({ + "authority_flags": {"tool_execution": True, "memory_write": False}, + "allowed_actions": ["log", "service_restart"], + "actual_action": "outbound_send", + }) == 3 + + +def test_v1_authority_violation_detection(): + safe_payload = { + "authority_flags": { + "can_route_atlas": False, + "can_write_memory": False, + "can_execute_tools": False, + "can_restart_services": False, + "can_send_outbound": False, + "can_scan_private_roots": False, + "can_mutate_vector_store": False, + "can_post_advisory_event": False, + "can_change_gateway_config": False, + "requires_human_approval": True, + "advisory_only": True, + }, + "allowed_actions": ["record_metric", "compare_with_expected_label", "include_in_digest", "recommend_human_review"], + "actual_action": {"kind": "dry_run_reported", "performed": False, "performed_by": "harness", "side_effects": []}, + } + assert digest.count_authority_violations(safe_payload) == 0 + unsafe = dict(safe_payload) + unsafe["authority_flags"] = dict(safe_payload["authority_flags"], can_execute_tools=True) + assert digest.count_authority_violations(unsafe) == 1 + + +def test_recommendation_only_and_zero_confidence_rollups(): + payload = {"labels": {"no_op": {"confidence": 0.0, "score": 0.9}}, "recommendation": "suppress"} + assert digest.extract_confidence(payload) == 0.0 + row = digest.ServiceRow(service="classifier", reachable=True, recommendation="suppress", confidence=0.0, confidence_bucket="low") + summary = digest.build_summary([row], None, None, "2026-06-05T14:20:00-07:00") + assert summary["recommendation_counts"] == {"suppress": 1} + assert summary["confidence_distribution"] == {"low": 1} + + def test_exit_codes(monkeypatch): rows = [digest.ServiceRow(service="embeddings", reachable=True, probe_ran=True, proof_ok=False, warnings=["no_positive_sysfs_delta"])] summary = digest.build_summary(rows, None, 0, "2026-06-05T14:20:00-07:00")