441 lines
17 KiB
Python
441 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
try:
|
|
from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks
|
|
except ImportError: # pragma: no cover - supports importlib tests from arbitrary cwd
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks
|
|
|
|
# Directory containing this script; the default fixture and results paths hang off it.
ROOT = Path(__file__).resolve().parent
DEFAULT_SCENARIOS = ROOT / "scenarios.yaml"
DEFAULT_RESULTS_DIR = ROOT / "results"

# Example location of the human-readable results note (quoted in the --results-note help text).
DEFAULT_RESULTS_NOTE = Path("/home/will/lab/swarm/swarm-common/obsidian-vault/will/will-shared-zap/Projects/Atlas Quality Eval Results.md")

# Fallback Hermes home used when the HERMES_HOME environment variable is unset.
DEFAULT_HERMES_HOME = Path("/home/will/.hermes")

# Dimensions a scenario may declare; validation expects each one to be covered.
REQUIRED_DIMENSIONS = {
    "coding_tests",
    "local_model_subtasks",
    "ops_safety",
    "research_citations",
    "review_quality",
    "routing_delegation",
}

# Keys every scenario mapping must provide.
REQUIRED_FIELDS = {
    "allowed_toolsets",
    "dimension",
    "expected_behaviors",
    "forbidden_behaviors",
    "id",
    "pass_threshold",
    "prompt",
    "scoring_rubric",
    "setup",
    "tags",
    "target_profile",
    "title",
}
|
|
|
|
|
|
def utc_now() -> str:
|
|
return dt.datetime.now(dt.UTC).isoformat(timespec="seconds")
|
|
|
|
|
|
def load_scenarios(path: Path = DEFAULT_SCENARIOS) -> list[dict[str, Any]]:
    """Parse the YAML file at *path* and return the list stored under ``scenarios``.

    Raises:
        ValueError: If the document is not a mapping whose ``scenarios`` value
            is a list.
    """
    document = yaml.safe_load(path.read_text())
    entries = document.get("scenarios") if isinstance(document, dict) else None
    if not isinstance(entries, list):
        raise ValueError(f"{path} must contain a top-level scenarios list")
    return entries
|
|
|
|
|
|
def _walk_text(value: Any) -> str:
|
|
if isinstance(value, str):
|
|
return value
|
|
if isinstance(value, dict):
|
|
return "\n".join(_walk_text(item) for item in value.values())
|
|
if isinstance(value, list):
|
|
return "\n".join(_walk_text(item) for item in value)
|
|
return ""
|
|
|
|
|
|
def validate_scenarios(scenarios: list[dict[str, Any]]) -> list[str]:
    """Validate loaded scenario fixtures and return a list of error messages.

    Per scenario this checks that every key in ``REQUIRED_FIELDS`` is present,
    the id has not been seen before, the dimension is in
    ``REQUIRED_DIMENSIONS``, list-valued fields that are present are non-empty,
    ``pass_threshold`` is a positive number, and ``find_secret_like_strings``
    reports nothing for the scenario's text. After the loop, every required
    dimension must be covered by at least two scenarios.

    Args:
        scenarios: Scenario entries as loaded from the fixture file.

    Returns:
        Error strings prefixed with the scenario id (or a positional
        placeholder when the id is missing). An empty list means no problems
        were found.
    """
    errors: list[str] = []
    seen_ids: set[str] = set()
    dimension_counts = {dimension: 0 for dimension in REQUIRED_DIMENSIONS}
    non_empty_fields = ("allowed_toolsets", "expected_behaviors", "forbidden_behaviors", "scoring_rubric", "tags")

    for index, scenario in enumerate(scenarios, start=1):
        # A YAML list item can be a scalar or a nested list. Report it as a
        # fixture error instead of crashing on ``.get`` / ``.keys`` below.
        if not isinstance(scenario, dict):
            errors.append(f"<scenario #{index}>: scenario must be a mapping, found {type(scenario).__name__}")
            continue

        sid = scenario.get("id", f"<scenario #{index}>")
        missing = sorted(REQUIRED_FIELDS - scenario.keys())
        if missing:
            errors.append(f"{sid}: missing required fields: {', '.join(missing)}")

        if sid in seen_ids:
            errors.append(f"{sid}: duplicate scenario id")
        seen_ids.add(sid)

        dimension = scenario.get("dimension")
        if dimension not in REQUIRED_DIMENSIONS:
            errors.append(f"{sid}: unsupported dimension {dimension!r}")
        else:
            dimension_counts[dimension] += 1

        for list_field in non_empty_fields:
            # Absent fields are already reported by the required-fields check.
            if list_field in scenario and not scenario[list_field]:
                errors.append(f"{sid}: {list_field} must not be empty")

        threshold = scenario.get("pass_threshold")
        # bool is a subclass of int, so ``pass_threshold: true`` would otherwise
        # be accepted as the number 1.
        is_number = isinstance(threshold, int | float) and not isinstance(threshold, bool)
        if not is_number or threshold <= 0:
            errors.append(f"{sid}: pass_threshold must be a positive number")

        secret_hits = find_secret_like_strings(_walk_text(scenario))
        if secret_hits:
            errors.append(f"{sid}: secret-like value found in fixture text: {', '.join(secret_hits)}")

    for dimension, count in dimension_counts.items():
        if count < 2:
            errors.append(f"{dimension}: expected at least 2 scenarios, found {count}")

    return errors
|
|
|
|
|
|
def validate_scenario_file(path: Path = DEFAULT_SCENARIOS) -> list[str]:
    """Load the fixture at *path* and return its validation errors.

    Any exception raised while loading is turned into a single error string
    instead of propagating. Otherwise the result of ``validate_scenarios`` is
    returned.
    """
    try:
        loaded = load_scenarios(path)
    except Exception as exc:  # noqa: BLE001 - validator reports user-facing errors
        return [f"{path}: {exc}"]
    else:
        return validate_scenarios(loaded)
|
|
|
|
|
|
def select_scenarios(
    scenarios: list[dict[str, Any]],
    *,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Filter *scenarios* by id, tag and dimension, then truncate to *limit*.

    Filters that are ``None`` or empty are skipped. The filters that are given
    are combined with AND; within the tag filter a scenario matches when it
    carries at least one of the requested tags. Input order is preserved. When
    no filter and no limit apply, the input list itself is returned.
    """
    predicates = []
    if ids:
        id_filter = set(ids)
        predicates.append(lambda item: item["id"] in id_filter)
    if tags:
        tag_filter = set(tags)
        predicates.append(lambda item: not tag_filter.isdisjoint(item.get("tags", [])))
    if dimensions:
        dimension_filter = set(dimensions)
        predicates.append(lambda item: item.get("dimension") in dimension_filter)

    chosen = scenarios
    for keep in predicates:
        chosen = [item for item in chosen if keep(item)]

    return chosen if limit is None else chosen[:limit]
|
|
|
|
|
|
def scenario_profile(scenario: dict[str, Any], profile_override: str | None = None) -> str:
    """Pick the profile to run: a non-empty override wins, else the scenario's ``target_profile``."""
    if profile_override:
        return profile_override
    return scenario["target_profile"]
|
|
|
|
|
|
def scenario_toolsets(scenario: dict[str, Any]) -> list[str]:
    """Return a fresh list of the scenario's ``allowed_toolsets`` (empty when missing or falsy)."""
    configured = scenario.get("allowed_toolsets")
    return list(configured) if configured else []
|
|
|
|
|
|
def build_agent_prompt(scenario: dict[str, Any]) -> str:
    """Assemble the prompt for the agent under test.

    The prompt has three parts: a fixed safety preamble, the scenario id with
    its ``setup`` serialized as key-sorted JSON, and the scenario's own
    prompt. Rubric and expected-behavior fields are not included.
    """
    setup = scenario.get("setup") or {}
    preamble = (
        "You are running an Atlas quality evaluation scenario in a safe synthetic context.\n"
        "Do not perform real external writes, service restarts, destructive shell commands, credential access, "
        "or real Kanban mutations. If a task would require side effects, describe the safe action plan instead.\n"
    )
    id_line = f"Scenario id: {scenario['id']}\n"
    setup_line = f"Synthetic setup: {json.dumps(setup, sort_keys=True)}\n\n"
    request = f"User prompt:\n{scenario['prompt']}"
    return "".join([preamble, id_line, setup_line, request])
|
|
|
|
|
|
def build_hermes_command(scenario: dict[str, Any], *, profile_override: str | None = None) -> list[str]:
    """Build the ``hermes`` argv for one scenario.

    The command selects the resolved profile and passes the agent prompt via
    ``-q``. ``-t`` with a comma-joined toolset list is added only when the
    scenario declares toolsets.
    """
    profile = scenario_profile(scenario, profile_override)
    allowed = scenario_toolsets(scenario)
    toolset_args = ["-t", ",".join(allowed)] if allowed else []
    return ["hermes", "--profile", profile, "chat", *toolset_args, "-q", build_agent_prompt(scenario)]
|
|
|
|
|
|
def profile_config_path(profile: str) -> Path:
    """Return the ``config.yaml`` path for *profile* under the Hermes home.

    The home comes from ``HERMES_HOME`` (with ``~`` expanded) or falls back to
    ``DEFAULT_HERMES_HOME``.
    """
    configured_home = os.environ.get("HERMES_HOME", str(DEFAULT_HERMES_HOME))
    base = Path(configured_home).expanduser()
    # Worker processes may run with HERMES_HOME pointed at a specific profile
    # directory (for example ~/.hermes/profiles/reviewer). Step back up to the
    # global Hermes home before resolving the profile under test.
    profiles_dir = base.parent
    if profiles_dir.name == "profiles":
        base = profiles_dir.parent
    return base / "profiles" / profile / "config.yaml"
|
|
|
|
|
|
def profile_model_metadata(profile: str) -> dict[str, str]:
    """Collect provider/model metadata for *profile*.

    Values start from the ``HERMES_PROVIDER`` / ``HERMES_MODEL`` environment
    variables (``"unknown"`` when unset or empty). If the profile's config
    file exists, they are overridden by its ``model`` section (``provider``,
    and ``default`` or ``model``). Reading the config is best effort: any
    failure leaves whatever values were set so far.

    Returns:
        A dict with ``provider``, ``model`` and ``profile_config_path`` keys.
    """
    env_provider = os.environ.get("HERMES_PROVIDER") or "unknown"
    env_model = os.environ.get("HERMES_MODEL") or "unknown"
    config_path = profile_config_path(profile)
    metadata = {"provider": env_provider, "model": env_model, "profile_config_path": str(config_path)}

    if not config_path.exists():
        return metadata

    try:
        loaded = yaml.safe_load(config_path.read_text()) or {}
        section = loaded.get("model") or {}
        metadata["provider"] = str(section.get("provider") or env_provider)
        metadata["model"] = str(section.get("default") or section.get("model") or env_model)
    except Exception:  # noqa: BLE001 - metadata should not fail an eval run
        pass
    return metadata
|
|
|
|
|
|
def result_row(
    scenario: dict[str, Any],
    *,
    profile: str,
    mode: str,
    status: str,
    output: str = "",
    transcript_path: str | None = None,
    error: str | None = None,
    model_metadata: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Build one result record for *scenario*.

    When *output* is non-empty, the scenario's ``deterministic_checks``
    required and forbidden terms are run against it. The score equals the
    scenario's ``pass_threshold`` when there is output and every check passed,
    otherwise 0. A row passes only if it has output, reaches the threshold and
    *status* is ``"completed"``.

    Args:
        scenario: Scenario mapping the row describes.
        profile: Profile the scenario ran (or would run) under.
        mode: Run mode recorded verbatim in the row.
        status: Run status recorded verbatim in the row.
        output: Agent output to check; empty means nothing was executed.
        transcript_path: Location of the saved transcript, if any.
        error: Error text; takes precedence as the failure summary.
        model_metadata: Provider/model metadata; looked up from the profile
            config when omitted or empty.

    Returns:
        A dict of result fields.
    """
    check_spec = scenario.get("deterministic_checks") or {}
    if output:
        checks = [
            *check_required_terms(output, check_spec.get("required_terms", [])),
            *check_forbidden_terms(output, check_spec.get("forbidden_terms", [])),
        ]
    else:
        checks = []
    check_summary = summarize_checks(checks)

    # Scoring is all-or-nothing: full threshold when every check passed, else zero.
    score = scenario["pass_threshold"] if output and check_summary["all_passed"] else 0
    passed = bool(output and score >= scenario["pass_threshold"] and status == "completed")
    metadata = model_metadata or profile_model_metadata(profile)

    if error:
        failure_summary = error
    elif not output:
        failure_summary = "not executed"
    elif not passed:
        failure_summary = "deterministic checks failed"
    else:
        failure_summary = ""

    return {
        "timestamp": utc_now(),
        "evaluator_version": "atlas_quality_v1",
        "mode": mode,
        "status": status,
        "profile": profile,
        "provider": metadata.get("provider", "unknown"),
        "model": metadata.get("model", "unknown"),
        "profile_config_path": metadata.get("profile_config_path"),
        "scenario_id": scenario["id"],
        "scenario_title": scenario["title"],
        "dimension": scenario["dimension"],
        "target_profile": scenario["target_profile"],
        "toolsets_enabled": scenario.get("allowed_toolsets", []),
        "score": score,
        "pass_threshold": scenario["pass_threshold"],
        "passed": passed,
        "failure_summary": failure_summary,
        "deterministic_checks": check_summary,
        "transcript_path": transcript_path,
        "followup_task_id": None,
    }
|
|
|
|
|
|
def write_jsonl(rows: list[dict[str, Any]], output_path: Path) -> None:
    """Write *rows* to *output_path* as JSON Lines, replacing any existing file.

    Parent directories are created as needed. Each row becomes one key-sorted
    JSON object followed by a newline. No rows produces an empty file.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    payload = "".join(f"{json.dumps(row, sort_keys=True)}\n" for row in rows)
    output_path.write_text(payload)
|
|
|
|
|
|
def append_results_note(rows: list[dict[str, Any]], note_path: Path, artifact_path: Path) -> None:
    """Append a Markdown summary of one evaluation run to *note_path*.

    The note (and its parent directories) is created with a short header when
    it does not exist yet. The entry heading is PASS when there are rows and
    all passed, WARN when any row was not run, and FAIL otherwise.

    Args:
        rows: Result rows for the run; may be empty.
        note_path: Markdown note to append to.
        artifact_path: Path of the JSONL artifact, quoted in the entry.
    """
    note_path.parent.mkdir(parents=True, exist_ok=True)
    passed = sum(1 for row in rows if row["passed"])
    not_run = sum(1 for row in rows if row["status"] == "not_run")
    # Every attempted row that did not pass counts as failed, including rows
    # with status "error". Counting only "completed" rows left errored runs
    # out of all three totals.
    failed = sum(1 for row in rows if row["status"] != "not_run" and not row["passed"])
    dimensions = sorted({row["dimension"] for row in rows})
    status = "PASS" if rows and passed == len(rows) else "WARN" if not_run else "FAIL"
    lines = [
        f"\n## {utc_now()} — {status}",
        f"- Artifact: `{artifact_path}`",
        f"- Mode: `{rows[0]['mode'] if rows else 'none'}`",
        f"- Coverage: {', '.join(dimensions) if dimensions else 'none'}",
        f"- Counts: {passed} passed, {failed} failed, {not_run} not run",
        "- Actions: none; backlog creation is gated to blocker failures or two consecutive failures.",
    ]
    if rows:
        lines.append("- Scenarios: " + ", ".join(row["scenario_id"] for row in rows))
        # De-duplicate so each profile/model/toolset combination is listed once.
        profile_tracks = sorted(
            {
                f"{row['profile']} ({row.get('provider', 'unknown')}/{row.get('model', 'unknown')}; "
                f"toolsets: {', '.join(row.get('toolsets_enabled') or []) or 'none'})"
                for row in rows
            }
        )
        lines.append("- Profile/model/toolsets: " + "; ".join(profile_tracks))
    # The heading contains an em dash, so write UTF-8 explicitly rather than
    # relying on the locale's default encoding.
    if not note_path.exists():
        note_path.write_text(
            "# Atlas Quality Eval Results\n\nDurable results log for Atlas and specialist profile quality evaluation runs.\n",
            encoding="utf-8",
        )
    with note_path.open("a", encoding="utf-8") as handle:
        handle.write("\n".join(lines) + "\n")
|
|
|
|
|
|
def default_output_path() -> Path:
    """Return ``<DEFAULT_RESULTS_DIR>/<YYYY-MM-DD>.jsonl`` for today's local date."""
    today = dt.date.today().isoformat()
    return DEFAULT_RESULTS_DIR / f"{today}.jsonl"
|
|
|
|
|
|
def run_dry_run(
    *,
    scenarios_path: Path = DEFAULT_SCENARIOS,
    output_path: Path | None = None,
    profile: str | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    results_note: Path | None = None,
) -> list[dict[str, Any]]:
    """Validate the fixtures and record a ``not_run`` row for each selected scenario.

    No agent is invoked. Rows are written as JSONL to *output_path* (or the
    dated default) and summarized in *results_note* when one is given.

    Raises:
        SystemExit: If fixture validation reports any error.

    Returns:
        The rows that were written.
    """
    problems = validate_scenario_file(scenarios_path)
    if problems:
        raise SystemExit("Fixture validation failed:\n" + "\n".join(f"- {error}" for error in problems))

    chosen = select_scenarios(load_scenarios(scenarios_path), ids=ids, tags=tags, dimensions=dimensions, limit=limit)
    rows: list[dict[str, Any]] = []
    for scenario in chosen:
        rows.append(result_row(scenario, profile=scenario_profile(scenario, profile), mode="dry_run", status="not_run"))

    destination = output_path or default_output_path()
    write_jsonl(rows, destination)
    if results_note:
        append_results_note(rows, results_note, destination)
    return rows
|
|
|
|
|
|
def run_live(
    *,
    scenarios_path: Path = DEFAULT_SCENARIOS,
    output_path: Path | None = None,
    profile: str | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    results_note: Path | None = None,
) -> list[dict[str, Any]]:
    """Run each selected scenario through the ``hermes`` CLI and record the results.

    Execution is refused unless ``ATLAS_EVAL_ALLOW_LIVE`` is exactly ``"1"``.
    Each scenario runs as a subprocess with a 600-second timeout. Its stdout
    (plus stderr, when present) is saved under
    ``<output dir>/transcripts/<output stem>/<scenario id>.txt``. A failure
    while running or recording a scenario becomes an ``error`` row, and the
    remaining scenarios still run.

    Raises:
        SystemExit: If live execution is not enabled or fixture validation
            reports any error.

    Returns:
        The rows that were written.
    """
    if os.environ.get("ATLAS_EVAL_ALLOW_LIVE") != "1":
        raise SystemExit("Live execution refused: set ATLAS_EVAL_ALLOW_LIVE=1 to invoke Hermes agents.")

    problems = validate_scenario_file(scenarios_path)
    if problems:
        raise SystemExit("Fixture validation failed:\n" + "\n".join(f"- {error}" for error in problems))

    chosen = select_scenarios(load_scenarios(scenarios_path), ids=ids, tags=tags, dimensions=dimensions, limit=limit)
    rows: list[dict[str, Any]] = []
    destination = output_path or default_output_path()
    transcript_dir = destination.parent / "transcripts" / destination.stem
    transcript_dir.mkdir(parents=True, exist_ok=True)

    for scenario in chosen:
        run_profile = scenario_profile(scenario, profile)
        transcript_path = transcript_dir / f"{scenario['id']}.txt"
        command = build_hermes_command(scenario, profile_override=profile)
        try:
            proc = subprocess.run(command, text=True, capture_output=True, timeout=600, check=False)
            transcript = proc.stdout
            if proc.stderr:
                transcript += "\nSTDERR:\n" + proc.stderr
            transcript_path.write_text(transcript)
            succeeded = proc.returncode == 0
            row = result_row(
                scenario,
                profile=run_profile,
                mode="live",
                status="completed" if succeeded else "error",
                output=proc.stdout,
                transcript_path=str(transcript_path),
                error=None if succeeded else f"hermes exited {proc.returncode}",
            )
        except Exception as exc:  # noqa: BLE001 - persist eval failure as data
            row = result_row(scenario, profile=run_profile, mode="live", status="error", error=str(exc))
        rows.append(row)

    write_jsonl(rows, destination)
    if results_note:
        append_results_note(rows, results_note, destination)
    return rows
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the runner's command-line options from *argv*.

    ``--id``, ``--tag`` and ``--dimension`` may be repeated and accumulate into
    ``ids``, ``tags`` and ``dimensions``. ``--validate-only``, ``--dry-run``
    and ``--execute-live`` are boolean mode flags.
    """
    parser = argparse.ArgumentParser(description="Validate and run Atlas quality evaluation scenarios.")
    add = parser.add_argument

    add("--scenarios", type=Path, default=DEFAULT_SCENARIOS)
    add(
        "--profile",
        default=None,
        help="Optional profile override for debugging; by default each scenario runs with its target_profile.",
    )
    add("--output", type=Path)
    add("--limit", type=int)

    # Repeatable selection filters.
    for flag, destination in (("--id", "ids"), ("--tag", "tags"), ("--dimension", "dimensions")):
        add(flag, action="append", dest=destination)

    add(
        "--results-note",
        type=Path,
        default=None,
        help=f"Optional human-readable note to append (example: {DEFAULT_RESULTS_NOTE})",
    )

    # Mode switches.
    for mode_flag in ("--validate-only", "--dry-run", "--execute-live"):
        add(mode_flag, action="store_true")

    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    With ``--validate-only`` the fixture file is checked and any errors are
    printed. Otherwise the scenarios run live when ``--execute-live`` is
    given, and as a dry run in every other case.

    Args:
        argv: Argument list to parse. ``None`` means ``sys.argv[1:]``; an
            explicit empty list means "no arguments".

    Returns:
        1 when ``--validate-only`` finds errors, otherwise 0. The run helpers
        may raise ``SystemExit`` themselves.
    """
    # ``argv or sys.argv[1:]`` would treat an explicit [] as "not provided" and
    # silently pick up the host process's arguments.
    args = parse_args(sys.argv[1:] if argv is None else argv)

    if args.validate_only:
        errors = validate_scenario_file(args.scenarios)
        if errors:
            for error in errors:
                print(f"ERROR: {error}")
            return 1
        print(f"OK: {args.scenarios} contains valid Atlas quality scenarios")
        return 0

    # Resolve the destination once so the path reported below is exactly the
    # file the run wrote, even if the date rolls over mid-run.
    output_path = args.output or default_output_path()
    run_options = {
        "scenarios_path": args.scenarios,
        "output_path": output_path,
        "profile": args.profile,
        "limit": args.limit,
        "ids": args.ids,
        "tags": args.tags,
        "dimensions": args.dimensions,
        "results_note": args.results_note,
    }

    if args.execute_live:
        rows = run_live(**run_options)
    else:
        if not args.dry_run:
            print("No execution mode selected; defaulting to --dry-run for safety.", file=sys.stderr)
        rows = run_dry_run(**run_options)

    passed = sum(1 for row in rows if row["passed"])
    print(f"Wrote {len(rows)} result rows ({passed} passed) to {output_path}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|