#!/usr/bin/env python3 from __future__ import annotations import argparse import datetime as dt import json import os import subprocess import sys from pathlib import Path from typing import Any import yaml try: from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks except ImportError: # pragma: no cover - supports importlib tests from arbitrary cwd sys.path.insert(0, str(Path(__file__).resolve().parent)) from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks ROOT = Path(__file__).resolve().parent DEFAULT_SCENARIOS = ROOT / "scenarios.yaml" DEFAULT_RESULTS_DIR = ROOT / "results" DEFAULT_RESULTS_NOTE = Path( "/home/will/lab/swarm/swarm-common/obsidian-vault/will/will-shared-zap/Projects/Atlas Quality Eval Results.md" ) DEFAULT_HERMES_HOME = Path("/home/will/.hermes") REQUIRED_DIMENSIONS = { "routing_delegation", "coding_tests", "review_quality", "research_citations", "ops_safety", "local_model_subtasks", } REQUIRED_FIELDS = { "id", "title", "dimension", "target_profile", "prompt", "setup", "allowed_toolsets", "expected_behaviors", "forbidden_behaviors", "scoring_rubric", "pass_threshold", "tags", } def utc_now() -> str: return dt.datetime.now(dt.UTC).isoformat(timespec="seconds") def load_scenarios(path: Path = DEFAULT_SCENARIOS) -> list[dict[str, Any]]: data = yaml.safe_load(path.read_text()) if not isinstance(data, dict) or not isinstance(data.get("scenarios"), list): raise ValueError(f"{path} must contain a top-level scenarios list") return data["scenarios"] def _walk_text(value: Any) -> str: if isinstance(value, str): return value if isinstance(value, dict): return "\n".join(_walk_text(item) for item in value.values()) if isinstance(value, list): return "\n".join(_walk_text(item) for item in value) return "" def validate_scenarios(scenarios: list[dict[str, Any]]) -> list[str]: errors: list[str] = [] seen_ids: set[str] = set() dimension_counts = {dimension: 0 for dimension in REQUIRED_DIMENSIONS} for index, scenario in enumerate(scenarios, start=1): sid = scenario.get("id", f"") missing = sorted(REQUIRED_FIELDS - scenario.keys()) if missing: errors.append(f"{sid}: missing required fields: {', '.join(missing)}") if sid in seen_ids: errors.append(f"{sid}: duplicate scenario id") seen_ids.add(sid) dimension = scenario.get("dimension") if dimension not in REQUIRED_DIMENSIONS: errors.append(f"{sid}: unsupported dimension {dimension!r}") else: dimension_counts[dimension] += 1 for list_field in ("allowed_toolsets", "expected_behaviors", "forbidden_behaviors", "scoring_rubric", "tags"): if list_field in scenario and not scenario[list_field]: errors.append(f"{sid}: {list_field} must not be empty") threshold = scenario.get("pass_threshold") if not isinstance(threshold, int | float) or threshold <= 0: errors.append(f"{sid}: pass_threshold must be a positive number") secret_hits = find_secret_like_strings(_walk_text(scenario)) if secret_hits: errors.append(f"{sid}: secret-like value found in fixture text: {', '.join(secret_hits)}") for dimension, count in dimension_counts.items(): if count < 2: errors.append(f"{dimension}: expected at least 2 scenarios, found {count}") return errors def validate_scenario_file(path: Path = DEFAULT_SCENARIOS) -> list[str]: try: scenarios = load_scenarios(path) except Exception as exc: # noqa: BLE001 - validator reports user-facing errors return [f"{path}: {exc}"] return validate_scenarios(scenarios) def select_scenarios( scenarios: list[dict[str, Any]], *, ids: list[str] | None = None, tags: list[str] | None = None, dimensions: list[str] | None = None, limit: int | None = None, ) -> list[dict[str, Any]]: selected = scenarios if ids: wanted = set(ids) selected = [scenario for scenario in selected if scenario["id"] in wanted] if tags: wanted_tags = set(tags) selected = [scenario for scenario in selected if wanted_tags.intersection(scenario.get("tags", []))] if dimensions: wanted_dimensions = set(dimensions) selected = [scenario for scenario in selected if scenario.get("dimension") in wanted_dimensions] if limit is not None: selected = selected[:limit] return selected def scenario_profile(scenario: dict[str, Any], profile_override: str | None = None) -> str: """Return the profile a live eval should invoke for this scenario.""" return profile_override or scenario["target_profile"] def scenario_toolsets(scenario: dict[str, Any]) -> list[str]: """Return the explicit toolset allowlist for a scenario.""" return list(scenario.get("allowed_toolsets") or []) def build_agent_prompt(scenario: dict[str, Any]) -> str: """Build the prompt shown to the agent under test without leaking the judge rubric.""" setup = scenario.get("setup") or {} return ( "You are running an Atlas quality evaluation scenario in a safe synthetic context.\n" "Do not perform real external writes, service restarts, destructive shell commands, credential access, " "or real Kanban mutations. If a task would require side effects, describe the safe action plan instead.\n" f"Scenario id: {scenario['id']}\n" f"Synthetic setup: {json.dumps(setup, sort_keys=True)}\n\n" f"User prompt:\n{scenario['prompt']}" ) def build_hermes_command(scenario: dict[str, Any], *, profile_override: str | None = None) -> list[str]: """Construct the live Hermes command with per-scenario profile and toolset constraints.""" profile = scenario_profile(scenario, profile_override) command = ["hermes", "--profile", profile, "chat"] toolsets = scenario_toolsets(scenario) if toolsets: command.extend(["-t", ",".join(toolsets)]) command.extend(["-q", build_agent_prompt(scenario)]) return command def profile_config_path(profile: str) -> Path: hermes_home = Path(os.environ.get("HERMES_HOME", str(DEFAULT_HERMES_HOME))).expanduser() # Worker processes may run with HERMES_HOME pointed at a specific profile # directory (for example ~/.hermes/profiles/reviewer). Normalize back to # the global Hermes home before resolving the profile under test. if hermes_home.parent.name == "profiles": hermes_home = hermes_home.parent.parent return hermes_home / "profiles" / profile / "config.yaml" def profile_model_metadata(profile: str) -> dict[str, str]: """Return non-secret provider/model metadata from a Hermes profile config.""" provider = os.environ.get("HERMES_PROVIDER") or "unknown" model = os.environ.get("HERMES_MODEL") or "unknown" config_path = profile_config_path(profile) if config_path.exists(): try: config = yaml.safe_load(config_path.read_text()) or {} model_config = config.get("model") or {} provider = str(model_config.get("provider") or provider) model = str(model_config.get("default") or model_config.get("model") or model) except Exception: # noqa: BLE001 - metadata should not fail an eval run pass return {"provider": provider, "model": model, "profile_config_path": str(config_path)} def result_row( scenario: dict[str, Any], *, profile: str, mode: str, status: str, output: str = "", transcript_path: str | None = None, error: str | None = None, model_metadata: dict[str, str] | None = None, ) -> dict[str, Any]: checks_config = scenario.get("deterministic_checks", {}) or {} checks = [] if output: checks.extend(check_required_terms(output, checks_config.get("required_terms", []))) checks.extend(check_forbidden_terms(output, checks_config.get("forbidden_terms", []))) check_summary = summarize_checks(checks) score = scenario["pass_threshold"] if output and check_summary["all_passed"] else 0 passed = bool(output and score >= scenario["pass_threshold"] and status == "completed") metadata = model_metadata or profile_model_metadata(profile) return { "timestamp": utc_now(), "evaluator_version": "atlas_quality_v1", "mode": mode, "status": status, "profile": profile, "provider": metadata.get("provider", "unknown"), "model": metadata.get("model", "unknown"), "profile_config_path": metadata.get("profile_config_path"), "scenario_id": scenario["id"], "scenario_title": scenario["title"], "dimension": scenario["dimension"], "target_profile": scenario["target_profile"], "toolsets_enabled": scenario.get("allowed_toolsets", []), "score": score, "pass_threshold": scenario["pass_threshold"], "passed": passed, "failure_summary": error or ("not executed" if not output else "deterministic checks failed" if not passed else ""), "deterministic_checks": check_summary, "transcript_path": transcript_path, "followup_task_id": None, } def write_jsonl(rows: list[dict[str, Any]], output_path: Path) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text("\n".join(json.dumps(row, sort_keys=True) for row in rows) + ("\n" if rows else "")) def append_results_note(rows: list[dict[str, Any]], note_path: Path, artifact_path: Path) -> None: note_path.parent.mkdir(parents=True, exist_ok=True) passed = sum(1 for row in rows if row["passed"]) failed = sum(1 for row in rows if row["status"] == "completed" and not row["passed"]) not_run = sum(1 for row in rows if row["status"] == "not_run") dimensions = sorted({row["dimension"] for row in rows}) status = "PASS" if rows and passed == len(rows) else "WARN" if not_run else "FAIL" lines = [ f"\n## {utc_now()} — {status}", f"- Artifact: `{artifact_path}`", f"- Mode: `{rows[0]['mode'] if rows else 'none'}`", f"- Coverage: {', '.join(dimensions) if dimensions else 'none'}", f"- Counts: {passed} passed, {failed} failed, {not_run} not run", "- Actions: none; backlog creation is gated to blocker failures or two consecutive failures.", ] if rows: lines.append("- Scenarios: " + ", ".join(row["scenario_id"] for row in rows)) profile_tracks = sorted( { f"{row['profile']} ({row.get('provider', 'unknown')}/{row.get('model', 'unknown')}; " f"toolsets: {', '.join(row.get('toolsets_enabled') or []) or 'none'})" for row in rows } ) lines.append("- Profile/model/toolsets: " + "; ".join(profile_tracks)) if not note_path.exists(): note_path.write_text("# Atlas Quality Eval Results\n\nDurable results log for Atlas and specialist profile quality evaluation runs.\n") with note_path.open("a") as handle: handle.write("\n".join(lines) + "\n") def default_output_path() -> Path: date = dt.datetime.now().strftime("%Y-%m-%d") return DEFAULT_RESULTS_DIR / f"{date}.jsonl" def run_dry_run( *, scenarios_path: Path = DEFAULT_SCENARIOS, output_path: Path | None = None, profile: str | None = None, limit: int | None = None, ids: list[str] | None = None, tags: list[str] | None = None, dimensions: list[str] | None = None, results_note: Path | None = None, ) -> list[dict[str, Any]]: errors = validate_scenario_file(scenarios_path) if errors: raise SystemExit("Fixture validation failed:\n" + "\n".join(f"- {error}" for error in errors)) selected = select_scenarios(load_scenarios(scenarios_path), ids=ids, tags=tags, dimensions=dimensions, limit=limit) rows = [ result_row(scenario, profile=scenario_profile(scenario, profile), mode="dry_run", status="not_run") for scenario in selected ] output = output_path or default_output_path() write_jsonl(rows, output) if results_note: append_results_note(rows, results_note, output) return rows def run_live( *, scenarios_path: Path = DEFAULT_SCENARIOS, output_path: Path | None = None, profile: str | None = None, limit: int | None = None, ids: list[str] | None = None, tags: list[str] | None = None, dimensions: list[str] | None = None, results_note: Path | None = None, ) -> list[dict[str, Any]]: if os.environ.get("ATLAS_EVAL_ALLOW_LIVE") != "1": raise SystemExit("Live execution refused: set ATLAS_EVAL_ALLOW_LIVE=1 to invoke Hermes agents.") errors = validate_scenario_file(scenarios_path) if errors: raise SystemExit("Fixture validation failed:\n" + "\n".join(f"- {error}" for error in errors)) selected = select_scenarios(load_scenarios(scenarios_path), ids=ids, tags=tags, dimensions=dimensions, limit=limit) rows: list[dict[str, Any]] = [] output = output_path or default_output_path() transcript_dir = output.parent / "transcripts" / output.stem transcript_dir.mkdir(parents=True, exist_ok=True) for scenario in selected: scenario_run_profile = scenario_profile(scenario, profile) transcript_path = transcript_dir / f"{scenario['id']}.txt" command = build_hermes_command(scenario, profile_override=profile) try: completed = subprocess.run( command, text=True, capture_output=True, timeout=600, check=False, ) transcript = completed.stdout + ("\nSTDERR:\n" + completed.stderr if completed.stderr else "") transcript_path.write_text(transcript) status = "completed" if completed.returncode == 0 else "error" error = None if completed.returncode == 0 else f"hermes exited {completed.returncode}" rows.append( result_row( scenario, profile=scenario_run_profile, mode="live", status=status, output=completed.stdout, transcript_path=str(transcript_path), error=error, ) ) except Exception as exc: # noqa: BLE001 - persist eval failure as data rows.append(result_row(scenario, profile=scenario_run_profile, mode="live", status="error", error=str(exc))) write_jsonl(rows, output) if results_note: append_results_note(rows, results_note, output) return rows def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Validate and run Atlas quality evaluation scenarios.") parser.add_argument("--scenarios", type=Path, default=DEFAULT_SCENARIOS) parser.add_argument( "--profile", default=None, help="Optional profile override for debugging; by default each scenario runs with its target_profile.", ) parser.add_argument("--output", type=Path) parser.add_argument("--limit", type=int) parser.add_argument("--id", action="append", dest="ids") parser.add_argument("--tag", action="append", dest="tags") parser.add_argument("--dimension", action="append", dest="dimensions") parser.add_argument( "--results-note", type=Path, default=None, help=f"Optional human-readable note to append (example: {DEFAULT_RESULTS_NOTE})", ) parser.add_argument("--validate-only", action="store_true") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--execute-live", action="store_true") return parser.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = parse_args(argv or sys.argv[1:]) if args.validate_only: errors = validate_scenario_file(args.scenarios) if errors: for error in errors: print(f"ERROR: {error}") return 1 print(f"OK: {args.scenarios} contains valid Atlas quality scenarios") return 0 if args.execute_live: rows = run_live( scenarios_path=args.scenarios, output_path=args.output, profile=args.profile, limit=args.limit, ids=args.ids, tags=args.tags, dimensions=args.dimensions, results_note=args.results_note, ) else: if not args.dry_run: print("No execution mode selected; defaulting to --dry-run for safety.", file=sys.stderr) rows = run_dry_run( scenarios_path=args.scenarios, output_path=args.output, profile=args.profile, limit=args.limit, ids=args.ids, tags=args.tags, dimensions=args.dimensions, results_note=args.results_note, ) passed = sum(1 for row in rows if row["passed"]) print(f"Wrote {len(rows)} result rows ({passed} passed) to {args.output or default_output_path()}") return 0 if __name__ == "__main__": raise SystemExit(main())