diff --git a/scripts/kanban-hygiene-advisory.py b/scripts/kanban-hygiene-advisory.py new file mode 100755 index 0000000..f774375 --- /dev/null +++ b/scripts/kanban-hygiene-advisory.py @@ -0,0 +1,526 @@ +#!/usr/bin/env python3 +"""Dry-run Kanban hygiene advisory classifier. + +Reads compact board/task summaries and emits bounded labels/next gates without +mutating any Hermes Kanban state. Phase 1 is deterministic rules only; it does +not call kanban tools, restart services, write memory, or send outbound data. +""" + +from __future__ import annotations + +import argparse +import json +import re +import sys +import time +from pathlib import Path +from typing import Any + +SCHEMA = "kanban_hygiene_advisory_v1" +AUTHORITY = { + "may_mutate_board": False, + "may_assign": False, + "may_block_or_unblock": False, + "may_complete_or_archive": False, + "may_create_tasks": False, + "may_write_memory": False, + "may_send_external": False, + "may_restart_services": False, + "may_execute_tools": False, +} +NPU_PROOF = { + "required_for_npu_claims": True, + "attempted": False, + "ok": None, + "npu_busy_delta_us": None, +} + +REQUIRED_TASK_FIELDS = {"id", "title", "status", "assignee", "created_at", "updated_at"} +SUPPORTED_STATUSES = { + "triage", + "todo", + "ready", + "running", + "blocked", + "done", + "archived", + "failed", + "cancelled", +} +TASK_TYPES = { + "charter", + "discovery", + "spec", + "implement", + "test", + "review", + "docs", + "ops", + "integration", + "final", + "unknown", +} +LANES = { + "observability_utilization", + "cron_n8n_classifier", + "rag_context_gate", + "doc_image_audio_triage", + "voice_audio_pipeline", + "kanban_hygiene", + "docs_runbook_service_map", + "ops_integration", + "final_closeout", + "general", + "unknown", +} +LIFECYCLE_PREFIXES = { + "charter", + "discovery", + "spec", + "implement", + "test", + "review", + "docs", + "doc", + "ops", + "integration", + "final", +} + + +def compact_text(task: dict[str, Any]) -> str: + parts = 
# Row separators honoured in JSON Lines input, all mapped to "\n" before
# splitting. JSON forbids every one of these characters unescaped inside a
# string value, so splitting on them can never cut a valid row in half.
_JSONL_ROW_BREAKS = str.maketrans(dict.fromkeys("\r\x0b\x0c\x1c\x1d\x1e", "\n"))


def load_jsonl(raw: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse JSON Lines text into a list of task objects.

    Blank lines are skipped. Line numbers in error messages are 1-based and
    count blank lines too.

    Args:
        raw: The full JSON Lines document, one JSON object per line.

    Returns:
        A ``(tasks, metadata)`` tuple. JSON Lines input carries no
        document-level metadata, so the second element is always ``{}``.

    Raises:
        ValueError: If a non-blank line is not valid JSON, or decodes to
            something other than a JSON object.
    """
    # Deliberately not str.splitlines(): that also breaks on U+0085, U+2028 and
    # U+2029, which are legal *unescaped* inside JSON strings, so a valid row
    # containing one of them would be split and rejected as invalid JSON.
    lines = raw.replace("\r\n", "\n").translate(_JSONL_ROW_BREAKS).split("\n")

    tasks: list[dict[str, Any]] = []
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"invalid JSONL on line {line_no}: {exc.msg}") from exc
        if not isinstance(row, dict):
            raise ValueError(f"JSONL line {line_no} is not an object")
        tasks.append(row)
    return tasks, {}
def _is_epoch_seconds(value: Any) -> bool:
    """Return True only for a real, finite int/float timestamp."""
    import math  # local import: the module does not import math at file level

    # bool is a subclass of int, so True/False would otherwise pass as 1/0.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    try:
        # Python's JSON parser accepts NaN/Infinity literals; neither is a time.
        return math.isfinite(value)
    except OverflowError:
        # An int too large to convert to float cannot be used as a timestamp.
        return False


def validate_task(task: dict[str, Any]) -> None:
    """Check that one task summary carries the fields the classifiers rely on.

    Checks run in order and stop at the first failure: all required fields are
    present, ``status`` is one of the supported statuses, and ``created_at`` /
    ``updated_at`` are finite numeric epoch seconds (booleans are rejected).

    Args:
        task: A single task summary object.

    Raises:
        ValueError: Describing the first failed check.
    """
    missing = sorted(REQUIRED_TASK_FIELDS - set(task))
    if missing:
        task_id = task.get("id", "")
        raise ValueError(f"task {task_id} missing required fields: {', '.join(missing)}")
    status = str(task.get("status"))
    if status not in SUPPORTED_STATUSES:
        raise ValueError(f"task {task.get('id')} has unsupported status: {status}")
    for field in ("created_at", "updated_at"):
        if not _is_epoch_seconds(task.get(field)):
            raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds")
"closeout", "synthesis"]), + ("test", ["test", "smoke", "validate"]), + ("charter", ["charter", "program framing"]), + ] + for value, needles in keyword_rules: + if any(needle in body for needle in needles): + return {"value": value, "confidence": 0.78, "reason_codes": [f"keyword_{value}"]} + return {"value": "unknown", "confidence": 0.2, "reason_codes": ["insufficient_signal"]} + + +def classify_lane(task: dict[str, Any]) -> dict[str, Any]: + text = compact_text(task) + rules = [ + ("kanban_hygiene", ["kanban", "task hygiene", "board summaries", "review-needed", "next gate"]), + ("cron_n8n_classifier", ["cron", "n8n", "alert", "event classifier"]), + ("rag_context_gate", ["rag", "context gate", "retrieval", "bundle"]), + ("doc_image_audio_triage", ["document", "image", "audio triage", "ocr", "attachments"]), + ("voice_audio_pipeline", ["voice", "whisper", "memo", "transcribe"]), + ("docs_runbook_service_map", ["service map", "runbook", "readme"]), + ("observability_utilization", ["health", "utilization", "metrics", "digest"]), + ("ops_integration", ["merge", "integration", "cherry-pick", "fan-in"]), + ("final_closeout", ["final", "closeout", "synthesis"]), + ] + for value, needles in rules: + matched = [needle.replace(" ", "_") for needle in needles if needle in text] + if matched: + return {"value": value, "confidence": 0.9, "reason_codes": [f"mentions_{matched[0]}"]} + if text: + return {"value": "general", "confidence": 0.45, "reason_codes": ["no_lane_specific_signal"]} + return {"value": "unknown", "confidence": 0.1, "reason_codes": ["insufficient_signal"]} + + +def classify_project(task: dict[str, Any], board: str | None, input_metadata: dict[str, Any]) -> dict[str, Any]: + explicit = task.get("project") or input_metadata.get("project") + if explicit: + return {"value": str(explicit), "confidence": 0.9, "source": "input"} + board_name = board or input_metadata.get("board") + if board_name: + return {"value": str(board_name), "confidence": 0.98, "source": 
"board_name"} + text = compact_text(task) + if "npu" in text or "openvino" in text: + return {"value": "npu-maximization", "confidence": 0.72, "source": "body"} + return {"value": "unknown", "confidence": 0.1, "source": "unknown"} + + +def classify_blocker(task: dict[str, Any]) -> dict[str, Any]: + status = str(task.get("status")) + text = compact_text(task) + last_outcome = str(task.get("last_run_outcome") or "").lower() + reason_codes: list[str] = [] + value = "none" + blocked = False + conf = 0.0 + + if status == "blocked": + blocked = True + conf = 0.85 + if "review-required" in text or "changes requested" in text: + value = "review_changes_requested" + reason_codes.append("blocked_review_required_or_changes") + elif any(word in text for word in ("credential", "token", "path", "spawn_failed")): + value = "missing_credentials" + reason_codes.append("blocked_missing_credentials_or_path") + elif any(word in text for word in ("human", "approval", "decision", "confirm")): + value = "human_decision" + reason_codes.append("blocked_human_decision") + else: + value = "unknown" + reason_codes.append("status_blocked") + elif status == "todo" and task.get("parents"): + value = "missing_parent" + conf = 0.75 + reason_codes.append("todo_with_parents") + elif last_outcome in {"crashed", "timed_out", "failed"}: + value = "failed_tests" + conf = 0.65 + reason_codes.append(f"last_run_{last_outcome}") + + return {"value": value, "blocked": blocked, "confidence": confidence(conf), "reason_codes": reason_codes} + + +def age_hours(now: float, timestamp: Any) -> float | None: + if not isinstance(timestamp, (int, float)): + return None + return round(max(0.0, now - float(timestamp)) / 3600.0, 2) + + +def classify_staleness(task: dict[str, Any], now: float) -> dict[str, Any]: + status = str(task.get("status")) + created = float(task["created_at"]) + activity_ts = float(task.get("heartbeat_at") or task.get("last_activity_at") or task.get("updated_at") or created) + age = age_hours(now, 
def normalize_title(title: str) -> str:
    """Reduce a task title to a comparable key for duplicate detection.

    The title is lower-cased, one leading lifecycle prefix such as ``spec:`` or
    ``docs :`` is removed, and every run of punctuation, underscores or
    whitespace becomes a single space.

    Letters and digits from any script are kept. Stripping everything outside
    ``a-z0-9`` would collapse non-ASCII titles to their ASCII remnants and make
    unrelated titles compare equal. ASCII-only titles normalise exactly as
    before.

    Args:
        title: The raw task title.

    Returns:
        The normalised title, or ``""`` when nothing alphanumeric remains.
    """
    text = title.lower().strip()
    text = re.sub(r"^(charter|discovery|spec|implement|test|review|docs?|ops|integration|final)\s*:\s*", "", text)
    # [\W_] is "anything that is not a Unicode letter or digit"; whitespace is
    # included, so separator runs are already collapsed to one space here.
    text = re.sub(r"[\W_]+", " ", text)
    return text.strip()
def has_non_positive_npu_busy_delta(text: str) -> bool:
    """Detect a reported busy/delta counter whose value is zero or negative.

    The text must contain ``npu`` or ``busy`` somewhere; otherwise the answer
    is always False. A reading is a counter name followed by ``=`` or ``:`` and
    a signed decimal number. Matching is case-sensitive.

    Args:
        text: Free text to scan.

    Returns:
        True if any matched reading is ``<= 0``, else False.
    """
    if not ("npu" in text or "busy" in text):
        return False

    counter_patterns = (
        r"\b(?:npu_)?busy(?:_time)?(?:_delta)?(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b",
        r"\b(?:npu_)?delta(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b",
    )

    def reading_is_non_positive(raw_number: str) -> bool:
        # An unparseable capture is ignored rather than treated as a failure.
        try:
            return float(raw_number) <= 0
        except ValueError:
            return False

    return any(
        reading_is_non_positive(hit.group(1))
        for counter_pattern in counter_patterns
        for hit in re.finditer(counter_pattern, text)
    )
def classify_next_gate(task: dict[str, Any], labels: dict[str, Any]) -> dict[str, Any]:
    """Pick the single next gate for a task from its already-computed labels.

    Rules are tried in a fixed order and the first match wins:
    duplicate, stale running lock, blocked on a human/credentials/unknown
    cause, waiting on parents, implementation lacking test evidence,
    NPU proof required, review needed, and finally a default gate derived
    from the task type.

    Args:
        task: The task summary; ``status``, ``tests_run`` and
            ``test_evidence`` are read.
        labels: Labels for this task; must contain ``task_type``,
            ``duplicate``, ``staleness``, ``blocker`` and ``review_needed``.

    Returns:
        A dict with ``value``, ``confidence`` and ``reason_codes``.
    """

    def gate(value: str, score: float, codes: list[str]) -> dict[str, Any]:
        return {"value": value, "confidence": score, "reason_codes": codes}

    task_type = labels["task_type"]["value"]
    status = str(task.get("status"))

    if labels["duplicate"]["is_duplicate"]:
        return gate("dedupe_review", 0.86, ["duplicate_candidate"])

    if labels["staleness"]["value"] == "stale_lock":
        return gate("investigate_stale_lock", 0.88, ["running_stale_lock"])

    blocker = labels["blocker"]
    needs_human = {"human_decision", "missing_credentials", "unknown"}
    if blocker["value"] in needs_human and blocker["blocked"]:
        return gate("needs_human_decision", 0.85, blocker["reason_codes"] or ["blocked"])
    if blocker["value"] == "missing_parent":
        return gate("wait_for_parents", 0.82, ["unfinished_parents"])

    # Only a blocked or done implementation is asked to show test evidence.
    if (
        task_type == "implement"
        and not (task.get("tests_run") or task.get("test_evidence"))
        and status in {"blocked", "done"}
    ):
        return gate("needs_test_evidence", 0.78, ["implementation_without_test_evidence"])

    review_needed = labels["review_needed"]
    if review_needed["kind"] == "npu_proof_gate":
        return gate("needs_npu_proof", 0.8, review_needed["reason_codes"])
    if review_needed["value"]:
        return gate("ready_for_review", 0.86, review_needed["reason_codes"])

    gate_by_type = {
        "spec": "ready_for_implementation",
        "implement": "ready_for_review",
        "review": "ready_for_integration",
        "docs": "ready_for_integration",
        "ops": "ready_for_ops_validation",
        "integration": "ready_for_closeout",
        "final": "safe_to_complete",
        "discovery": "safe_to_complete",
        "charter": "ready_for_spec",
        "test": "ready_for_review",
    }
    if task_type in gate_by_type:
        return gate(gate_by_type[task_type], 0.74, [f"task_type_{task_type}"])
    # Task types without a default gate (e.g. "unknown") get a low-confidence answer.
    return gate("unknown", 0.2, [])
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the advisory script.

    ``--now`` must be a finite number. ``inf`` would otherwise be accepted by
    ``float`` and only fail later, when the timestamp is converted to an
    integer for the report, with an exception the CLI does not handle.

    Returns:
        A parser exposing ``--input/-i``, ``--format``, ``--board``, ``--now``,
        ``--compact`` and ``--include-evidence``.
    """
    import math  # local import: the module does not import math at file level

    def finite_epoch(raw: str) -> float:
        """argparse ``type=`` callable: a float that is neither NaN nor infinite."""
        try:
            value = float(raw)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid float value: {raw!r}") from None
        if not math.isfinite(value):
            raise argparse.ArgumentTypeError(f"epoch seconds must be finite, got {raw!r}")
        return value

    parser = argparse.ArgumentParser(
        description="Dry-run Kanban hygiene advisory classifier",
        epilog="Input: JSON object with tasks[] or JSONL task objects. Required task fields: id,title,status,assignee,created_at,updated_at. Optional compact fields such as body_excerpt, parents, children, changed_files, tests_run, last_run_outcome, and last_comment_excerpt improve labels.",
    )
    parser.add_argument("--input", "-i", help="Input JSON/JSONL file; omit or '-' for stdin")
    parser.add_argument("--format", choices=["auto", "json", "jsonl"], default="auto", help="Input format")
    parser.add_argument("--board", help="Board/project name to include in output")
    parser.add_argument("--now", type=finite_epoch, default=None, help="Epoch seconds for deterministic staleness tests")
    parser.add_argument("--compact", action="store_true", help="Accepted for compatibility; output is compact JSON by default")
    parser.add_argument("--include-evidence", action="store_true", help="Include short derived evidence fields")
    return parser
def task(task_id: str, title: str, status: str = "ready", **extra):
    """Build a minimal task row for tests.

    The row carries the six required fields with a fixed assignee and fixed
    timestamps. Any keyword in ``extra`` is added to the row, replacing a
    default of the same name.
    """
    return {
        "id": task_id,
        "title": title,
        "status": status,
        "assignee": "engineer",
        "created_at": 1_780_000_000,
        "updated_at": 1_780_000_100,
        **extra,
    }
self.assertEqual(output["counts"]["tasks"], 1) + self.assertTrue(output["npu_proof"]["required_for_npu_claims"]) + self.assertFalse(output["npu_proof"]["attempted"]) + self.assertTrue(output["authority"]) + self.assertTrue(all(value is False for value in output["authority"].values())) + + def test_required_labels_and_kanban_lane_gate(self): + output = self.advisory([ + task("t1", "spec: Kanban/task hygiene classifier", body_excerpt="Read board summaries and suggest review-needed next gate labels.") + ]) + item = output["items"][0] + for key in ["task_type", "project", "lane", "blocker", "staleness", "duplicate", "review_needed", "next_gate"]: + self.assertIn(key, item) + self.assertEqual(item["task_type"]["value"], "spec") + self.assertEqual(item["project"]["value"], "npu-maximization") + self.assertEqual(item["lane"]["value"], "kanban_hygiene") + self.assertEqual(item["next_gate"]["value"], "ready_for_implementation") + + def test_lifecycle_chain_is_not_duplicate_even_with_same_normalized_title(self): + rows = [ + task("t_spec", "spec: Kanban hygiene advisory", children=["t_impl"]), + task("t_impl", "implement: Kanban hygiene advisory", parents=["t_spec"], children=["t_review"]), + task("t_review", "review: Kanban hygiene advisory", parents=["t_impl"]), + ] + output = self.advisory(rows) + self.assertEqual(output["counts"]["duplicates"], 0) + self.assertTrue(all(not item["duplicate"]["is_duplicate"] for item in output["items"])) + + def test_duplicate_same_type_lane_and_normalized_title_is_flagged(self): + rows = [ + task("t_a", "implement: dry-run Kanban hygiene advisory", body_excerpt="Kanban board summaries"), + task("t_b", "implement: dry run kanban hygiene advisory", body_excerpt="Kanban board summaries"), + ] + output = self.advisory(rows) + self.assertEqual(output["counts"]["duplicates"], 1) + dupes = [item for item in output["items"] if item["duplicate"]["is_duplicate"]] + self.assertEqual(len(dupes), 1) + self.assertEqual(dupes[0]["next_gate"]["value"], 
"dedupe_review") + + def test_staleness_is_deterministic_with_now(self): + output = self.advisory([ + task("t_run", "implement: NPU service", status="running", updated_at=1_780_000_000, heartbeat_at=1_780_000_000) + ], now=1_780_007_201) + item = output["items"][0] + self.assertEqual(item["staleness"]["value"], "stale_lock") + self.assertEqual(item["next_gate"]["value"], "investigate_stale_lock") + self.assertEqual(output["counts"]["stale"], 1) + + def test_review_required_marker_sets_ready_for_review(self): + output = self.advisory([ + task( + "t_impl", + "implement: dry-run Kanban hygiene advisory", + status="blocked", + body_excerpt="review-required: code change needs review", + changed_files=["scripts/kanban-hygiene-advisory.py"], + tests_run=8, + ) + ]) + item = output["items"][0] + self.assertTrue(item["review_needed"]["value"]) + self.assertEqual(item["review_needed"]["kind"], "code_change") + self.assertEqual(item["next_gate"]["value"], "ready_for_review") + + def test_missing_parent_waits_without_marking_blocked(self): + output = self.advisory([ + task("t_child", "implement: context gate", status="todo", parents=["t_parent"], body_excerpt="RAG context gate") + ]) + item = output["items"][0] + self.assertEqual(item["blocker"]["value"], "missing_parent") + self.assertFalse(item["blocker"]["blocked"]) + self.assertEqual(item["next_gate"]["value"], "wait_for_parents") + + def test_npu_claim_without_busy_delta_routes_to_proof_gate(self): + for excerpt in [ + "NPU classifier returned HTTP 200 but missing busy delta evidence", + "NPU reranker reported npu_busy_delta_us=0", + "NPU reranker reported npu_busy_delta_us=-5", + "NPU reranker reported npu_busy_delta_us=-0.1", + ]: + with self.subTest(excerpt=excerpt): + output = self.advisory([task("t_npu", "test: NPU classifier smoke", body_excerpt=excerpt)]) + item = output["items"][0] + self.assertTrue(item["review_needed"]["value"]) + self.assertEqual(item["review_needed"]["kind"], "npu_proof_gate") + 
self.assertEqual(item["next_gate"]["value"], "needs_npu_proof") + + def test_npu_proof_gate_dominates_review_required_marker(self): + for excerpt in [ + "review-required: NPU reranker reported npu_busy_delta_us=0 after smoke", + "review-required: NPU classifier returned HTTP 200 but missing busy delta evidence", + ]: + with self.subTest(excerpt=excerpt): + output = self.advisory([ + task( + "t_npu_review", + "implement: NPU classifier smoke", + status="blocked", + body_excerpt=excerpt, + changed_files=["scripts/npu-classifier.py"], + tests_run=1, + ) + ]) + item = output["items"][0] + self.assertTrue(item["review_needed"]["value"]) + self.assertEqual(item["review_needed"]["kind"], "npu_proof_gate") + self.assertEqual(item["next_gate"]["value"], "needs_npu_proof") + + def test_cli_accepts_jsonl_auto_format_and_invalid_schema_exits_nonzero(self): + good_rows = [ + json.dumps(task("t1", "docs: service map update", body_excerpt="runbook README")), + json.dumps(task("t2", "ops: utilization digest", body_excerpt="health metrics digest")), + ] + with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as handle: + handle.write("\n".join(good_rows)) + good_path = handle.name + try: + result = subprocess.run( + [sys.executable, str(MODULE_PATH), "--input", good_path, "--board", "npu-maximization", "--now", "1780003600"], + capture_output=True, + text=True, + check=False, + ) + finally: + Path(good_path).unlink(missing_ok=True) + self.assertEqual(result.returncode, 0, result.stderr) + parsed = json.loads(result.stdout) + self.assertEqual(parsed["counts"]["tasks"], 2) + + bad = subprocess.run( + [sys.executable, str(MODULE_PATH)], + input=json.dumps({"tasks": [{"id": "missing-fields"}]}), + capture_output=True, + text=True, + check=False, + ) + self.assertNotEqual(bad.returncode, 0) + self.assertIn("missing required fields", bad.stderr) + + +if __name__ == "__main__": + unittest.main()