Files
swarm-master/scripts/kanban-hygiene-advisory.py
2026-06-05 15:52:43 -07:00

527 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""Dry-run Kanban hygiene advisory classifier.
Reads compact board/task summaries and emits bounded labels/next gates without
mutating any Hermes Kanban state. Phase 1 is deterministic rules only; it does
not call kanban tools, restart services, write memory, or send outbound data.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from pathlib import Path
from typing import Any
# Schema identifier emitted under the "schema" key of every advisory document.
SCHEMA = "kanban_hygiene_advisory_v1"

# Capability flags emitted under the "authority" key of the output.
# Every flag is False: this script only reads input and prints labels.
AUTHORITY = {
    "may_mutate_board": False,
    "may_assign": False,
    "may_block_or_unblock": False,
    "may_complete_or_archive": False,
    "may_create_tasks": False,
    "may_write_memory": False,
    "may_send_external": False,
    "may_restart_services": False,
    "may_execute_tools": False,
}

# Static NPU proof stanza emitted under the "npu_proof" key of the output.
# No proof is attempted by this script, hence attempted=False and null results.
NPU_PROOF = {
    "required_for_npu_claims": True,
    "attempted": False,
    "ok": None,
    "npu_busy_delta_us": None,
}

# Keys that validate_task requires on every task summary.
REQUIRED_TASK_FIELDS = {
    "id", "title", "status", "assignee", "created_at", "updated_at",
}

# Task statuses accepted by validate_task; anything else is rejected.
SUPPORTED_STATUSES = {
    "triage", "todo", "ready", "running", "blocked",
    "done", "archived", "failed", "cancelled",
}

# Closed set of task-type labels.
TASK_TYPES = {
    "charter", "discovery", "spec", "implement", "test", "review",
    "docs", "ops", "integration", "final", "unknown",
}

# Closed set of lane labels.
LANES = {
    "observability_utilization",
    "cron_n8n_classifier",
    "rag_context_gate",
    "doc_image_audio_triage",
    "voice_audio_pipeline",
    "kanban_hygiene",
    "docs_runbook_service_map",
    "ops_integration",
    "final_closeout",
    "general",
    "unknown",
}

# Title prefixes (the text before the first ":") recognised as lifecycle
# markers; "doc" is accepted as an alias and mapped to "docs" by the classifier.
LIFECYCLE_PREFIXES = {
    "charter", "discovery", "spec", "implement", "test", "review",
    "docs", "doc", "ops", "integration", "final",
}
def compact_text(task: dict[str, Any]) -> str:
    """Return the lower-cased text that the keyword classifiers search.

    Joins the title, body excerpt, last run summary excerpt and last comment
    excerpt (in that order) with single spaces.

    Args:
        task: Task summary; any of the four text fields may be absent.

    Returns:
        The joined, lower-cased text, or "" when no field carries text.
    """
    fields = (
        "title",
        "body_excerpt",
        "last_run_summary_excerpt",
        "last_comment_excerpt",
    )
    parts: list[str] = []
    for key in fields:
        value = task.get(key)
        # A key that is present but null must not contribute the literal
        # string "None" to the searchable text.
        if value is None:
            continue
        text = str(value)
        if text:
            parts.append(text)
    return " ".join(parts).lower()
def load_jsonl(raw: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse JSON Lines text into a list of task objects.

    Blank lines are skipped. Line numbers in error messages are 1-based and
    count blank lines too.

    Args:
        raw: The full JSONL payload.

    Returns:
        A ``(tasks, metadata)`` pair; metadata is always an empty dict
        because JSONL input has no place to carry it.

    Raises:
        ValueError: If a line is not valid JSON or is not a JSON object.
    """
    rows: list[dict[str, Any]] = []
    for number, text in enumerate(raw.splitlines(), start=1):
        if text.strip() == "":
            continue
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"invalid JSONL on line {number}: {exc.msg}") from exc
        if isinstance(parsed, dict):
            rows.append(parsed)
        else:
            raise ValueError(f"JSONL line {number} is not an object")
    return rows, {}
def load_input(path: str | None, fmt: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Read task summaries from a file or stdin and return tasks plus metadata.

    Accepted shapes: a JSON object with a ``tasks`` list (all other top-level
    keys become metadata), a JSON list of task objects, a single JSON object
    carrying every required task field, or JSON Lines.

    Args:
        path: File to read; ``None``, empty, or ``"-"`` means stdin.
        fmt: ``"auto"``, ``"json"`` or ``"jsonl"``.

    Returns:
        A ``(tasks, metadata)`` pair.

    Raises:
        ValueError: If the input is empty, malformed, or has the wrong shape.
        OSError: If the file cannot be read.
    """
    if path and path != "-":
        raw = Path(path).read_text(encoding="utf-8")
    else:
        raw = sys.stdin.read()

    stripped = raw.strip()
    if not stripped:
        raise ValueError("input is empty")

    multiline = "\n" in stripped
    starts_like_json = stripped.startswith(("{", "["))
    if fmt == "jsonl" or (fmt == "auto" and multiline and not starts_like_json):
        return load_jsonl(raw)

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        # In auto mode a multi-line payload that is not one JSON document is
        # retried as JSON Lines before giving up.
        if fmt == "auto" and multiline:
            return load_jsonl(raw)
        raise ValueError(f"invalid JSON input: {exc.msg}") from exc

    if isinstance(data, list):
        if any(not isinstance(item, dict) for item in data):
            raise ValueError("JSON list must contain task objects")
        return data, {}

    if not isinstance(data, dict):
        raise ValueError("input must be JSON object, JSON list, or JSON Lines")

    tasks = data.get("tasks")
    if tasks is None:
        # Treat a single object with required task fields as a one-task summary.
        if REQUIRED_TASK_FIELDS.issubset(data):
            return [data], {}
        raise ValueError("JSON object must contain a 'tasks' list")

    if not isinstance(tasks, list) or any(not isinstance(item, dict) for item in tasks):
        raise ValueError("'tasks' must be a list of objects")

    metadata = dict(data)
    del metadata["tasks"]
    return tasks, metadata
def validate_task(task: dict[str, Any]) -> None:
    """Check that a task summary carries the fields the classifiers rely on.

    Args:
        task: One task summary object.

    Raises:
        ValueError: If a required field is missing, the status is not in
            SUPPORTED_STATUSES, or ``created_at`` / ``updated_at`` is not a
            number.
    """
    missing = sorted(REQUIRED_TASK_FIELDS - set(task))
    if missing:
        task_id = task.get("id", "<unknown>")
        raise ValueError(f"task {task_id} missing required fields: {', '.join(missing)}")

    status = str(task.get("status"))
    if status not in SUPPORTED_STATUSES:
        raise ValueError(f"task {task.get('id')} has unsupported status: {status}")

    for field in ("created_at", "updated_at"):
        value = task.get(field)
        # bool is a subclass of int, so a JSON true/false would otherwise be
        # accepted as the epoch timestamps 1 and 0.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds")
def confidence(value: float) -> float:
    """Clamp *value* into the range [0.0, 1.0] and round it to two decimals."""
    capped = min(1.0, value)
    clamped = max(0.0, capped)
    return round(clamped, 2)
def classify_task_type(task: dict[str, Any]) -> dict[str, Any]:
    """Label a task with a lifecycle type.

    A recognised ``prefix:`` at the start of the title wins (confidence 0.95).
    Otherwise the first keyword rule with a substring hit in the task's
    compact text is used (confidence 0.78); rule order is significant.

    Args:
        task: One task summary object.

    Returns:
        A dict with ``value``, ``confidence`` and ``reason_codes``.
    """
    title = str(task.get("title", "")).strip().lower()
    body = compact_text(task)

    head, colon, _rest = title.partition(":")
    prefix = head.strip() if colon else ""
    if prefix in LIFECYCLE_PREFIXES:
        # "doc" is accepted as an alias of "docs".
        value = "docs" if prefix == "doc" else prefix
        if value in TASK_TYPES:
            return {"value": value, "confidence": 0.95, "reason_codes": [f"title_prefix_{value}"]}

    keyword_rules = (
        ("discovery", ("discover", "inventory", "repo map", "read-only")),
        ("spec", ("spec", "define", "contract", "schema")),
        ("implement", ("implement", "engineer", "script", "code", "build")),
        ("review", ("review", "approve", "findings")),
        ("docs", ("docs", "runbook", "readme")),
        ("ops", ("ops", "health", "monitor", "deploy", "cleanup")),
        ("integration", ("integration", "merge", "cherry-pick", "fan-in")),
        ("final", ("final", "closeout", "synthesis")),
        ("test", ("test", "smoke", "validate")),
        ("charter", ("charter", "program framing")),
    )
    hit = next(
        (label for label, needles in keyword_rules if any(needle in body for needle in needles)),
        None,
    )
    if hit is not None:
        return {"value": hit, "confidence": 0.78, "reason_codes": [f"keyword_{hit}"]}
    return {"value": "unknown", "confidence": 0.2, "reason_codes": ["insufficient_signal"]}
def classify_lane(task: dict[str, Any]) -> dict[str, Any]:
    """Assign a task to a work lane by substring keyword matching.

    Rules are tried in order and the first lane with a matching needle wins.
    The reason code names the first needle of that lane found in the text,
    with spaces replaced by underscores.

    Args:
        task: One task summary object.

    Returns:
        A dict with ``value``, ``confidence`` and ``reason_codes``. Falls
        back to ``general`` when text exists but no rule matches, and to
        ``unknown`` when there is no text at all.
    """
    text = compact_text(task)
    lane_rules = (
        ("kanban_hygiene", ("kanban", "task hygiene", "board summaries", "review-needed", "next gate")),
        ("cron_n8n_classifier", ("cron", "n8n", "alert", "event classifier")),
        ("rag_context_gate", ("rag", "context gate", "retrieval", "bundle")),
        ("doc_image_audio_triage", ("document", "image", "audio triage", "ocr", "attachments")),
        ("voice_audio_pipeline", ("voice", "whisper", "memo", "transcribe")),
        ("docs_runbook_service_map", ("service map", "runbook", "readme")),
        ("observability_utilization", ("health", "utilization", "metrics", "digest")),
        ("ops_integration", ("merge", "integration", "cherry-pick", "fan-in")),
        ("final_closeout", ("final", "closeout", "synthesis")),
    )
    for lane, needles in lane_rules:
        first_hit = next((needle for needle in needles if needle in text), None)
        if first_hit is None:
            continue
        code = "mentions_" + first_hit.replace(" ", "_")
        return {"value": lane, "confidence": 0.9, "reason_codes": [code]}

    if not text:
        return {"value": "unknown", "confidence": 0.1, "reason_codes": ["insufficient_signal"]}
    return {"value": "general", "confidence": 0.45, "reason_codes": ["no_lane_specific_signal"]}
def classify_project(task: dict[str, Any], board: str | None, input_metadata: dict[str, Any]) -> dict[str, Any]:
    """Determine which project a task belongs to.

    Sources are consulted in order: an explicit ``project`` on the task or
    in the input metadata, then the board name (argument first, metadata
    second), then an NPU/OpenVINO mention in the task text.

    Args:
        task: One task summary object.
        board: Board name supplied by the caller, if any.
        input_metadata: Top-level metadata that accompanied the task list.

    Returns:
        A dict with ``value``, ``confidence`` and ``source``.
    """
    declared = task.get("project")
    if not declared:
        declared = input_metadata.get("project")
    if declared:
        return {"value": str(declared), "confidence": 0.9, "source": "input"}

    board_name = board if board else input_metadata.get("board")
    if board_name:
        return {"value": str(board_name), "confidence": 0.98, "source": "board_name"}

    text = compact_text(task)
    mentions_npu = "npu" in text or "openvino" in text
    if not mentions_npu:
        return {"value": "unknown", "confidence": 0.1, "source": "unknown"}
    return {"value": "npu-maximization", "confidence": 0.72, "source": "body"}
def classify_blocker(task: dict[str, Any]) -> dict[str, Any]:
    """Identify what, if anything, is holding a task back.

    Only tasks whose status is ``blocked`` are reported with
    ``blocked=True``. A ``todo`` task that lists parents is reported as
    waiting on them, and any other task whose last run crashed, timed out
    or failed is reported as ``failed_tests``.

    Args:
        task: One task summary object.

    Returns:
        A dict with ``value``, ``blocked``, ``confidence`` and
        ``reason_codes``.
    """
    status = str(task.get("status"))
    text = compact_text(task)
    last_outcome = str(task.get("last_run_outcome") or "").lower()

    if status == "blocked":
        # Ordered: the first marker group found in the text decides the label.
        blocked_rules = (
            (("review-required", "changes requested"),
             "review_changes_requested", "blocked_review_required_or_changes"),
            (("credential", "token", "path", "spawn_failed"),
             "missing_credentials", "blocked_missing_credentials_or_path"),
            (("human", "approval", "decision", "confirm"),
             "human_decision", "blocked_human_decision"),
        )
        label, code = "unknown", "status_blocked"
        for markers, rule_label, rule_code in blocked_rules:
            if any(marker in text for marker in markers):
                label, code = rule_label, rule_code
                break
        return {"value": label, "blocked": True, "confidence": confidence(0.85), "reason_codes": [code]}

    if status == "todo" and task.get("parents"):
        return {
            "value": "missing_parent",
            "blocked": False,
            "confidence": confidence(0.75),
            "reason_codes": ["todo_with_parents"],
        }

    if last_outcome in {"crashed", "timed_out", "failed"}:
        return {
            "value": "failed_tests",
            "blocked": False,
            "confidence": confidence(0.65),
            "reason_codes": [f"last_run_{last_outcome}"],
        }

    return {"value": "none", "blocked": False, "confidence": confidence(0.0), "reason_codes": []}
def age_hours(now: float, timestamp: Any) -> float | None:
    """Return the hours elapsed between *timestamp* and *now*.

    Args:
        now: Reference time in epoch seconds.
        timestamp: Event time in epoch seconds.

    Returns:
        Elapsed hours rounded to two decimals, floored at 0.0 for timestamps
        in the future, or ``None`` when *timestamp* is not an int or float.
    """
    if isinstance(timestamp, (int, float)):
        elapsed_seconds = max(0.0, now - float(timestamp))
        return round(elapsed_seconds / 3600.0, 2)
    return None
def classify_staleness(task: dict[str, Any], now: float) -> dict[str, Any]:
    """Grade how stale a task is from its status and last activity time.

    The activity timestamp is the first non-empty value among
    ``heartbeat_at``, ``last_activity_at`` and ``updated_at``, falling back
    to ``created_at``.

    Rules by status:
      * running: ``stale_lock`` when idle for more than 1 hour.
      * ready: ``aging`` from 24 hours, ``stale`` from 72 hours.
      * blocked: ``stale`` from 7 days, or from 72 hours when the text
        carries ``review-required``; ``aging`` from 24 hours with that
        marker and from 48 hours without it.
      * todo without parents: ``orphaned`` from 72 hours.
      * anything else: ``fresh``.

    Args:
        task: One task summary object; ``created_at`` must be numeric.
        now: Reference time in epoch seconds.

    Returns:
        A dict with ``value``, ``age_hours``, ``last_activity_hours``,
        ``threshold_hours`` and ``reason_codes``.

    Raises:
        ValueError: If the selected activity timestamp cannot be read as a
            number.
    """
    status = str(task.get("status"))
    created = float(task["created_at"])

    activity_ts = created
    for field in ("heartbeat_at", "last_activity_at", "updated_at"):
        raw_value = task.get(field)
        if not raw_value:
            continue
        try:
            activity_ts = float(raw_value)
        except (TypeError, ValueError):
            # The optional timestamps are not covered by validate_task. Report
            # a bad one with the same wording and exception type the required
            # fields use, so the CLI exits cleanly instead of crashing.
            raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds") from None
        break

    age = age_hours(now, created)
    last_activity = age_hours(now, activity_ts)
    threshold = 24
    value = "fresh"
    reason_codes: list[str] = []

    if status == "running":
        threshold = 1
        if last_activity is not None and last_activity > 1:
            value = "stale_lock"
            reason_codes.append("running_no_recent_heartbeat")
    elif status == "ready":
        threshold = 24
        if last_activity is not None and last_activity >= 72:
            value = "stale"
            reason_codes.append("ready_over_72h")
        elif last_activity is not None and last_activity >= 24:
            value = "aging"
            reason_codes.append("ready_over_24h")
    elif status == "blocked":
        review_required = "review-required" in compact_text(task)
        # Items waiting on review start aging sooner than other blocked items.
        threshold = 24 if review_required else 48
        if last_activity is not None and last_activity >= 168:
            value = "stale"
            reason_codes.append("blocked_over_7d")
        elif review_required and last_activity is not None and last_activity >= 72:
            value = "stale"
            reason_codes.append("review_required_over_72h")
        elif last_activity is not None and last_activity >= threshold:
            value = "aging"
            reason_codes.append("blocked_or_review_aging")
    elif status == "todo" and not task.get("parents") and last_activity is not None and last_activity >= 72:
        value = "orphaned"
        threshold = 72
        reason_codes.append("todo_without_parents_over_72h")

    return {
        "value": value,
        "age_hours": age,
        "last_activity_hours": last_activity,
        "threshold_hours": threshold,
        "reason_codes": reason_codes,
    }
def normalize_title(title: str) -> str:
    """Reduce a title to a comparison key for duplicate detection.

    Lower-cases the title, drops one leading lifecycle prefix such as
    ``implement:``, and collapses every run of characters outside
    ``a-z0-9`` into a single space.

    Args:
        title: Raw task title.

    Returns:
        The normalized key, which may be empty.
    """
    lowered = title.lower().strip()
    without_prefix = re.sub(
        r"^(charter|discovery|spec|implement|test|review|docs?|ops|integration|final)\s*:\s*",
        "",
        lowered,
    )
    spaced = re.sub(r"[^a-z0-9]+", " ", without_prefix)
    # Only letters, digits and single spaces remain, so splitting on
    # whitespace and re-joining trims both ends.
    return " ".join(spaced.split())
def find_duplicates(tasks: list[dict[str, Any]], labels: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Flag active tasks that share a normalized title, lane and task type.

    Tasks whose status is done, archived or cancelled are never grouped.
    Within a group of two or more, the lexicographically smallest id is
    the canonical task and every other member is marked as its duplicate.

    Args:
        tasks: All task summary objects.
        labels: Preliminary labels keyed by ``str(task["id"])``; each entry
            must provide ``lane`` and ``task_type`` results.

    Returns:
        A duplicate report for every task, keyed by ``str(task["id"])``.
    """
    active_statuses = SUPPORTED_STATUSES - {"done", "archived", "cancelled"}

    clusters: dict[tuple[str, str, str], list[str]] = {}
    for task in tasks:
        if str(task.get("status")) in active_statuses:
            member_id = str(task["id"])
            member_labels = labels[member_id]
            cluster_key = (
                normalize_title(str(task.get("title", ""))),
                member_labels["lane"]["value"],
                member_labels["task_type"]["value"],
            )
            # Titles that normalize to nothing carry no signal to compare.
            if cluster_key[0]:
                clusters.setdefault(cluster_key, []).append(member_id)

    report: dict[str, dict[str, Any]] = {}
    for task in tasks:
        report[str(task["id"])] = {
            "is_duplicate": False,
            "canonical_task_id": None,
            "candidate_ids": [],
            "confidence": 0.0,
            "reason_codes": [],
        }

    for members in clusters.values():
        if len(members) >= 2:
            canonical = min(members)
            for member_id in members:
                is_copy = member_id != canonical
                report[member_id] = {
                    "is_duplicate": is_copy,
                    "canonical_task_id": canonical if is_copy else None,
                    "candidate_ids": [other for other in members if other != member_id],
                    "confidence": 0.86,
                    "reason_codes": ["same_normalized_title_lane_and_task_type"],
                }
    return report
# ``key = number`` / ``key: number`` patterns for busy-time and delta readings.
# The trailing ``(?!\.?\d)`` stops the engine from backtracking onto a prefix
# of a longer number: without it "busy=0.25ms" matches just the "0" (there is
# a word boundary between "0" and ".") and is misread as a zero reading.
_NPU_BUSY_DELTA_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"\b(?:npu_)?busy(?:_time)?(?:_delta)?(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)(?!\.?\d)\b",
        r"\b(?:npu_)?delta(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)(?!\.?\d)\b",
    )
)


def has_non_positive_npu_busy_delta(text: str) -> bool:
    """Report whether *text* states an NPU busy/delta reading that is <= 0.

    Matching is case-sensitive, so callers should pass lower-cased text.
    Numbers immediately followed by letters (for example ``5ms``) are not
    treated as readings.

    Args:
        text: Free text to scan.

    Returns:
        ``True`` if any ``busy``/``delta`` style ``key=value`` or
        ``key: value`` pair carries a value of zero or less; ``False``
        otherwise, including when the text mentions neither "npu" nor "busy".
    """
    if "npu" not in text and "busy" not in text:
        return False
    # The capture group only admits text that float() can parse.
    return any(
        float(match.group(1)) <= 0
        for pattern in _NPU_BUSY_DELTA_PATTERNS
        for match in pattern.finditer(text)
    )
def classify_review_needed(task: dict[str, Any], task_type: str) -> dict[str, Any]:
    """Decide whether a task needs a review gate, and of which kind.

    Checks run in priority order: NPU proof problems, an explicit
    ``review-required`` marker, reported changes or tests on
    implement/ops/docs tasks, and finally phrases that imply an authority
    change.

    Args:
        task: One task summary object.
        task_type: The task-type label already assigned to the task.

    Returns:
        A dict with ``value``, ``kind``, ``confidence`` and ``reason_codes``.
    """
    text = compact_text(task)

    if has_non_positive_npu_busy_delta(text):
        return {"value": True, "kind": "npu_proof_gate", "confidence": 0.84, "reason_codes": ["npu_claim_non_positive_busy_delta"]}

    weak_npu_evidence = ("http 200", "no busy", "missing busy")
    if "npu" in text and any(marker in text for marker in weak_npu_evidence):
        return {"value": True, "kind": "npu_proof_gate", "confidence": 0.8, "reason_codes": ["npu_claim_needs_busy_delta"]}

    if "review-required" in text:
        if task_type == "implement":
            kind = "code_change"
        else:
            kind = "spec_review"
        return {"value": True, "kind": kind, "confidence": 0.92, "reason_codes": ["review_required_marker"]}

    change_evidence = task.get("changed_files") or task.get("diff_path") or task.get("tests_run")
    if change_evidence and task_type in {"implement", "ops", "docs"}:
        return {"value": True, "kind": "code_change", "confidence": 0.86, "reason_codes": ["reported_changed_files_or_tests"]}

    authority_phrases = (
        "routing authority",
        "restart service",
        "write memory",
        "send outbound",
        "private root",
        "wildcard bind",
        "vector db mutation",
    )
    for phrase in authority_phrases:
        if phrase in text:
            return {"value": True, "kind": "human_approval", "confidence": 0.84, "reason_codes": ["authority_change_requires_approval"]}

    return {"value": False, "kind": "none", "confidence": 0.2, "reason_codes": []}
def classify_next_gate(task: dict[str, Any], labels: dict[str, Any]) -> dict[str, Any]:
    """Pick the next workflow gate for a task from its other labels.

    Checks run in priority order: duplicate, stale lock, blocker needing a
    human, waiting on parents, implementation lacking test evidence, NPU
    proof, generic review, and finally a default gate derived from the
    task type.

    Args:
        task: One task summary object.
        labels: Results for ``task_type``, ``duplicate``, ``staleness``,
            ``blocker`` and ``review_needed``.

    Returns:
        A dict with ``value``, ``confidence`` and ``reason_codes``.
    """
    task_type = labels["task_type"]["value"]
    status = str(task.get("status"))

    if labels["duplicate"]["is_duplicate"]:
        return {"value": "dedupe_review", "confidence": 0.86, "reason_codes": ["duplicate_candidate"]}

    if labels["staleness"]["value"] == "stale_lock":
        return {"value": "investigate_stale_lock", "confidence": 0.88, "reason_codes": ["running_stale_lock"]}

    blocker = labels["blocker"]
    needs_human = {"human_decision", "missing_credentials", "unknown"}
    if blocker["value"] in needs_human and blocker["blocked"]:
        codes = blocker["reason_codes"] or ["blocked"]
        return {"value": "needs_human_decision", "confidence": 0.85, "reason_codes": codes}
    if blocker["value"] == "missing_parent":
        return {"value": "wait_for_parents", "confidence": 0.82, "reason_codes": ["unfinished_parents"]}

    if task_type == "implement":
        has_test_evidence = task.get("tests_run") or task.get("test_evidence")
        if not has_test_evidence and status in {"blocked", "done"}:
            return {"value": "needs_test_evidence", "confidence": 0.78, "reason_codes": ["implementation_without_test_evidence"]}

    review_needed = labels["review_needed"]
    if review_needed["kind"] == "npu_proof_gate":
        return {"value": "needs_npu_proof", "confidence": 0.8, "reason_codes": review_needed["reason_codes"]}
    if review_needed["value"]:
        return {"value": "ready_for_review", "confidence": 0.86, "reason_codes": review_needed["reason_codes"]}

    # Default hand-off for each lifecycle stage when nothing above applies.
    gate_by_type = {
        "spec": "ready_for_implementation",
        "implement": "ready_for_review",
        "review": "ready_for_integration",
        "docs": "ready_for_integration",
        "ops": "ready_for_ops_validation",
        "integration": "ready_for_closeout",
        "final": "safe_to_complete",
        "discovery": "safe_to_complete",
        "charter": "ready_for_spec",
        "test": "ready_for_review",
    }
    gate = gate_by_type.get(task_type)
    if gate is None:
        return {"value": "unknown", "confidence": 0.2, "reason_codes": []}
    return {"value": gate, "confidence": 0.74, "reason_codes": [f"task_type_{task_type}"]}
def advisory(tasks: list[dict[str, Any]], *, board: str | None, now: float, input_metadata: dict[str, Any], include_evidence: bool) -> dict[str, Any]:
    """Build the complete dry-run advisory document for a set of tasks.

    Every task is validated, labelled independently (type, project, lane,
    blocker, staleness), compared against the others for duplicates, and
    then given review and next-gate labels that depend on the earlier ones.

    Args:
        tasks: Task summary objects.
        board: Board name supplied by the caller, if any.
        now: Reference time in epoch seconds used for staleness.
        input_metadata: Top-level metadata that accompanied the task list.
        include_evidence: When true, attach a short ``evidence`` object to
            every item.

    Returns:
        The advisory document: schema, counts, authority flags, NPU proof
        stanza and one item per task.

    Raises:
        ValueError: If a task fails validation or two tasks share an id.
    """
    seen_ids: set[str] = set()
    for task in tasks:
        validate_task(task)
        task_id = str(task["id"])
        # Labels are keyed by the stringified id, so a repeated id would
        # silently overwrite the earlier task's labels and both output items
        # would carry the later task's results.
        if task_id in seen_ids:
            raise ValueError(f"duplicate task id: {task_id}")
        seen_ids.add(task_id)

    prelim: dict[str, dict[str, Any]] = {}
    for task in tasks:
        task_id = str(task["id"])
        prelim[task_id] = {
            "task_type": classify_task_type(task),
            "project": classify_project(task, board, input_metadata),
            "lane": classify_lane(task),
            "blocker": classify_blocker(task),
            "staleness": classify_staleness(task, now),
        }

    # Duplicate detection needs the lane and type of every task up front.
    duplicates = find_duplicates(tasks, prelim)

    items = []
    for task in tasks:
        task_id = str(task["id"])
        labels = dict(prelim[task_id])
        labels["duplicate"] = duplicates[task_id]
        labels["review_needed"] = classify_review_needed(task, labels["task_type"]["value"])
        labels["next_gate"] = classify_next_gate(task, labels)
        item = {
            "task_id": task_id,
            **labels,
            "warnings": [],
        }
        if include_evidence:
            item["evidence"] = {
                "normalized_title": normalize_title(str(task.get("title", ""))),
                "status": task.get("status"),
                "parents_count": len(task.get("parents") or []),
                "children_count": len(task.get("children") or []),
            }
        items.append(item)

    counts = {
        "tasks": len(items),
        "duplicates": sum(1 for item in items if item["duplicate"]["is_duplicate"]),
        "review_needed": sum(1 for item in items if item["review_needed"]["value"]),
        "stale": sum(1 for item in items if item["staleness"]["value"] in {"stale", "stale_lock", "orphaned"}),
        "blocked": sum(1 for item in items if item["blocker"]["blocked"]),
    }
    return {
        "schema": SCHEMA,
        "dry_run": True,
        "created": int(now),
        "board": board or input_metadata.get("board") or None,
        "counts": counts,
        # Copies, so a caller that edits the returned document cannot alter
        # the module-level tables used by later calls.
        "authority": dict(AUTHORITY),
        "npu_proof": dict(NPU_PROOF),
        "items": items,
    }
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the advisory script.

    Returns:
        A parser exposing ``--input``/``-i``, ``--format``, ``--board``,
        ``--now``, ``--compact`` and ``--include-evidence``.
    """
    epilog = (
        "Input: JSON object with tasks[] or JSONL task objects. "
        "Required task fields: id,title,status,assignee,created_at,updated_at. "
        "Optional compact fields such as body_excerpt, parents, children, "
        "changed_files, tests_run, last_run_outcome, and last_comment_excerpt "
        "improve labels."
    )
    parser = argparse.ArgumentParser(
        description="Dry-run Kanban hygiene advisory classifier",
        epilog=epilog,
    )
    add = parser.add_argument
    add("--input", "-i", help="Input JSON/JSONL file; omit or '-' for stdin")
    add("--format", choices=["auto", "json", "jsonl"], default="auto", help="Input format")
    add("--board", help="Board/project name to include in output")
    add("--now", type=float, default=None, help="Epoch seconds for deterministic staleness tests")
    add("--compact", action="store_true", help="Accepted for compatibility; output is compact JSON by default")
    add("--include-evidence", action="store_true", help="Include short derived evidence fields")
    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the CLI: read input, classify, and print the advisory as JSON.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use
            ``sys.argv``.

    Returns:
        0 on success, or 2 after reporting an input or I/O error on stderr.
    """
    args = build_parser().parse_args(argv)
    try:
        tasks, metadata = load_input(args.input, args.format)
        # --now pins the clock so staleness results are reproducible.
        reference_time = time.time() if args.now is None else args.now
        output = advisory(
            tasks,
            board=args.board,
            now=reference_time,
            input_metadata=metadata,
            include_evidence=args.include_evidence,
        )
    except (OSError, ValueError) as exc:
        print(f"kanban-hygiene-advisory: {exc}", file=sys.stderr)
        return 2

    rendered = json.dumps(output, sort_keys=True, separators=(",", ":"))
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())