#!/usr/bin/env python3
"""Dry-run Kanban hygiene advisory classifier.

Reads compact board/task summaries and emits bounded labels/next gates without
mutating any Hermes Kanban state. Phase 1 is deterministic rules only; it does
not call kanban tools, restart services, write memory, or send outbound data.
"""

from __future__ import annotations

import argparse
import json
import math
import re
import sys
import time
from pathlib import Path
from typing import Any

SCHEMA = "kanban_hygiene_advisory_v1"

# Every capability flag is False; the dict is copied verbatim into each report.
AUTHORITY = {
    "may_mutate_board": False,
    "may_assign": False,
    "may_block_or_unblock": False,
    "may_complete_or_archive": False,
    "may_create_tasks": False,
    "may_write_memory": False,
    "may_send_external": False,
    "may_restart_services": False,
    "may_execute_tools": False,
}

# Static placeholder copied into each report; nothing in this module fills it.
NPU_PROOF = {
    "required_for_npu_claims": True,
    "attempted": False,
    "ok": None,
    "npu_busy_delta_us": None,
}

REQUIRED_TASK_FIELDS = {"id", "title", "status", "assignee", "created_at", "updated_at"}

SUPPORTED_STATUSES = {
    "triage",
    "todo",
    "ready",
    "running",
    "blocked",
    "done",
    "archived",
    "failed",
    "cancelled",
}

TASK_TYPES = {
    "charter",
    "discovery",
    "spec",
    "implement",
    "test",
    "review",
    "docs",
    "ops",
    "integration",
    "final",
    "unknown",
}

LANES = {
    "observability_utilization",
    "cron_n8n_classifier",
    "rag_context_gate",
    "doc_image_audio_triage",
    "voice_audio_pipeline",
    "kanban_hygiene",
    "docs_runbook_service_map",
    "ops_integration",
    "final_closeout",
    "general",
    "unknown",
}

LIFECYCLE_PREFIXES = {
    "charter",
    "discovery",
    "spec",
    "implement",
    "test",
    "review",
    "docs",
    "doc",
    "ops",
    "integration",
    "final",
}

# Free-text fields that are concatenated into the classifier input.
_TEXT_FIELDS = ("title", "body_excerpt", "last_run_summary_excerpt", "last_comment_excerpt")

# Activity timestamps in order of precedence; the first truthy one wins.
_ACTIVITY_FIELDS = ("heartbeat_at", "last_activity_at", "updated_at")

# Ordered keyword rules: the first rule with any matching needle wins.
_TASK_TYPE_KEYWORDS = (
    ("discovery", ("discover", "inventory", "repo map", "read-only")),
    ("spec", ("spec", "define", "contract", "schema")),
    ("implement", ("implement", "engineer", "script", "code", "build")),
    ("review", ("review", "approve", "findings")),
    ("docs", ("docs", "runbook", "readme")),
    ("ops", ("ops", "health", "monitor", "deploy", "cleanup")),
    ("integration", ("integration", "merge", "cherry-pick", "fan-in")),
    ("final", ("final", "closeout", "synthesis")),
    ("test", ("test", "smoke", "validate")),
    ("charter", ("charter", "program framing")),
)

# Ordered lane rules: the first lane with any matching needle wins.
_LANE_KEYWORDS = (
    ("kanban_hygiene", ("kanban", "task hygiene", "board summaries", "review-needed", "next gate")),
    ("cron_n8n_classifier", ("cron", "n8n", "alert", "event classifier")),
    ("rag_context_gate", ("rag", "context gate", "retrieval", "bundle")),
    ("doc_image_audio_triage", ("document", "image", "audio triage", "ocr", "attachments")),
    ("voice_audio_pipeline", ("voice", "whisper", "memo", "transcribe")),
    ("docs_runbook_service_map", ("service map", "runbook", "readme")),
    ("observability_utilization", ("health", "utilization", "metrics", "digest")),
    ("ops_integration", ("merge", "integration", "cherry-pick", "fan-in")),
    ("final_closeout", ("final", "closeout", "synthesis")),
)

# Phrases that route a task to the human-approval review kind.
_AUTHORITY_NEEDLES = (
    "routing authority",
    "restart service",
    "write memory",
    "send outbound",
    "private root",
    "wildcard bind",
    "vector db mutation",
)

_TITLE_PREFIX_RE = re.compile(
    r"^(charter|discovery|spec|implement|test|review|docs?|ops|integration|final)\s*:\s*"
)
_NON_ALNUM_RE = re.compile(r"[^a-z0-9]+")
_WHITESPACE_RE = re.compile(r"\s+")

# "<busy-or-delta key> = <number>" / "<key>: <number>" patterns.
_NPU_BUSY_PATTERNS = (
    re.compile(r"\b(?:npu_)?busy(?:_time)?(?:_delta)?(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b"),
    re.compile(r"\b(?:npu_)?delta(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b"),
)


def _text_field(task: dict[str, Any], key: str) -> str:
    """Return ``task[key]`` as a string, mapping a missing or null value to ""."""
    value = task.get(key)
    # str(None) would inject the literal word "None" into the classifier text.
    return "" if value is None else str(value)


def _is_epoch_seconds(value: Any) -> bool:
    """Return True when *value* is a finite int/float and not a bool."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    try:
        return math.isfinite(value)
    except OverflowError:
        # An int too large to convert to float cannot be used as a timestamp.
        return False


def compact_text(task: dict[str, Any]) -> str:
    """Join the task's non-empty free-text fields into one lowercase string."""
    parts = [_text_field(task, key) for key in _TEXT_FIELDS]
    return " ".join(part for part in parts if part).lower()


def load_jsonl(raw: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse JSON Lines text into task dicts.

    Blank lines are skipped. Returns ``(tasks, {})``; the metadata dict is
    always empty for this format.

    Raises:
        ValueError: a line is not valid JSON or does not decode to an object.
    """
    tasks = []
    for line_no, line in enumerate(raw.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"invalid JSONL on line {line_no}: {exc.msg}") from exc
        if not isinstance(row, dict):
            raise ValueError(f"JSONL line {line_no} is not an object")
        tasks.append(row)
    return tasks, {}


def load_input(path: str | None, fmt: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Read tasks from *path* (stdin when *path* is falsy or "-").

    Args:
        path: Input file path, ``None``, or ``"-"``.
        fmt: ``"auto"``, ``"json"`` or ``"jsonl"``.

    Returns:
        ``(tasks, metadata)`` where metadata holds the non-``tasks`` keys of a
        top-level JSON object and is empty for every other input shape.

    Raises:
        ValueError: the input is empty, unparsable, or has an unsupported shape.
        OSError: the file cannot be read.
    """
    raw = sys.stdin.read() if not path or path == "-" else Path(path).read_text(encoding="utf-8")
    if not raw.strip():
        raise ValueError("input is empty")

    multi_line = "\n" in raw.strip()
    parse_as_jsonl = fmt == "jsonl" or (
        fmt == "auto" and multi_line and not raw.lstrip().startswith(("{", "["))
    )
    if parse_as_jsonl:
        return load_jsonl(raw)

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Multi-line input that is not one JSON document may still be JSONL.
        if fmt == "auto" and multi_line:
            return load_jsonl(raw)
        raise ValueError(f"invalid JSON input: {exc.msg}") from exc

    if isinstance(data, list):
        if not all(isinstance(item, dict) for item in data):
            raise ValueError("JSON list must contain task objects")
        return data, {}

    if isinstance(data, dict):
        tasks = data.get("tasks")
        if tasks is None:
            # Treat a single object with required task fields as a one-task summary.
            if REQUIRED_TASK_FIELDS.issubset(data):
                return [data], {}
            raise ValueError("JSON object must contain a 'tasks' list")
        if not isinstance(tasks, list) or not all(isinstance(item, dict) for item in tasks):
            raise ValueError("'tasks' must be a list of objects")
        metadata = {key: value for key, value in data.items() if key != "tasks"}
        return tasks, metadata

    raise ValueError("input must be JSON object, JSON list, or JSON Lines")


def validate_task(task: dict[str, Any]) -> None:
    """Check that *task* has the required fields, a known status and timestamps.

    Raises:
        ValueError: a required field is missing, the status is unsupported, or
            ``created_at``/``updated_at`` is not a finite, non-boolean number.
    """
    missing = sorted(REQUIRED_TASK_FIELDS - set(task))
    if missing:
        task_id = task.get("id", "")
        raise ValueError(f"task {task_id} missing required fields: {', '.join(missing)}")
    status = str(task.get("status"))
    if status not in SUPPORTED_STATUSES:
        raise ValueError(f"task {task.get('id')} has unsupported status: {status}")
    for field in ("created_at", "updated_at"):
        # bool is an int subclass and NaN/Infinity are floats; neither is a
        # usable epoch value, so reject them here instead of failing later.
        if not _is_epoch_seconds(task.get(field)):
            raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds")


def confidence(value: float) -> float:
    """Clamp *value* to [0.0, 1.0] and round it to two decimals."""
    return round(max(0.0, min(1.0, value)), 2)


def classify_task_type(task: dict[str, Any]) -> dict[str, Any]:
    """Label the lifecycle type from a ``prefix:`` title or from keywords.

    A recognized title prefix wins with confidence 0.95; otherwise the first
    keyword rule whose needle is a substring of the compact text wins with
    0.78; otherwise the type is ``"unknown"``.
    """
    title = _text_field(task, "title").strip().lower()
    body = compact_text(task)
    prefix = title.split(":", 1)[0].strip() if ":" in title else ""
    prefix_map = {"doc": "docs"}
    if prefix in LIFECYCLE_PREFIXES:
        value = prefix_map.get(prefix, prefix)
        if value in TASK_TYPES:
            return {"value": value, "confidence": 0.95, "reason_codes": [f"title_prefix_{value}"]}
    for value, needles in _TASK_TYPE_KEYWORDS:
        if any(needle in body for needle in needles):
            return {"value": value, "confidence": 0.78, "reason_codes": [f"keyword_{value}"]}
    return {"value": "unknown", "confidence": 0.2, "reason_codes": ["insufficient_signal"]}


def classify_lane(task: dict[str, Any]) -> dict[str, Any]:
    """Label the work lane from substring matches in the compact text.

    The first lane rule with a matching needle wins (confidence 0.9) and the
    reason code names the first matching needle. Non-empty text without a
    match is ``"general"``; empty text is ``"unknown"``.
    """
    text = compact_text(task)
    for value, needles in _LANE_KEYWORDS:
        matched = [needle.replace(" ", "_") for needle in needles if needle in text]
        if matched:
            return {"value": value, "confidence": 0.9, "reason_codes": [f"mentions_{matched[0]}"]}
    if text:
        return {"value": "general", "confidence": 0.45, "reason_codes": ["no_lane_specific_signal"]}
    return {"value": "unknown", "confidence": 0.1, "reason_codes": ["insufficient_signal"]}


def classify_project(
    task: dict[str, Any], board: str | None, input_metadata: dict[str, Any]
) -> dict[str, Any]:
    """Resolve the project label.

    Precedence: explicit ``project`` (task, then input metadata), then the
    board name (argument, then input metadata), then an NPU/OpenVINO mention
    in the task text, then ``"unknown"``.
    """
    explicit = task.get("project") or input_metadata.get("project")
    if explicit:
        return {"value": str(explicit), "confidence": 0.9, "source": "input"}
    board_name = board or input_metadata.get("board")
    if board_name:
        return {"value": str(board_name), "confidence": 0.98, "source": "board_name"}
    text = compact_text(task)
    if "npu" in text or "openvino" in text:
        return {"value": "npu-maximization", "confidence": 0.72, "source": "body"}
    return {"value": "unknown", "confidence": 0.1, "source": "unknown"}


def classify_blocker(task: dict[str, Any]) -> dict[str, Any]:
    """Label what, if anything, is holding the task back.

    ``blocked`` is True only for status ``"blocked"``. A ``todo`` task with
    parents is labelled ``missing_parent`` and a failed/crashed/timed-out last
    run is labelled ``failed_tests``; neither sets ``blocked``.
    """
    status = str(task.get("status"))
    text = compact_text(task)
    last_outcome = str(task.get("last_run_outcome") or "").lower()
    reason_codes: list[str] = []
    value = "none"
    blocked = False
    conf = 0.0

    if status == "blocked":
        blocked = True
        conf = 0.85
        if "review-required" in text or "changes requested" in text:
            value = "review_changes_requested"
            reason_codes.append("blocked_review_required_or_changes")
        elif any(word in text for word in ("credential", "token", "path", "spawn_failed")):
            value = "missing_credentials"
            reason_codes.append("blocked_missing_credentials_or_path")
        elif any(word in text for word in ("human", "approval", "decision", "confirm")):
            value = "human_decision"
            reason_codes.append("blocked_human_decision")
        else:
            value = "unknown"
            reason_codes.append("status_blocked")
    elif status == "todo" and task.get("parents"):
        value = "missing_parent"
        conf = 0.75
        reason_codes.append("todo_with_parents")
    elif last_outcome in {"crashed", "timed_out", "failed"}:
        value = "failed_tests"
        conf = 0.65
        reason_codes.append(f"last_run_{last_outcome}")

    return {
        "value": value,
        "blocked": blocked,
        "confidence": confidence(conf),
        "reason_codes": reason_codes,
    }


def age_hours(now: float, timestamp: Any) -> float | None:
    """Return hours between *timestamp* and *now*, floored at 0, 2 decimals.

    Returns ``None`` when *timestamp* is not an int or float.
    """
    if not isinstance(timestamp, (int, float)):
        return None
    return round(max(0.0, now - float(timestamp)) / 3600.0, 2)


def _last_activity_timestamp(task: dict[str, Any], created: float) -> float:
    """Return the first truthy activity timestamp as a float, else *created*.

    Raises:
        ValueError: the selected field cannot be converted to a float.
    """
    for field in _ACTIVITY_FIELDS:
        value = task.get(field)
        if not value:
            continue
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # Surface as ValueError so main() reports it instead of crashing.
            raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds") from exc
    return created


def classify_staleness(task: dict[str, Any], now: float) -> dict[str, Any]:
    """Label how stale the task is relative to *now* (epoch seconds).

    Rules by status:
      * running: ``stale_lock`` when last activity is more than 1h old.
      * ready: ``aging`` from 24h, ``stale`` from 72h.
      * blocked: ``aging`` from 48h (24h when the text has "review-required"),
        ``stale`` from 168h (72h with "review-required").
      * todo without parents: ``orphaned`` from 72h.
    Everything else is ``fresh``.

    Raises:
        ValueError: an optional activity timestamp is not numeric.
    """
    status = str(task.get("status"))
    created = float(task["created_at"])
    activity_ts = _last_activity_timestamp(task, created)
    age = age_hours(now, created)
    last_activity = age_hours(now, activity_ts)
    threshold = 24
    value = "fresh"
    reason_codes: list[str] = []

    if status == "running":
        threshold = 1
        if last_activity is not None and last_activity > 1:
            value = "stale_lock"
            reason_codes.append("running_no_recent_heartbeat")
    elif status == "ready":
        threshold = 24
        if last_activity is not None and last_activity >= 72:
            value = "stale"
            reason_codes.append("ready_over_72h")
        elif last_activity is not None and last_activity >= 24:
            value = "aging"
            reason_codes.append("ready_over_24h")
    elif status == "blocked":
        review_required = "review-required" in compact_text(task)
        threshold = 24 if review_required else 48
        if last_activity is not None and last_activity >= 168:
            value = "stale"
            reason_codes.append("blocked_over_7d")
        elif review_required and last_activity is not None and last_activity >= 72:
            value = "stale"
            reason_codes.append("review_required_over_72h")
        elif last_activity is not None and last_activity >= threshold:
            value = "aging"
            reason_codes.append("blocked_or_review_aging")
    elif (
        status == "todo"
        and not task.get("parents")
        and last_activity is not None
        and last_activity >= 72
    ):
        value = "orphaned"
        threshold = 72
        reason_codes.append("todo_without_parents_over_72h")

    return {
        "value": value,
        "age_hours": age,
        "last_activity_hours": last_activity,
        "threshold_hours": threshold,
        "reason_codes": reason_codes,
    }


def normalize_title(title: str) -> str:
    """Lowercase *title*, drop a lifecycle ``prefix:``, and squash punctuation.

    Runs of characters outside ``a-z0-9`` become single spaces and the result
    is stripped, so titles differing only in prefix, case, or punctuation
    compare equal.
    """
    text = title.lower().strip()
    text = _TITLE_PREFIX_RE.sub("", text)
    text = _NON_ALNUM_RE.sub(" ", text)
    return _WHITESPACE_RE.sub(" ", text).strip()


def find_duplicates(
    tasks: list[dict[str, Any]], labels: dict[str, dict[str, Any]]
) -> dict[str, dict[str, Any]]:
    """Group active tasks that share normalized title, lane and task type.

    Args:
        tasks: Validated task dicts.
        labels: Per-task-id labels holding at least ``lane`` and ``task_type``.

    Returns:
        A mapping of task id to its duplicate record. In each group of two or
        more ids the smallest id (string order) is canonical and the others
        are flagged ``is_duplicate``. Tasks with status done, archived or
        cancelled, or with an empty normalized title, are never grouped.
    """
    groups: dict[tuple[str, str, str], list[str]] = {}
    active_statuses = SUPPORTED_STATUSES - {"done", "archived", "cancelled"}
    for task in tasks:
        if str(task.get("status")) not in active_statuses:
            continue
        task_id = str(task["id"])
        key = (
            normalize_title(_text_field(task, "title")),
            labels[task_id]["lane"]["value"],
            labels[task_id]["task_type"]["value"],
        )
        if key[0]:
            bucket = groups.setdefault(key, [])
            # The same id listed twice in the input is not a duplicate task.
            if task_id not in bucket:
                bucket.append(task_id)

    result = {
        str(task["id"]): {
            "is_duplicate": False,
            "canonical_task_id": None,
            "candidate_ids": [],
            "confidence": 0.0,
            "reason_codes": [],
        }
        for task in tasks
    }
    for ids in groups.values():
        if len(ids) < 2:
            continue
        canonical = min(ids)
        for task_id in ids:
            candidates = [candidate for candidate in ids if candidate != task_id]
            result[task_id] = {
                "is_duplicate": task_id != canonical,
                "canonical_task_id": canonical if task_id != canonical else None,
                "candidate_ids": candidates,
                "confidence": 0.86,
                "reason_codes": ["same_normalized_title_lane_and_task_type"],
            }
    return result


def has_non_positive_npu_busy_delta(text: str) -> bool:
    """Return True when *text* reports a busy/delta value that is <= 0.

    Text mentioning neither "npu" nor "busy" is never flagged.
    """
    if "npu" not in text and "busy" not in text:
        return False
    # The capture group only admits plain decimal numbers, so float() is safe.
    return any(
        float(match.group(1)) <= 0
        for pattern in _NPU_BUSY_PATTERNS
        for match in pattern.finditer(text)
    )


def classify_review_needed(task: dict[str, Any], task_type: str) -> dict[str, Any]:
    """Decide whether the task needs review and of which kind.

    Checks run in order and the first hit wins: non-positive NPU busy delta,
    NPU claim lacking a busy delta, "review-required" marker, reported changed
    files/diff/tests on implement/ops/docs tasks, then authority-changing
    phrases.
    """
    text = compact_text(task)
    changed_files = task.get("changed_files") or task.get("diff_path") or task.get("tests_run")
    if has_non_positive_npu_busy_delta(text):
        return {
            "value": True,
            "kind": "npu_proof_gate",
            "confidence": 0.84,
            "reason_codes": ["npu_claim_non_positive_busy_delta"],
        }
    if "npu" in text and ("http 200" in text or "no busy" in text or "missing busy" in text):
        return {
            "value": True,
            "kind": "npu_proof_gate",
            "confidence": 0.8,
            "reason_codes": ["npu_claim_needs_busy_delta"],
        }
    if "review-required" in text:
        kind = "code_change" if task_type == "implement" else "spec_review"
        return {
            "value": True,
            "kind": kind,
            "confidence": 0.92,
            "reason_codes": ["review_required_marker"],
        }
    if changed_files and task_type in {"implement", "ops", "docs"}:
        return {
            "value": True,
            "kind": "code_change",
            "confidence": 0.86,
            "reason_codes": ["reported_changed_files_or_tests"],
        }
    if any(needle in text for needle in _AUTHORITY_NEEDLES):
        return {
            "value": True,
            "kind": "human_approval",
            "confidence": 0.84,
            "reason_codes": ["authority_change_requires_approval"],
        }
    return {"value": False, "kind": "none", "confidence": 0.2, "reason_codes": []}


def classify_next_gate(task: dict[str, Any], labels: dict[str, Any]) -> dict[str, Any]:
    """Pick the next advisory gate for the task from its other labels.

    *labels* must already hold ``task_type``, ``duplicate``, ``staleness``,
    ``blocker`` and ``review_needed``. Checks run in priority order; the
    fallback maps the task type to a default gate.
    """
    task_type = labels["task_type"]["value"]
    status = str(task.get("status"))
    reason_codes: list[str] = []

    if labels["duplicate"]["is_duplicate"]:
        return {"value": "dedupe_review", "confidence": 0.86, "reason_codes": ["duplicate_candidate"]}
    if labels["staleness"]["value"] == "stale_lock":
        return {
            "value": "investigate_stale_lock",
            "confidence": 0.88,
            "reason_codes": ["running_stale_lock"],
        }

    blocker = labels["blocker"]
    if blocker["value"] in {"human_decision", "missing_credentials", "unknown"} and blocker["blocked"]:
        return {
            "value": "needs_human_decision",
            "confidence": 0.85,
            "reason_codes": blocker["reason_codes"] or ["blocked"],
        }
    if blocker["value"] == "missing_parent":
        return {"value": "wait_for_parents", "confidence": 0.82, "reason_codes": ["unfinished_parents"]}
    if (
        task_type == "implement"
        and not (task.get("tests_run") or task.get("test_evidence"))
        and status in {"blocked", "done"}
    ):
        return {
            "value": "needs_test_evidence",
            "confidence": 0.78,
            "reason_codes": ["implementation_without_test_evidence"],
        }

    review_needed = labels["review_needed"]
    if review_needed["kind"] == "npu_proof_gate":
        return {
            "value": "needs_npu_proof",
            "confidence": 0.8,
            "reason_codes": review_needed["reason_codes"],
        }
    if review_needed["value"]:
        return {
            "value": "ready_for_review",
            "confidence": 0.86,
            "reason_codes": review_needed["reason_codes"],
        }

    gate_by_type = {
        "spec": "ready_for_implementation",
        "implement": "ready_for_review",
        "review": "ready_for_integration",
        "docs": "ready_for_integration",
        "ops": "ready_for_ops_validation",
        "integration": "ready_for_closeout",
        "final": "safe_to_complete",
        "discovery": "safe_to_complete",
        "charter": "ready_for_spec",
        "test": "ready_for_review",
    }
    type_gate = gate_by_type.get(task_type, "unknown")
    if task_type in gate_by_type:
        reason_codes.append(f"task_type_{task_type}")
    return {
        "value": type_gate,
        "confidence": 0.74 if type_gate != "unknown" else 0.2,
        "reason_codes": reason_codes,
    }


def advisory(
    tasks: list[dict[str, Any]],
    *,
    board: str | None,
    now: float,
    input_metadata: dict[str, Any],
    include_evidence: bool,
) -> dict[str, Any]:
    """Build the full advisory report for *tasks*.

    Args:
        tasks: Task summary dicts; each is validated first.
        board: Board name override, or ``None`` to use ``input_metadata``.
        now: Reference time in epoch seconds.
        input_metadata: Top-level keys that accompanied the task list.
        include_evidence: Add a short ``evidence`` dict to each item.

    Returns:
        A JSON-serializable dict with schema, counts, authority flags and one
        item of labels per task.

    Raises:
        ValueError: *now* is not finite or a task fails validation.
    """
    # int(now) below would raise OverflowError for infinity; reject up front.
    if not math.isfinite(now):
        raise ValueError("now must be a finite epoch timestamp")
    for task in tasks:
        validate_task(task)

    prelim: dict[str, dict[str, Any]] = {}
    for task in tasks:
        task_id = str(task["id"])
        prelim[task_id] = {
            "task_type": classify_task_type(task),
            "project": classify_project(task, board, input_metadata),
            "lane": classify_lane(task),
            "blocker": classify_blocker(task),
            "staleness": classify_staleness(task, now),
        }

    # Duplicate detection needs every task's lane/type, hence the second pass.
    duplicates = find_duplicates(tasks, prelim)
    items = []
    for task in tasks:
        task_id = str(task["id"])
        labels = dict(prelim[task_id])
        labels["duplicate"] = duplicates[task_id]
        labels["review_needed"] = classify_review_needed(task, labels["task_type"]["value"])
        labels["next_gate"] = classify_next_gate(task, labels)
        item = {
            "task_id": task_id,
            **labels,
            "warnings": [],
        }
        if include_evidence:
            item["evidence"] = {
                "normalized_title": normalize_title(_text_field(task, "title")),
                "status": task.get("status"),
                "parents_count": len(task.get("parents") or []),
                "children_count": len(task.get("children") or []),
            }
        items.append(item)

    counts = {
        "tasks": len(items),
        "duplicates": sum(1 for item in items if item["duplicate"]["is_duplicate"]),
        "review_needed": sum(1 for item in items if item["review_needed"]["value"]),
        "stale": sum(
            1 for item in items if item["staleness"]["value"] in {"stale", "stale_lock", "orphaned"}
        ),
        "blocked": sum(1 for item in items if item["blocker"]["blocked"]),
    }
    return {
        "schema": SCHEMA,
        "dry_run": True,
        "created": int(now),
        "board": board or input_metadata.get("board") or None,
        "counts": counts,
        # Copies keep callers from mutating the module-level constants.
        "authority": dict(AUTHORITY),
        "npu_proof": dict(NPU_PROOF),
        "items": items,
    }


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser."""
    parser = argparse.ArgumentParser(
        description="Dry-run Kanban hygiene advisory classifier",
        epilog=(
            "Input: JSON object with tasks[] or JSONL task objects. "
            "Required task fields: id,title,status,assignee,created_at,updated_at. "
            "Optional compact fields such as body_excerpt, parents, children, "
            "changed_files, tests_run, last_run_outcome, and last_comment_excerpt "
            "improve labels."
        ),
    )
    parser.add_argument("--input", "-i", help="Input JSON/JSONL file; omit or '-' for stdin")
    parser.add_argument(
        "--format", choices=["auto", "json", "jsonl"], default="auto", help="Input format"
    )
    parser.add_argument("--board", help="Board/project name to include in output")
    parser.add_argument(
        "--now", type=float, default=None, help="Epoch seconds for deterministic staleness tests"
    )
    parser.add_argument(
        "--compact",
        action="store_true",
        help="Accepted for compatibility; output is compact JSON by default",
    )
    parser.add_argument(
        "--include-evidence", action="store_true", help="Include short derived evidence fields"
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Run the CLI; return 0 on success or 2 after printing an input error."""
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        tasks, metadata = load_input(args.input, args.format)
        output = advisory(
            tasks,
            board=args.board,
            now=args.now if args.now is not None else time.time(),
            input_metadata=metadata,
            include_evidence=args.include_evidence,
        )
    except (OSError, ValueError) as exc:
        print(f"kanban-hygiene-advisory: {exc}", file=sys.stderr)
        return 2
    print(json.dumps(output, sort_keys=True, separators=(",", ":")))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())