dae2a57124
Add npu_advisory_decision_v1 schema, synthetic fixture set, comparison harness, docs, and focused tests for advisory-only NPU evaluation.
130 lines
5.4 KiB
Python
130 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Repository root: the directory one level above the one that holds this file.
ROOT = Path(__file__).resolve().parents[1]
# Dry-run comparison script; the tests below load it by path and run it as a CLI.
SCRIPT = ROOT.joinpath("scripts", "npu-advisory-dry-run-comparison.py")
# JSON fixture file handed to the harness and read directly by the coverage test.
FIXTURES = ROOT.joinpath("fixtures", "npu_advisory_dry_run", "fixtures.json")
|
|
|
|
|
|
def load_harness():
    """Load the comparison script at ``SCRIPT`` as a module and return it.

    The script's file name contains hyphens, so it cannot be pulled in with a
    plain ``import`` statement; it is loaded from its path instead, under the
    module name ``npu_advisory_dry_run_comparison``. Every call executes the
    script again and replaces the ``sys.modules`` entry of that name.
    """
    module_spec = importlib.util.spec_from_file_location("npu_advisory_dry_run_comparison", SCRIPT)
    assert module_spec and module_spec.loader
    harness_module = importlib.util.module_from_spec(module_spec)
    # Registered before execution, mirroring the load-by-path recipe in the
    # importlib documentation.
    sys.modules[module_spec.name] = harness_module
    module_spec.loader.exec_module(harness_module)
    return harness_module
|
|
|
|
|
|
def test_fixture_set_covers_all_required_advisory_lanes() -> None:
    """Check the fixture file covers every required lane and labels each fixture.

    Reads the ``"fixtures"`` list from ``FIXTURES`` and asserts that:

    * the set of ``lane`` values includes all six required advisory lanes, and
    * every fixture carries both an ``expected_recommendation`` and a
      ``human_or_atlas_decision`` entry.
    """
    required_lanes = {
        "context_gate",
        "cron_n8n_advisory",
        "batch_triage",
        "voice_audio",
        "kanban_hygiene",
        "advisory_gateway_envelope",
    }
    # Decode explicitly as UTF-8: read_text() otherwise falls back to the
    # locale's preferred encoding, which is not UTF-8 on every platform.
    fixtures = json.loads(FIXTURES.read_text(encoding="utf-8"))["fixtures"]
    lanes = {fixture["lane"] for fixture in fixtures}

    # Report which lanes are absent instead of a bare "assert False".
    missing_lanes = required_lanes - lanes
    assert not missing_lanes, f"fixture set is missing lanes: {sorted(missing_lanes)}"

    assert all("expected_recommendation" in fixture for fixture in fixtures)
    assert all("human_or_atlas_decision" in fixture for fixture in fixtures)
|
|
|
|
|
|
def test_harness_outputs_compact_summary_and_decision_schema() -> None:
    """Run the harness in-process and validate the summary and each decision.

    Covers three areas of the value returned by ``harness.run(FIXTURES)``:
    the summary envelope and its totals, the per-decision record layout, and
    the ``minimum_metrics`` section.
    """
    summary = load_harness().run(FIXTURES)

    # Summary envelope: schema tag, dry-run marker, and no mutation flagged.
    assert summary["schema"] == "npu_advisory_dry_run_summary_v1"
    assert summary["dry_run"] is True
    assert all(flag is False for flag in summary["mutations"].values())

    totals = summary["totals"]
    assert totals["fixtures"] >= 6
    assert totals["agree"] >= 1
    assert totals["false_positive"] >= 1
    assert totals["authority_safe_flag_violations"] == 1

    # Per-decision record layout.
    for record in summary["decisions"]:
        assert record["schema_version"] == "npu_advisory_decision_v1"
        assert record["decision_id"]
        for section in ("source", "service", "recommendation", "confidence", "actual_action"):
            assert isinstance(record[section], dict)

        # A dry run must report that nothing was actually done.
        action = record["actual_action"]
        assert action["performed"] is False
        assert action["side_effects"] == []
        assert record["allowed_actions"] == [
            "record_metric",
            "compare_with_expected_label",
            "include_in_digest",
            "recommend_human_review",
        ]

        for section in ("human_or_atlas_decision", "outcome", "npu_proof", "latency", "fallback"):
            assert isinstance(record[section], dict)

        privacy = record["privacy"]
        assert privacy["payload_logged"] is False
        assert privacy["contains_private_payload"] is False

        authority = record["authority_flags"]
        assert authority["advisory_only"] is True
        assert authority["requires_human_approval"] is True
        assert "notes" in record

    # Summary-level metrics.
    metrics = summary["minimum_metrics"]
    assert metrics["privacy_violation_count"] == 0
    assert metrics["actual_side_effect_count"] == 0
    for breakdown in (
        "records_by_input_class",
        "records_by_service",
        "fallback_counts_by_kind",
        "latency_by_service",
    ):
        assert breakdown in metrics
|
|
|
|
|
|
def test_each_lane_has_expected_recommendation() -> None:
    """Check the recommendation label the harness produces for six named fixtures.

    Decisions are looked up by ``source.fixture_id``; a fixture id missing
    from the harness output fails the test with a ``KeyError``.
    """
    expected_labels = {
        "context-gate-coding-safe": "prepare_context_bundle",
        "cron-normal-log": "log",
        "batch-receipt-action": "review_item",
        "voice-audio-action-needed": "require_human_review",
        "kanban-review-ready": "ready_for_review",
        "gateway-authority-violation": "block_authority_violation",
    }

    decisions = load_harness().run(FIXTURES)["decisions"]
    by_fixture = {item["source"]["fixture_id"]: item for item in decisions}

    for fixture_id, label in expected_labels.items():
        assert by_fixture[fixture_id]["recommendation"]["label"] == label
|
|
|
|
|
|
def test_cli_json_and_markdown_are_parseable_and_no_mismatch() -> None:
    """Run the script as a CLI in JSON and markdown modes and check both outputs.

    The JSON run passes ``--fail-on-mismatch`` and must exit 0, report zero
    expected-outcome mismatches, and omit the ``decisions`` list. The markdown
    run must exit 0 and contain the report heading and a ``context_gate`` row.
    """
    base_command = [sys.executable, str(SCRIPT), "--fixtures", str(FIXTURES)]
    # check=False: exit codes are asserted by hand so stderr can be shown on failure.
    run_options = {"cwd": ROOT, "text": True, "capture_output": True, "check": False}

    json_result = subprocess.run(
        [*base_command, "--format", "json", "--fail-on-mismatch"],
        **run_options,
    )
    assert json_result.returncode == 0, json_result.stderr
    parsed = json.loads(json_result.stdout)
    assert parsed["totals"]["expected_outcome_mismatches"] == 0
    assert "decisions" not in parsed

    md_result = subprocess.run([*base_command, "--format", "markdown"], **run_options)
    assert md_result.returncode == 0, md_result.stderr
    markdown = md_result.stdout
    assert "# NPU advisory dry-run comparison" in markdown
    assert "| context_gate |" in markdown
|
|
|
|
|
|
def test_authority_violation_gate_can_fail_ci_when_requested() -> None:
    """Check ``--fail-on-authority-violation`` makes the CLI exit with status 1.

    The output must still be emitted on stdout and report exactly one
    authority-safe-flag violation in its totals.
    """
    command = [
        sys.executable,
        str(SCRIPT),
        "--fixtures",
        str(FIXTURES),
        "--fail-on-authority-violation",
    ]
    completed = subprocess.run(command, cwd=ROOT, text=True, capture_output=True, check=False)
    assert completed.returncode == 1

    # No --format is passed, so parsing stdout as JSON relies on the CLI's
    # default output format (presumably JSON; confirm against the script).
    totals = json.loads(completed.stdout)["totals"]
    assert totals["authority_safe_flag_violations"] == 1
|