Files
swarm-master/scripts/npu-advisory-dry-run-comparison.py
William Valentin dae2a57124 feat(npu): add advisory dry-run comparison harness
Add npu_advisory_decision_v1 schema, synthetic fixture set, comparison harness, docs, and focused tests for advisory-only NPU evaluation.
2026-06-06 15:30:31 -07:00

568 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""Dry-run comparison harness for advisory-only NPU lanes.
The harness evaluates synthetic/non-private fixtures against deterministic lane
adapters and emits compact npu_advisory_decision_v1 records plus JSON/markdown
summaries. It intentionally performs no live routing, memory writes, tool
execution, service restarts, outbound sends, broad private scans, or vector-store
mutation.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import uuid
import importlib.util
import json
import re
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Mapping
REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_FIXTURES = REPO_ROOT / "fixtures" / "npu_advisory_dry_run" / "fixtures.json"
SCHEMA = "npu_advisory_decision_v1"
HARNESS_SCHEMA = "npu_advisory_dry_run_summary_v1"
AUTHORITY_FLAGS_CLOSED = {
"can_route_atlas": False,
"can_write_memory": False,
"can_execute_tools": False,
"can_restart_services": False,
"can_send_outbound": False,
"can_scan_private_roots": False,
"can_mutate_vector_store": False,
"can_post_advisory_event": False,
"can_change_gateway_config": False,
"requires_human_approval": True,
"advisory_only": True,
}
MAY_TO_CAN = {
"may_route": "can_route_atlas",
"may_write_memory": "can_write_memory",
"may_execute_tools": "can_execute_tools",
"may_restart_services": "can_restart_services",
"may_send_external": "can_send_outbound",
"may_process_private_dirs": "can_scan_private_roots",
"may_mutate_vector_db": "can_mutate_vector_store",
"may_change_live_config": "can_change_gateway_config",
}
MUTATION_FLAGS_FALSE = {
"live_routing": False,
"memory_writes": False,
"tool_execution": False,
"service_restarts": False,
"outbound_sends": False,
"broad_private_scans": False,
"vector_store_mutation": False,
"gateway_restart": False,
}
ALLOWED_ACTIONS = ["record_metric", "compare_with_expected_label", "include_in_digest", "recommend_human_review"]
NO_ACTUAL_ACTION = {"kind": "dry_run_reported", "performed": False, "performed_by": "harness", "side_effects": []}
ACTION_PATTERNS = {
"follow_up": re.compile(r"\b(follow up|follow-up|circle back|reply|respond)\b", re.I),
"date_or_deadline": re.compile(r"\b(deadline|due|by (?:mon|tue|wed|thu|fri|sat|sun)|20\d{2}[-/]\d{1,2}[-/]\d{1,2})\b", re.I),
"decision": re.compile(r"\b(decided|decision|approved|rejected|go with|choose)\b", re.I),
"task": re.compile(r"\b(todo|to-do|action item|assign|need to|please|reminder|review|ask)\b", re.I),
}
class HarnessError(ValueError):
pass
def load_module(name: str, path: Path):
spec = importlib.util.spec_from_file_location(name, path)
if spec is None or spec.loader is None:
raise HarnessError(f"module_import_failed:{path}")
module = importlib.util.module_from_spec(spec)
sys.modules.setdefault(name, module)
spec.loader.exec_module(module) # type: ignore[union-attr]
return module
def confidence_bucket(value: float | int | None) -> str:
if value is None:
return "unknown"
v = float(value)
if v >= 0.95:
return "very_high"
if v >= 0.80:
return "high"
if v >= 0.60:
return "medium"
if v >= 0.40:
return "low"
return "very_low"
def lane_confidence(output: Mapping[str, Any], fallback: float = 0.7) -> float:
for key in ("confidence", "score"):
try:
return float(output[key])
except (KeyError, TypeError, ValueError):
pass
labels = output.get("labels")
if isinstance(labels, Mapping):
vals: list[float] = []
for value in labels.values():
if isinstance(value, Mapping) and "confidence" in value:
try:
vals.append(float(value["confidence"]))
except (TypeError, ValueError):
continue
if vals:
return max(vals)
return fallback
def closed_authority_flags(extra: Mapping[str, Any] | None = None) -> dict[str, bool]:
flags = dict(AUTHORITY_FLAGS_CLOSED)
for key, value in (extra or {}).items():
mapped = MAY_TO_CAN.get(key, key)
if mapped in flags and mapped not in {"requires_human_approval", "advisory_only"}:
flags[mapped] = bool(value)
return flags
def authority_violations(flags: Mapping[str, Any]) -> list[str]:
return sorted(
key for key, value in flags.items()
if key.startswith("can_") and bool(value)
)
def severity_for(label: str) -> str:
if label in {"escalate", "block_authority_violation"}:
return "critical"
if label in {"require_human_review", "review_item", "ready_for_review", "prepare_context_bundle"}:
return "medium"
if label in {"summarize", "log"}:
return "info"
return "none"
def npu_proof_v1(proof: Mapping[str, Any]) -> dict[str, Any]:
busy = proof.get("npu_busy_delta_us") or proof.get("busy_delta_us")
service_delta = proof.get("service_reported_delta_us") or proof.get("npu_busy_delta_us")
proof_ok = proof.get("ok")
if proof_ok is None and busy is not None:
try:
proof_ok = int(busy) > 0
except (TypeError, ValueError):
proof_ok = None
fixture_only = bool(proof.get("fixture_only", True))
return {
"proof_mode": "offline_fixture" if fixture_only else "service_reported_delta",
"busy_delta_us": int(busy) if isinstance(busy, int) or (isinstance(busy, str) and busy.isdigit()) else None,
"service_reported_delta_us": int(service_delta) if isinstance(service_delta, int) or (isinstance(service_delta, str) and service_delta.isdigit()) else None,
"inference_ran": bool(proof_ok) if proof_ok is not None else False,
"proof_ok": bool(proof_ok) if proof_ok is not None else None,
"counter_path": None,
}
def compare_outcome(recommendation: str, expected: str, human: str) -> str:
if recommendation == human == expected:
return "agree"
if recommendation in {"escalate", "summarize", "review_item", "require_human_review", "prepare_context_bundle"} and human in {"log", "suppress", "none"}:
return "false_positive"
if recommendation in {"log", "suppress", "none"} and human in {"escalate", "summarize", "review_item", "require_human_review", "prepare_context_bundle"}:
return "false_negative"
if recommendation in {"uncertain", "defer"}:
return "uncertain"
return "disagree"
def evaluate_context_gate(fixture: Mapping[str, Any]) -> dict[str, Any]:
context_gate = load_module("openvino_context_gate.context_gate", REPO_ROOT / "openvino_context_gate" / "context_gate.py")
plan = context_gate.build_plan(str(fixture["query"]), context=fixture.get("context") or {}, options={"require_npu_proof": False})
blocked = plan["bundle_plan"].get("blocked_fields") or []
if blocked:
recommendation = "require_human_review"
elif plan["bundle_plan"]["bundle_name"] in {"CodingTaskBundle", "OpsDebugBundle", "ResearchBundle"}:
recommendation = "prepare_context_bundle"
else:
recommendation = "answer_directly"
return {
"recommendation": recommendation,
"confidence": plan["query_class"].get("confidence", 0.7),
"npu_proof": plan["npu_proof"],
"notes": [f"bundle={plan['bundle_plan']['bundle_name']}", f"sources={','.join(s['source'] for s in plan['source_plan'])}"],
"raw_compact": {"bundle_name": plan["bundle_plan"]["bundle_name"], "sources": [s["source"] for s in plan["source_plan"]], "blocked_fields": [f["field"] for f in blocked]},
}
def cron_recommendation(envelope: Mapping[str, Any], event: Mapping[str, Any]) -> str:
labels = ((envelope.get("result") or {}).get("labels") or {}) if isinstance(envelope.get("result"), Mapping) else {}
urgency = (((labels.get("urgency") or {}).get("value")) if isinstance(labels.get("urgency"), Mapping) else labels.get("urgency")) or "normal"
npu = envelope.get("npu_proof") or {}
npu_ok = bool(npu.get("ok") is True and int(npu.get("npu_busy_delta_us") or 0) > 0)
severity = str(event.get("severity") or "normal")
if not npu_ok:
return "log"
if severity == "critical":
return "escalate"
if severity == "warning" or urgency in {"high", "critical"}:
return "summarize"
return "log"
def evaluate_cron_n8n(fixture: Mapping[str, Any]) -> dict[str, Any]:
envelope = fixture.get("gateway_envelope") or {}
event = fixture.get("event") or {}
labels = ((envelope.get("result") or {}).get("labels") or {}) if isinstance(envelope.get("result"), Mapping) else {}
confidence = lane_confidence({"labels": labels}, 0.6)
return {
"recommendation": cron_recommendation(envelope, event),
"confidence": confidence,
"npu_proof": envelope.get("npu_proof") or {},
"authority_from_envelope": envelope.get("authority") or {},
"notes": [f"workflow={event.get('workflow')}", f"severity={event.get('severity')}"]
}
def evaluate_batch_triage(fixture: Mapping[str, Any]) -> dict[str, Any]:
text = str(fixture.get("document_text") or "")
reasons = sorted(name for name, rx in ACTION_PATTERNS.items() if rx.search(text))
if reasons:
recommendation = "review_item"
conf = 0.82
elif len(text.strip()) < 20:
recommendation = "uncertain"
conf = 0.35
else:
recommendation = "suppress"
conf = 0.64
return {
"recommendation": recommendation,
"confidence": conf,
"npu_proof": {"verified": False, "required": False, "note": "fixture_rules_no_npu_claim"},
"notes": [f"lane={fixture.get('triage_lane')}", f"reason_codes={','.join(reasons) or 'none'}"],
"raw_compact": {"reasons": reasons, "raw_text_redacted": True, "full_path_included": False},
}
def evaluate_voice_audio(fixture: Mapping[str, Any]) -> dict[str, Any]:
pipeline = load_module("npu_voice_audio_pipeline", REPO_ROOT / "scripts" / "npu_voice_audio_pipeline.py")
proof = fixture.get("npu_proof") or {}
action_worthy, atlas_gate, next_gate = pipeline.decide_gate(
str(fixture.get("transcript") or ""),
dict(fixture.get("labels") or {}),
whisper_proven=bool(proof.get("whisper")),
classifier_proven=bool(proof.get("classifier")),
)
if atlas_gate.startswith("blocked"):
recommendation = "require_human_review"
elif action_worthy:
recommendation = "review_item"
else:
recommendation = "suppress"
return {
"recommendation": recommendation,
"confidence": 0.86 if action_worthy else 0.66,
"npu_proof": {"whisper": bool(proof.get("whisper")), "classifier": bool(proof.get("classifier")), "verified": bool(proof.get("whisper") and proof.get("classifier"))},
"notes": [f"atlas_gate={atlas_gate}", f"next_gate={next_gate}", "transcript_redacted=true"],
"raw_compact": {"action_worthy": action_worthy, "atlas_gate": atlas_gate, "next_gate": next_gate},
}
def evaluate_kanban_hygiene(fixture: Mapping[str, Any]) -> dict[str, Any]:
hygiene = load_module("kanban_hygiene_advisory", REPO_ROOT / "scripts" / "kanban-hygiene-advisory.py")
out = hygiene.advisory(list(fixture.get("tasks") or []), board="synthetic-npu", now=float(fixture.get("now") or time.time()), input_metadata={}, include_evidence=False)
item = out["items"][0]
next_gate = item["next_gate"]["value"]
return {
"recommendation": next_gate,
"confidence": item["next_gate"].get("confidence", 0.7),
"npu_proof": out["npu_proof"],
"notes": [f"task_id={item['task_id']}", f"review_needed={item['review_needed']['value']}"],
"raw_compact": {"counts": out["counts"], "next_gate": item["next_gate"]},
}
def evaluate_gateway_envelope(fixture: Mapping[str, Any]) -> dict[str, Any]:
envelope = fixture.get("gateway_envelope") or {}
flags = closed_authority_flags(envelope.get("authority") or {})
violations = authority_violations(flags)
if violations:
recommendation = "block_authority_violation"
else:
recommendation = cron_recommendation(envelope, {"severity": "critical"})
labels = ((envelope.get("result") or {}).get("labels") or {}) if isinstance(envelope.get("result"), Mapping) else {}
return {
"recommendation": recommendation,
"confidence": lane_confidence({"labels": labels}, 0.8),
"npu_proof": envelope.get("npu_proof") or {},
"authority_from_envelope": envelope.get("authority") or {},
"notes": [f"violations={','.join(violations) or 'none'}", f"trace_id={envelope.get('trace_id')}"]
}
EVALUATORS = {
"context_gate": evaluate_context_gate,
"cron_n8n_advisory": evaluate_cron_n8n,
"batch_triage": evaluate_batch_triage,
"voice_audio": evaluate_voice_audio,
"kanban_hygiene": evaluate_kanban_hygiene,
"advisory_gateway_envelope": evaluate_gateway_envelope,
}
def build_decision(fixture: Mapping[str, Any], evaluated: Mapping[str, Any]) -> dict[str, Any]:
extra_authority = evaluated.get("authority_from_envelope") if isinstance(evaluated.get("authority_from_envelope"), Mapping) else None
authority_flags = closed_authority_flags(extra_authority)
violations = authority_violations(authority_flags)
recommendation = str(evaluated["recommendation"])
human = str(fixture["human_or_atlas_decision"])
expected = str(fixture["expected_recommendation"])
outcome_label = compare_outcome(recommendation, expected, human)
if recommendation == expected and outcome_label != str(fixture.get("expected_outcome", outcome_label)):
outcome_label = str(fixture.get("expected_outcome"))
confidence_score = float(evaluated.get("confidence") or 0.0)
npu_raw = dict(evaluated.get("npu_proof") or {})
npu_raw.setdefault("fixture_only", True)
fixture_id = str(fixture.get("id"))
input_class = str(fixture.get("input_class") or fixture.get("lane") or "unknown")
service_name = str(fixture.get("service") or fixture.get("lane") or "unknown")
source_kind = str(fixture.get("source") or "fixture")
comparison = "agree" if outcome_label == "agree" else ("uncertain" if outcome_label == "uncertain" else "disagree")
error_type = outcome_label if outcome_label in {"false_positive", "false_negative", "severity_overcall", "severity_undercall"} else None
if violations:
error_type = "unsafe_authority"
return {
"schema_version": SCHEMA,
"decision_id": str(uuid.uuid5(uuid.NAMESPACE_URL, f"{SCHEMA}:{fixture_id}")),
"timestamp": dt.datetime.now(dt.timezone.utc).isoformat(timespec="seconds"),
"source": {
"kind": "fixture",
"fixture_id": fixture_id,
"fixture_set": "npu_advisory_eval_v1",
"artifact_ref": None,
"content_hash": "sha256:" + hashlib.sha256(json.dumps(fixture, sort_keys=True, default=str).encode()).hexdigest(),
"privacy_class": "synthetic" if source_kind.startswith("synthetic") else "non_private",
},
"service": {
"name": service_name,
"endpoint": service_name,
"mode": "offline_fixture",
"model": "openvino-local-fixture",
},
"input_class": input_class,
"recommendation": {
"label": recommendation,
"severity": severity_for(recommendation),
"reasons": list(evaluated.get("notes") or []),
"evidence_refs": [f"fixture:{fixture_id}", f"lane:{fixture.get('lane')}"] ,
"raw_output_ref": None,
},
"expected_recommendation": expected,
"confidence": {
"score": round(confidence_score, 3),
"bucket": confidence_bucket(confidence_score),
"bucket_rule": "v1_default",
"calibrated": False,
},
"authority_flags": authority_flags,
"allowed_actions": ALLOWED_ACTIONS,
"actual_action": dict(NO_ACTUAL_ACTION),
"human_or_atlas_decision": {
"source": "fixture_expected",
"label": human,
"severity": severity_for(human),
"confidence": None,
"decision_ref": fixture_id,
"timestamp": None,
},
"outcome": {
"comparison": comparison,
"label": outcome_label,
"error_type": error_type,
"human_review_required": bool(violations or recommendation in {"require_human_review", "block_authority_violation"}),
"promotion_blocker": bool(violations or error_type in {"false_negative", "unsafe_authority", "privacy_violation"}),
},
"expected_outcome": fixture.get("expected_outcome"),
"npu_proof": npu_proof_v1(npu_raw),
"latency": {"total_ms": 0, "service_ms": None, "queue_ms": None, "timeout": False},
"fallback": {"occurred": True, "kind": "offline", "reason": "synthetic_fixture_deterministic_adapter_no_live_service_call", "expected": True},
"privacy": {"payload_logged": False, "redaction": "metadata_only", "retention": "local_audit", "contains_private_payload": False},
"notes": list(evaluated.get("notes") or []),
"authority_safe_flag_violations": violations,
# Compatibility fields for compact summaries/tests.
"fixture_id": fixture_id,
"lane": fixture.get("lane"),
}
def run(fixtures_path: Path) -> dict[str, Any]:
data = json.loads(fixtures_path.read_text(encoding="utf-8"))
fixtures = data.get("fixtures")
if not isinstance(fixtures, list) or not fixtures:
raise HarnessError("fixture_set_empty")
decisions = []
started = time.perf_counter()
for fixture in fixtures:
lane = fixture.get("lane")
evaluator = EVALUATORS.get(str(lane))
if evaluator is None:
raise HarnessError(f"unsupported_lane:{lane}")
t0 = time.perf_counter()
evaluated = evaluator(fixture)
decision = build_decision(fixture, evaluated)
decision["latency"]["total_ms"] = round((time.perf_counter() - t0) * 1000, 3)
decisions.append(decision)
counts = Counter(d["outcome"]["label"] for d in decisions)
by_lane: dict[str, Counter[str]] = defaultdict(Counter)
confidence = Counter(d["confidence"]["bucket"] for d in decisions)
recommendations = Counter(d["recommendation"]["label"] for d in decisions)
violations = [d for d in decisions if d["authority_safe_flag_violations"]]
mismatches = [d for d in decisions if d["outcome"]["label"] != d.get("expected_outcome")]
return {
"schema": HARNESS_SCHEMA,
"fixture_file": str(fixtures_path),
"dry_run": True,
"mutations": dict(MUTATION_FLAGS_FALSE),
"totals": {
"fixtures": len(decisions),
"agree": counts.get("agree", 0),
"disagree": counts.get("disagree", 0),
"uncertain": counts.get("uncertain", 0),
"false_positive": counts.get("false_positive", 0),
"false_negative": counts.get("false_negative", 0),
"authority_safe_flag_violations": len(violations),
"expected_outcome_mismatches": len(mismatches),
"wall_ms": round((time.perf_counter() - started) * 1000, 3),
},
"by_lane": lane_summary(decisions),
"confidence_buckets": dict(sorted(confidence.items())),
"recommendations": dict(sorted(recommendations.items())),
"minimum_metrics": minimum_metrics(decisions),
"violations": [{"fixture_id": d["fixture_id"], "flags": d["authority_safe_flag_violations"]} for d in violations],
"mismatches": [{"fixture_id": d["fixture_id"], "outcome": d["outcome"]["label"], "expected_outcome": d.get("expected_outcome")} for d in mismatches],
"decisions": decisions,
}
def percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
ordered = sorted(values)
idx = min(len(ordered) - 1, max(0, round((pct / 100) * (len(ordered) - 1))))
return ordered[idx]
def minimum_metrics(decisions: list[dict[str, Any]]) -> dict[str, Any]:
by_input = Counter(d["input_class"] for d in decisions)
by_service = Counter(d["service"]["name"] for d in decisions)
fallback_kinds = Counter(d["fallback"]["kind"] for d in decisions if d["fallback"]["occurred"])
proof_ok = sum(1 for d in decisions if d["npu_proof"]["proof_ok"] is True)
proof_missing = sum(1 for d in decisions if d["npu_proof"]["proof_ok"] is False)
proof_na = sum(1 for d in decisions if d["npu_proof"]["proof_ok"] is None)
privacy_violations = sum(1 for d in decisions if d["privacy"]["contains_private_payload"] or d["privacy"]["payload_logged"])
side_effects = sum(1 for d in decisions if d["actual_action"]["performed"] or d["actual_action"]["side_effects"])
timeouts = sum(1 for d in decisions if d["latency"].get("timeout"))
lat_by_service: dict[str, dict[str, float | None]] = {}
for service in by_service:
vals = [float(d["latency"]["total_ms"]) for d in decisions if d["service"]["name"] == service]
lat_by_service[service] = {"p50_ms": percentile(vals, 50), "p95_ms": percentile(vals, 95)}
lat_by_input: dict[str, dict[str, float | None]] = {}
for input_class in by_input:
vals = [float(d["latency"]["total_ms"]) for d in decisions if d["input_class"] == input_class]
lat_by_input[input_class] = {"p50_ms": percentile(vals, 50), "p95_ms": percentile(vals, 95)}
outcomes = Counter(d["outcome"]["label"] for d in decisions)
return {
"total_records": len(decisions),
"records_by_input_class": dict(sorted(by_input.items())),
"records_by_service": dict(sorted(by_service.items())),
"privacy_violation_count": privacy_violations,
"actual_side_effect_count": side_effects,
"missing_reference_count": outcomes.get("missing_reference", 0),
"fallback_count": sum(fallback_kinds.values()),
"fallback_counts_by_kind": dict(sorted(fallback_kinds.items())),
"expected_fallback_count": sum(1 for d in decisions if d["fallback"]["occurred"] and d["fallback"]["expected"]),
"unexpected_fallback_count": sum(1 for d in decisions if d["fallback"]["occurred"] and not d["fallback"]["expected"]),
"npu_proof_ok_count": proof_ok,
"npu_proof_missing_count": proof_missing,
"npu_proof_not_applicable_count": proof_na,
"latency_by_service": lat_by_service,
"latency_by_input_class": lat_by_input,
"timeout_count": timeouts,
}
def lane_summary(decisions: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
lanes: dict[str, list[dict[str, Any]]] = defaultdict(list)
for d in decisions:
lanes[str(d["lane"])].append(d)
out = {}
for lane, items in sorted(lanes.items()):
c = Counter(d["outcome"]["label"] for d in items)
out[lane] = {
"fixtures": len(items),
"agree": c.get("agree", 0),
"disagree": c.get("disagree", 0),
"false_positive": c.get("false_positive", 0),
"false_negative": c.get("false_negative", 0),
"uncertain": c.get("uncertain", 0),
"authority_safe_flag_violations": sum(1 for d in items if d["authority_safe_flag_violations"]),
}
return out
def markdown_summary(summary: Mapping[str, Any]) -> str:
totals = summary["totals"]
lines = [
"# NPU advisory dry-run comparison",
"",
f"fixtures: {totals['fixtures']} | agree: {totals['agree']} | disagree: {totals['disagree']} | false_positive: {totals['false_positive']} | false_negative: {totals['false_negative']} | uncertain: {totals['uncertain']}",
f"authority_safe_flag_violations: {totals['authority_safe_flag_violations']} | mutations: all_false",
"",
"| lane | fixtures | agree | false_positive | false_negative | violations |",
"| --- | ---: | ---: | ---: | ---: | ---: |",
]
for lane, row in summary["by_lane"].items():
lines.append(f"| {lane} | {row['fixtures']} | {row['agree']} | {row['false_positive']} | {row['false_negative']} | {row['authority_safe_flag_violations']} |")
if summary.get("violations"):
lines.extend(["", "## Authority-safe flag violations"])
for violation in summary["violations"]:
lines.append(f"- {violation['fixture_id']}: {', '.join(violation['flags'])}")
return "\n".join(lines) + "\n"
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Run synthetic advisory-only NPU dry-run fixture comparisons.")
parser.add_argument("--fixtures", default=str(DEFAULT_FIXTURES), help="Synthetic fixture JSON file")
parser.add_argument("--format", choices=["json", "markdown"], default="json")
parser.add_argument("--include-decisions", action="store_true", help="Include per-fixture decision records in JSON output")
parser.add_argument("--fail-on-mismatch", action="store_true", help="Return non-zero if observed outcome differs from fixture expected_outcome")
parser.add_argument("--fail-on-authority-violation", action="store_true", help="Return non-zero if any fixture exposes may_* authority flags set true")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
try:
summary = run(Path(args.fixtures).expanduser().resolve())
except (OSError, json.JSONDecodeError, HarnessError) as exc:
print(json.dumps({"ok": False, "error": str(exc), "dry_run": True, "mutations": MUTATION_FLAGS_FALSE}, sort_keys=True), file=sys.stderr)
return 2
if args.format == "markdown":
print(markdown_summary(summary), end="")
else:
out = dict(summary)
if not args.include_decisions:
out.pop("decisions", None)
print(json.dumps(out, sort_keys=True, separators=(",", ":")))
if args.fail_on_mismatch and summary["totals"]["expected_outcome_mismatches"]:
return 1
if args.fail_on_authority_violation and summary["totals"]["authority_safe_flag_violations"]:
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())