527 lines
21 KiB
Python
Executable File
527 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Dry-run Kanban hygiene advisory classifier.
|
|
|
|
Reads compact board/task summaries and emits bounded labels/next gates without
|
|
mutating any Hermes Kanban state. Phase 1 is deterministic rules only; it does
|
|
not call kanban tools, restart services, write memory, or send outbound data.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
SCHEMA = "kanban_hygiene_advisory_v1"
|
|
AUTHORITY = {
|
|
"may_mutate_board": False,
|
|
"may_assign": False,
|
|
"may_block_or_unblock": False,
|
|
"may_complete_or_archive": False,
|
|
"may_create_tasks": False,
|
|
"may_write_memory": False,
|
|
"may_send_external": False,
|
|
"may_restart_services": False,
|
|
"may_execute_tools": False,
|
|
}
|
|
NPU_PROOF = {
|
|
"required_for_npu_claims": True,
|
|
"attempted": False,
|
|
"ok": None,
|
|
"npu_busy_delta_us": None,
|
|
}
|
|
|
|
REQUIRED_TASK_FIELDS = {"id", "title", "status", "assignee", "created_at", "updated_at"}
|
|
SUPPORTED_STATUSES = {
|
|
"triage",
|
|
"todo",
|
|
"ready",
|
|
"running",
|
|
"blocked",
|
|
"done",
|
|
"archived",
|
|
"failed",
|
|
"cancelled",
|
|
}
|
|
TASK_TYPES = {
|
|
"charter",
|
|
"discovery",
|
|
"spec",
|
|
"implement",
|
|
"test",
|
|
"review",
|
|
"docs",
|
|
"ops",
|
|
"integration",
|
|
"final",
|
|
"unknown",
|
|
}
|
|
LANES = {
|
|
"observability_utilization",
|
|
"cron_n8n_classifier",
|
|
"rag_context_gate",
|
|
"doc_image_audio_triage",
|
|
"voice_audio_pipeline",
|
|
"kanban_hygiene",
|
|
"docs_runbook_service_map",
|
|
"ops_integration",
|
|
"final_closeout",
|
|
"general",
|
|
"unknown",
|
|
}
|
|
LIFECYCLE_PREFIXES = {
|
|
"charter",
|
|
"discovery",
|
|
"spec",
|
|
"implement",
|
|
"test",
|
|
"review",
|
|
"docs",
|
|
"doc",
|
|
"ops",
|
|
"integration",
|
|
"final",
|
|
}
|
|
|
|
|
|
def compact_text(task: dict[str, Any]) -> str:
|
|
parts = [str(task.get("title", "")), str(task.get("body_excerpt", "")), str(task.get("last_run_summary_excerpt", "")), str(task.get("last_comment_excerpt", ""))]
|
|
return " ".join(part for part in parts if part).lower()
|
|
|
|
|
|
def load_jsonl(raw: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
|
|
tasks = []
|
|
for line_no, line in enumerate(raw.splitlines(), start=1):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
row = json.loads(line)
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"invalid JSONL on line {line_no}: {exc.msg}") from exc
|
|
if not isinstance(row, dict):
|
|
raise ValueError(f"JSONL line {line_no} is not an object")
|
|
tasks.append(row)
|
|
return tasks, {}
|
|
|
|
|
|
def load_input(path: str | None, fmt: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
|
|
raw = sys.stdin.read() if not path or path == "-" else Path(path).read_text(encoding="utf-8")
|
|
if not raw.strip():
|
|
raise ValueError("input is empty")
|
|
|
|
parse_as_jsonl = fmt == "jsonl" or (fmt == "auto" and "\n" in raw.strip() and not raw.lstrip().startswith(("{", "[")))
|
|
if parse_as_jsonl:
|
|
return load_jsonl(raw)
|
|
|
|
try:
|
|
data = json.loads(raw)
|
|
except json.JSONDecodeError as exc:
|
|
if fmt == "auto" and "\n" in raw.strip():
|
|
return load_jsonl(raw)
|
|
raise ValueError(f"invalid JSON input: {exc.msg}") from exc
|
|
if isinstance(data, list):
|
|
if not all(isinstance(item, dict) for item in data):
|
|
raise ValueError("JSON list must contain task objects")
|
|
return data, {}
|
|
if isinstance(data, dict):
|
|
tasks = data.get("tasks")
|
|
if tasks is None:
|
|
# Treat a single object with required task fields as a one-task summary.
|
|
if REQUIRED_TASK_FIELDS.issubset(data):
|
|
return [data], {}
|
|
raise ValueError("JSON object must contain a 'tasks' list")
|
|
if not isinstance(tasks, list) or not all(isinstance(item, dict) for item in tasks):
|
|
raise ValueError("'tasks' must be a list of objects")
|
|
metadata = {key: value for key, value in data.items() if key != "tasks"}
|
|
return tasks, metadata
|
|
raise ValueError("input must be JSON object, JSON list, or JSON Lines")
|
|
|
|
|
|
def validate_task(task: dict[str, Any]) -> None:
|
|
missing = sorted(REQUIRED_TASK_FIELDS - set(task))
|
|
if missing:
|
|
task_id = task.get("id", "<unknown>")
|
|
raise ValueError(f"task {task_id} missing required fields: {', '.join(missing)}")
|
|
status = str(task.get("status"))
|
|
if status not in SUPPORTED_STATUSES:
|
|
raise ValueError(f"task {task.get('id')} has unsupported status: {status}")
|
|
for field in ("created_at", "updated_at"):
|
|
if not isinstance(task.get(field), (int, float)):
|
|
raise ValueError(f"task {task.get('id')} field {field} must be epoch seconds")
|
|
|
|
|
|
def confidence(value: float) -> float:
|
|
return round(max(0.0, min(1.0, value)), 2)
|
|
|
|
|
|
def classify_task_type(task: dict[str, Any]) -> dict[str, Any]:
|
|
title = str(task.get("title", "")).strip().lower()
|
|
body = compact_text(task)
|
|
prefix = title.split(":", 1)[0].strip() if ":" in title else ""
|
|
prefix_map = {"doc": "docs"}
|
|
if prefix in LIFECYCLE_PREFIXES:
|
|
value = prefix_map.get(prefix, prefix)
|
|
if value in TASK_TYPES:
|
|
return {"value": value, "confidence": 0.95, "reason_codes": [f"title_prefix_{value}"]}
|
|
keyword_rules = [
|
|
("discovery", ["discover", "inventory", "repo map", "read-only"]),
|
|
("spec", ["spec", "define", "contract", "schema"]),
|
|
("implement", ["implement", "engineer", "script", "code", "build"]),
|
|
("review", ["review", "approve", "findings"]),
|
|
("docs", ["docs", "runbook", "readme"]),
|
|
("ops", ["ops", "health", "monitor", "deploy", "cleanup"]),
|
|
("integration", ["integration", "merge", "cherry-pick", "fan-in"]),
|
|
("final", ["final", "closeout", "synthesis"]),
|
|
("test", ["test", "smoke", "validate"]),
|
|
("charter", ["charter", "program framing"]),
|
|
]
|
|
for value, needles in keyword_rules:
|
|
if any(needle in body for needle in needles):
|
|
return {"value": value, "confidence": 0.78, "reason_codes": [f"keyword_{value}"]}
|
|
return {"value": "unknown", "confidence": 0.2, "reason_codes": ["insufficient_signal"]}
|
|
|
|
|
|
def classify_lane(task: dict[str, Any]) -> dict[str, Any]:
|
|
text = compact_text(task)
|
|
rules = [
|
|
("kanban_hygiene", ["kanban", "task hygiene", "board summaries", "review-needed", "next gate"]),
|
|
("cron_n8n_classifier", ["cron", "n8n", "alert", "event classifier"]),
|
|
("rag_context_gate", ["rag", "context gate", "retrieval", "bundle"]),
|
|
("doc_image_audio_triage", ["document", "image", "audio triage", "ocr", "attachments"]),
|
|
("voice_audio_pipeline", ["voice", "whisper", "memo", "transcribe"]),
|
|
("docs_runbook_service_map", ["service map", "runbook", "readme"]),
|
|
("observability_utilization", ["health", "utilization", "metrics", "digest"]),
|
|
("ops_integration", ["merge", "integration", "cherry-pick", "fan-in"]),
|
|
("final_closeout", ["final", "closeout", "synthesis"]),
|
|
]
|
|
for value, needles in rules:
|
|
matched = [needle.replace(" ", "_") for needle in needles if needle in text]
|
|
if matched:
|
|
return {"value": value, "confidence": 0.9, "reason_codes": [f"mentions_{matched[0]}"]}
|
|
if text:
|
|
return {"value": "general", "confidence": 0.45, "reason_codes": ["no_lane_specific_signal"]}
|
|
return {"value": "unknown", "confidence": 0.1, "reason_codes": ["insufficient_signal"]}
|
|
|
|
|
|
def classify_project(task: dict[str, Any], board: str | None, input_metadata: dict[str, Any]) -> dict[str, Any]:
|
|
explicit = task.get("project") or input_metadata.get("project")
|
|
if explicit:
|
|
return {"value": str(explicit), "confidence": 0.9, "source": "input"}
|
|
board_name = board or input_metadata.get("board")
|
|
if board_name:
|
|
return {"value": str(board_name), "confidence": 0.98, "source": "board_name"}
|
|
text = compact_text(task)
|
|
if "npu" in text or "openvino" in text:
|
|
return {"value": "npu-maximization", "confidence": 0.72, "source": "body"}
|
|
return {"value": "unknown", "confidence": 0.1, "source": "unknown"}
|
|
|
|
|
|
def classify_blocker(task: dict[str, Any]) -> dict[str, Any]:
|
|
status = str(task.get("status"))
|
|
text = compact_text(task)
|
|
last_outcome = str(task.get("last_run_outcome") or "").lower()
|
|
reason_codes: list[str] = []
|
|
value = "none"
|
|
blocked = False
|
|
conf = 0.0
|
|
|
|
if status == "blocked":
|
|
blocked = True
|
|
conf = 0.85
|
|
if "review-required" in text or "changes requested" in text:
|
|
value = "review_changes_requested"
|
|
reason_codes.append("blocked_review_required_or_changes")
|
|
elif any(word in text for word in ("credential", "token", "path", "spawn_failed")):
|
|
value = "missing_credentials"
|
|
reason_codes.append("blocked_missing_credentials_or_path")
|
|
elif any(word in text for word in ("human", "approval", "decision", "confirm")):
|
|
value = "human_decision"
|
|
reason_codes.append("blocked_human_decision")
|
|
else:
|
|
value = "unknown"
|
|
reason_codes.append("status_blocked")
|
|
elif status == "todo" and task.get("parents"):
|
|
value = "missing_parent"
|
|
conf = 0.75
|
|
reason_codes.append("todo_with_parents")
|
|
elif last_outcome in {"crashed", "timed_out", "failed"}:
|
|
value = "failed_tests"
|
|
conf = 0.65
|
|
reason_codes.append(f"last_run_{last_outcome}")
|
|
|
|
return {"value": value, "blocked": blocked, "confidence": confidence(conf), "reason_codes": reason_codes}
|
|
|
|
|
|
def age_hours(now: float, timestamp: Any) -> float | None:
|
|
if not isinstance(timestamp, (int, float)):
|
|
return None
|
|
return round(max(0.0, now - float(timestamp)) / 3600.0, 2)
|
|
|
|
|
|
def classify_staleness(task: dict[str, Any], now: float) -> dict[str, Any]:
|
|
status = str(task.get("status"))
|
|
created = float(task["created_at"])
|
|
activity_ts = float(task.get("heartbeat_at") or task.get("last_activity_at") or task.get("updated_at") or created)
|
|
age = age_hours(now, created)
|
|
last_activity = age_hours(now, activity_ts)
|
|
threshold = 24
|
|
value = "fresh"
|
|
reason_codes: list[str] = []
|
|
|
|
if status == "running":
|
|
threshold = 1
|
|
if last_activity is not None and last_activity > 1:
|
|
value = "stale_lock"
|
|
reason_codes.append("running_no_recent_heartbeat")
|
|
elif status == "ready":
|
|
threshold = 24
|
|
if last_activity is not None and last_activity >= 72:
|
|
value = "stale"
|
|
reason_codes.append("ready_over_72h")
|
|
elif last_activity is not None and last_activity >= 24:
|
|
value = "aging"
|
|
reason_codes.append("ready_over_24h")
|
|
elif status == "blocked":
|
|
review_required = "review-required" in compact_text(task)
|
|
threshold = 24 if review_required else 48
|
|
if last_activity is not None and last_activity >= 168:
|
|
value = "stale"
|
|
reason_codes.append("blocked_over_7d")
|
|
elif review_required and last_activity is not None and last_activity >= 72:
|
|
value = "stale"
|
|
reason_codes.append("review_required_over_72h")
|
|
elif last_activity is not None and last_activity >= threshold:
|
|
value = "aging"
|
|
reason_codes.append("blocked_or_review_aging")
|
|
elif status == "todo" and not task.get("parents") and last_activity is not None and last_activity >= 72:
|
|
value = "orphaned"
|
|
threshold = 72
|
|
reason_codes.append("todo_without_parents_over_72h")
|
|
|
|
return {
|
|
"value": value,
|
|
"age_hours": age,
|
|
"last_activity_hours": last_activity,
|
|
"threshold_hours": threshold,
|
|
"reason_codes": reason_codes,
|
|
}
|
|
|
|
|
|
def normalize_title(title: str) -> str:
|
|
text = title.lower().strip()
|
|
text = re.sub(r"^(charter|discovery|spec|implement|test|review|docs?|ops|integration|final)\s*:\s*", "", text)
|
|
text = re.sub(r"[^a-z0-9]+", " ", text)
|
|
return re.sub(r"\s+", " ", text).strip()
|
|
|
|
|
|
def find_duplicates(tasks: list[dict[str, Any]], labels: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
|
groups: dict[tuple[str, str, str], list[str]] = {}
|
|
active_statuses = SUPPORTED_STATUSES - {"done", "archived", "cancelled"}
|
|
for task in tasks:
|
|
if str(task.get("status")) not in active_statuses:
|
|
continue
|
|
task_id = str(task["id"])
|
|
key = (
|
|
normalize_title(str(task.get("title", ""))),
|
|
labels[task_id]["lane"]["value"],
|
|
labels[task_id]["task_type"]["value"],
|
|
)
|
|
if key[0]:
|
|
groups.setdefault(key, []).append(task_id)
|
|
|
|
result = {
|
|
str(task["id"]): {
|
|
"is_duplicate": False,
|
|
"canonical_task_id": None,
|
|
"candidate_ids": [],
|
|
"confidence": 0.0,
|
|
"reason_codes": [],
|
|
}
|
|
for task in tasks
|
|
}
|
|
for ids in groups.values():
|
|
if len(ids) < 2:
|
|
continue
|
|
canonical = sorted(ids)[0]
|
|
for task_id in ids:
|
|
candidates = [candidate for candidate in ids if candidate != task_id]
|
|
result[task_id] = {
|
|
"is_duplicate": task_id != canonical,
|
|
"canonical_task_id": canonical if task_id != canonical else None,
|
|
"candidate_ids": candidates,
|
|
"confidence": 0.86,
|
|
"reason_codes": ["same_normalized_title_lane_and_task_type"],
|
|
}
|
|
return result
|
|
|
|
|
|
def has_non_positive_npu_busy_delta(text: str) -> bool:
|
|
if "npu" not in text and "busy" not in text:
|
|
return False
|
|
patterns = [
|
|
r"\b(?:npu_)?busy(?:_time)?(?:_delta)?(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b",
|
|
r"\b(?:npu_)?delta(?:_us)?\s*[=:]\s*([+-]?\d+(?:\.\d+)?)\b",
|
|
]
|
|
for pattern in patterns:
|
|
for match in re.finditer(pattern, text):
|
|
try:
|
|
if float(match.group(1)) <= 0:
|
|
return True
|
|
except ValueError:
|
|
continue
|
|
return False
|
|
|
|
|
|
def classify_review_needed(task: dict[str, Any], task_type: str) -> dict[str, Any]:
|
|
text = compact_text(task)
|
|
changed_files = task.get("changed_files") or task.get("diff_path") or task.get("tests_run")
|
|
if has_non_positive_npu_busy_delta(text):
|
|
return {"value": True, "kind": "npu_proof_gate", "confidence": 0.84, "reason_codes": ["npu_claim_non_positive_busy_delta"]}
|
|
if "npu" in text and ("http 200" in text or "no busy" in text or "missing busy" in text):
|
|
return {"value": True, "kind": "npu_proof_gate", "confidence": 0.8, "reason_codes": ["npu_claim_needs_busy_delta"]}
|
|
if "review-required" in text:
|
|
kind = "code_change" if task_type == "implement" else "spec_review"
|
|
return {"value": True, "kind": kind, "confidence": 0.92, "reason_codes": ["review_required_marker"]}
|
|
if changed_files and task_type in {"implement", "ops", "docs"}:
|
|
return {"value": True, "kind": "code_change", "confidence": 0.86, "reason_codes": ["reported_changed_files_or_tests"]}
|
|
if any(needle in text for needle in ("routing authority", "restart service", "write memory", "send outbound", "private root", "wildcard bind", "vector db mutation")):
|
|
return {"value": True, "kind": "human_approval", "confidence": 0.84, "reason_codes": ["authority_change_requires_approval"]}
|
|
return {"value": False, "kind": "none", "confidence": 0.2, "reason_codes": []}
|
|
|
|
|
|
def classify_next_gate(task: dict[str, Any], labels: dict[str, Any]) -> dict[str, Any]:
|
|
task_type = labels["task_type"]["value"]
|
|
status = str(task.get("status"))
|
|
reason_codes: list[str] = []
|
|
|
|
if labels["duplicate"]["is_duplicate"]:
|
|
return {"value": "dedupe_review", "confidence": 0.86, "reason_codes": ["duplicate_candidate"]}
|
|
if labels["staleness"]["value"] == "stale_lock":
|
|
return {"value": "investigate_stale_lock", "confidence": 0.88, "reason_codes": ["running_stale_lock"]}
|
|
blocker = labels["blocker"]
|
|
if blocker["value"] in {"human_decision", "missing_credentials", "unknown"} and blocker["blocked"]:
|
|
return {"value": "needs_human_decision", "confidence": 0.85, "reason_codes": blocker["reason_codes"] or ["blocked"]}
|
|
if blocker["value"] == "missing_parent":
|
|
return {"value": "wait_for_parents", "confidence": 0.82, "reason_codes": ["unfinished_parents"]}
|
|
if task_type == "implement" and not (task.get("tests_run") or task.get("test_evidence")) and status in {"blocked", "done"}:
|
|
return {"value": "needs_test_evidence", "confidence": 0.78, "reason_codes": ["implementation_without_test_evidence"]}
|
|
review_needed = labels["review_needed"]
|
|
if review_needed["kind"] == "npu_proof_gate":
|
|
return {"value": "needs_npu_proof", "confidence": 0.8, "reason_codes": review_needed["reason_codes"]}
|
|
if review_needed["value"]:
|
|
return {"value": "ready_for_review", "confidence": 0.86, "reason_codes": review_needed["reason_codes"]}
|
|
|
|
gate_by_type = {
|
|
"spec": "ready_for_implementation",
|
|
"implement": "ready_for_review",
|
|
"review": "ready_for_integration",
|
|
"docs": "ready_for_integration",
|
|
"ops": "ready_for_ops_validation",
|
|
"integration": "ready_for_closeout",
|
|
"final": "safe_to_complete",
|
|
"discovery": "safe_to_complete",
|
|
"charter": "ready_for_spec",
|
|
"test": "ready_for_review",
|
|
}
|
|
type_gate = gate_by_type.get(task_type, "unknown")
|
|
if task_type in gate_by_type:
|
|
reason_codes.append(f"task_type_{task_type}")
|
|
return {"value": type_gate, "confidence": 0.74 if type_gate != "unknown" else 0.2, "reason_codes": reason_codes}
|
|
|
|
|
|
def advisory(tasks: list[dict[str, Any]], *, board: str | None, now: float, input_metadata: dict[str, Any], include_evidence: bool) -> dict[str, Any]:
|
|
for task in tasks:
|
|
validate_task(task)
|
|
|
|
prelim: dict[str, dict[str, Any]] = {}
|
|
for task in tasks:
|
|
task_id = str(task["id"])
|
|
prelim[task_id] = {
|
|
"task_type": classify_task_type(task),
|
|
"project": classify_project(task, board, input_metadata),
|
|
"lane": classify_lane(task),
|
|
"blocker": classify_blocker(task),
|
|
"staleness": classify_staleness(task, now),
|
|
}
|
|
duplicates = find_duplicates(tasks, prelim)
|
|
|
|
items = []
|
|
for task in tasks:
|
|
task_id = str(task["id"])
|
|
labels = dict(prelim[task_id])
|
|
labels["duplicate"] = duplicates[task_id]
|
|
labels["review_needed"] = classify_review_needed(task, labels["task_type"]["value"])
|
|
labels["next_gate"] = classify_next_gate(task, labels)
|
|
item = {
|
|
"task_id": task_id,
|
|
**labels,
|
|
"warnings": [],
|
|
}
|
|
if include_evidence:
|
|
item["evidence"] = {
|
|
"normalized_title": normalize_title(str(task.get("title", ""))),
|
|
"status": task.get("status"),
|
|
"parents_count": len(task.get("parents") or []),
|
|
"children_count": len(task.get("children") or []),
|
|
}
|
|
items.append(item)
|
|
|
|
counts = {
|
|
"tasks": len(items),
|
|
"duplicates": sum(1 for item in items if item["duplicate"]["is_duplicate"]),
|
|
"review_needed": sum(1 for item in items if item["review_needed"]["value"]),
|
|
"stale": sum(1 for item in items if item["staleness"]["value"] in {"stale", "stale_lock", "orphaned"}),
|
|
"blocked": sum(1 for item in items if item["blocker"]["blocked"]),
|
|
}
|
|
return {
|
|
"schema": SCHEMA,
|
|
"dry_run": True,
|
|
"created": int(now),
|
|
"board": board or input_metadata.get("board") or None,
|
|
"counts": counts,
|
|
"authority": AUTHORITY,
|
|
"npu_proof": NPU_PROOF,
|
|
"items": items,
|
|
}
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="Dry-run Kanban hygiene advisory classifier",
|
|
epilog="Input: JSON object with tasks[] or JSONL task objects. Required task fields: id,title,status,assignee,created_at,updated_at. Optional compact fields such as body_excerpt, parents, children, changed_files, tests_run, last_run_outcome, and last_comment_excerpt improve labels.",
|
|
)
|
|
parser.add_argument("--input", "-i", help="Input JSON/JSONL file; omit or '-' for stdin")
|
|
parser.add_argument("--format", choices=["auto", "json", "jsonl"], default="auto", help="Input format")
|
|
parser.add_argument("--board", help="Board/project name to include in output")
|
|
parser.add_argument("--now", type=float, default=None, help="Epoch seconds for deterministic staleness tests")
|
|
parser.add_argument("--compact", action="store_true", help="Accepted for compatibility; output is compact JSON by default")
|
|
parser.add_argument("--include-evidence", action="store_true", help="Include short derived evidence fields")
|
|
return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
parser = build_parser()
|
|
args = parser.parse_args(argv)
|
|
try:
|
|
tasks, metadata = load_input(args.input, args.format)
|
|
output = advisory(
|
|
tasks,
|
|
board=args.board,
|
|
now=args.now if args.now is not None else time.time(),
|
|
input_metadata=metadata,
|
|
include_evidence=args.include_evidence,
|
|
)
|
|
except (OSError, ValueError) as exc:
|
|
print(f"kanban-hygiene-advisory: {exc}", file=sys.stderr)
|
|
return 2
|
|
print(json.dumps(output, sort_keys=True, separators=(",", ":")))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|