Files
swarm-master/swarm-common/agent-evals/atlas_quality/run_eval_suite.py

441 lines
17 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any
import yaml
try:
from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks
except ImportError: # pragma: no cover - supports importlib tests from arbitrary cwd
sys.path.insert(0, str(Path(__file__).resolve().parent))
from judges import check_forbidden_terms, check_required_terms, find_secret_like_strings, summarize_checks
# Directory that holds this script; default fixture and result locations are
# resolved relative to it.
ROOT = Path(__file__).resolve().parent
DEFAULT_SCENARIOS = ROOT.joinpath("scenarios.yaml")
DEFAULT_RESULTS_DIR = ROOT.joinpath("results")
# Human-readable results log location, quoted as the example in the
# --results-note help text.
DEFAULT_RESULTS_NOTE = (
    Path("/home/will/lab/swarm/swarm-common/obsidian-vault/will/will-shared-zap/Projects")
    / "Atlas Quality Eval Results.md"
)
# Fallback Hermes home used when the HERMES_HOME environment variable is unset.
DEFAULT_HERMES_HOME = Path("/home/will/.hermes")

# Every scenario must declare one of these dimensions; the validator also
# requires a minimum number of scenarios per dimension.
REQUIRED_DIMENSIONS = {
    "coding_tests",
    "local_model_subtasks",
    "ops_safety",
    "research_citations",
    "review_quality",
    "routing_delegation",
}

# Keys that must be present on every scenario mapping.
REQUIRED_FIELDS = {
    "allowed_toolsets",
    "dimension",
    "expected_behaviors",
    "forbidden_behaviors",
    "id",
    "pass_threshold",
    "prompt",
    "scoring_rubric",
    "setup",
    "tags",
    "target_profile",
    "title",
}
def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision.

    ``dt.timezone.utc`` is used rather than the ``dt.UTC`` alias because the
    alias only exists on Python 3.11+, while the rest of this module needs
    nothing newer than 3.10. The resulting string is identical.
    """
    return dt.datetime.now(dt.timezone.utc).isoformat(timespec="seconds")
def load_scenarios(path: Path = DEFAULT_SCENARIOS) -> list[dict[str, Any]]:
    """Load the scenario list from a YAML fixture file.

    Args:
        path: YAML file whose top level is a mapping with a ``scenarios`` list.

    Returns:
        The value of the top-level ``scenarios`` key, unmodified.

    Raises:
        ValueError: If the document is not a mapping or ``scenarios`` is not a list.
        OSError / yaml.YAMLError: Propagated from reading or parsing the file.
    """
    # Decode explicitly as UTF-8 so the fixture parses the same way regardless
    # of the process locale.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict) or not isinstance(data.get("scenarios"), list):
        raise ValueError(f"{path} must contain a top-level scenarios list")
    return data["scenarios"]
def _walk_text(value: Any) -> str:
    """Collect every string nested inside ``value`` into one newline-joined blob.

    Strings are returned as-is. Dicts contribute their values (keys are not
    included) and lists contribute their items, both recursively. Any other
    leaf contributes an empty string.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return ""
    return "\n".join(map(_walk_text, children))
def validate_scenarios(scenarios: list[dict[str, Any]]) -> list[str]:
    """Check a loaded scenario list and return human-readable error strings.

    Checks performed per scenario: the entry is a mapping, all
    ``REQUIRED_FIELDS`` are present, the id is unique, the dimension is one of
    ``REQUIRED_DIMENSIONS``, the list-valued fields are non-empty when present,
    ``pass_threshold`` is a positive number, and the fixture text contains
    nothing flagged by ``find_secret_like_strings``. Finally, every required
    dimension must be covered by at least two scenarios.

    Args:
        scenarios: Parsed scenario entries (normally mappings from YAML).

    Returns:
        A list of error messages; empty when the fixtures are valid.
    """
    errors: list[str] = []
    seen_ids: set[str] = set()
    dimension_counts = {dimension: 0 for dimension in REQUIRED_DIMENSIONS}
    for index, scenario in enumerate(scenarios, start=1):
        # A malformed YAML entry (string, list, null, ...) must be reported as
        # a validation error, not crash the validator with AttributeError.
        if not isinstance(scenario, dict):
            errors.append(
                f"<scenario #{index}>: scenario must be a mapping, found {type(scenario).__name__}"
            )
            continue
        sid = scenario.get("id", f"<scenario #{index}>")
        missing = sorted(REQUIRED_FIELDS - scenario.keys())
        if missing:
            errors.append(f"{sid}: missing required fields: {', '.join(missing)}")
        if sid in seen_ids:
            errors.append(f"{sid}: duplicate scenario id")
        seen_ids.add(sid)
        dimension = scenario.get("dimension")
        # The isinstance guard keeps an unhashable value (e.g. a YAML list)
        # from raising TypeError in the set membership test.
        if not isinstance(dimension, str) or dimension not in REQUIRED_DIMENSIONS:
            errors.append(f"{sid}: unsupported dimension {dimension!r}")
        else:
            dimension_counts[dimension] += 1
        for list_field in ("allowed_toolsets", "expected_behaviors", "forbidden_behaviors", "scoring_rubric", "tags"):
            if list_field in scenario and not scenario[list_field]:
                errors.append(f"{sid}: {list_field} must not be empty")
        threshold = scenario.get("pass_threshold")
        # bool is a subclass of int, so `pass_threshold: true` would otherwise
        # be accepted as the number 1.
        is_number = isinstance(threshold, (int, float)) and not isinstance(threshold, bool)
        if not is_number or threshold <= 0:
            errors.append(f"{sid}: pass_threshold must be a positive number")
        secret_hits = find_secret_like_strings(_walk_text(scenario))
        if secret_hits:
            errors.append(f"{sid}: secret-like value found in fixture text: {', '.join(secret_hits)}")
    for dimension, count in dimension_counts.items():
        if count < 2:
            errors.append(f"{dimension}: expected at least 2 scenarios, found {count}")
    return errors
def validate_scenario_file(path: Path = DEFAULT_SCENARIOS) -> list[str]:
    """Load ``path`` and validate its scenarios, reporting every problem as text.

    Any exception raised while loading is converted into a single
    ``"<path>: <error>"`` message so callers only ever deal with a list of
    strings. An empty list means the file is valid.
    """
    problems: list[str]
    try:
        loaded = load_scenarios(path)
    except Exception as load_error:  # noqa: BLE001 - validator reports user-facing errors
        problems = [f"{path}: {load_error}"]
    else:
        problems = validate_scenarios(loaded)
    return problems
def select_scenarios(
    scenarios: list[dict[str, Any]],
    *,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Filter scenarios by id, tag, and dimension, then truncate to ``limit``.

    Filters are combined with AND; a filter that is ``None`` or empty is
    skipped. A scenario matches ``tags`` when it carries at least one of the
    requested tags. Input order is preserved.

    Args:
        scenarios: Scenario mappings to choose from (never mutated).
        ids: Keep only scenarios whose ``id`` is in this list.
        tags: Keep only scenarios sharing at least one tag with this list.
        dimensions: Keep only scenarios whose ``dimension`` is in this list.
        limit: Maximum number of scenarios to return; ``None`` means no cap.

    Returns:
        A new list holding the selected scenarios.

    Raises:
        ValueError: If ``limit`` is negative. A negative slice bound would
            otherwise silently drop scenarios from the end of the selection.
    """
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")
    # Copy up front so the caller's list is never returned aliased.
    selected = list(scenarios)
    if ids:
        wanted_ids = set(ids)
        # .get keeps a scenario without an id from raising KeyError, matching
        # how the tag and dimension filters treat missing keys.
        selected = [scenario for scenario in selected if scenario.get("id") in wanted_ids]
    if tags:
        wanted_tags = set(tags)
        # `or []` tolerates an explicit `tags: null` in the fixture.
        selected = [scenario for scenario in selected if wanted_tags.intersection(scenario.get("tags") or [])]
    if dimensions:
        wanted_dimensions = set(dimensions)
        selected = [scenario for scenario in selected if scenario.get("dimension") in wanted_dimensions]
    if limit is not None:
        selected = selected[:limit]
    return selected
def scenario_profile(scenario: dict[str, Any], profile_override: str | None = None) -> str:
    """Pick the profile a live eval should invoke for ``scenario``.

    A non-empty ``profile_override`` wins; otherwise the scenario's own
    ``target_profile`` is used.
    """
    if profile_override:
        return profile_override
    return scenario["target_profile"]
def scenario_toolsets(scenario: dict[str, Any]) -> list[str]:
    """Return a fresh list with the scenario's ``allowed_toolsets`` allowlist.

    A missing, ``None``, or empty entry yields an empty list.
    """
    configured = scenario.get("allowed_toolsets")
    return list(configured) if configured else []
def build_agent_prompt(scenario: dict[str, Any]) -> str:
    """Assemble the prompt handed to the agent under test.

    Only the safety preamble, the scenario id, the JSON-encoded synthetic
    setup, and the user prompt are included; rubric and expected-behavior
    fields are deliberately left out so the judge criteria are not leaked.
    """
    synthetic_setup = scenario.get("setup") or {}
    sections = [
        "You are running an Atlas quality evaluation scenario in a safe synthetic context.",
        "Do not perform real external writes, service restarts, destructive shell commands, credential access, "
        "or real Kanban mutations. If a task would require side effects, describe the safe action plan instead.",
        f"Scenario id: {scenario['id']}",
        f"Synthetic setup: {json.dumps(synthetic_setup, sort_keys=True)}",
        # Empty element produces the blank line separating setup from the prompt.
        "",
        f"User prompt:\n{scenario['prompt']}",
    ]
    return "\n".join(sections)
def build_hermes_command(scenario: dict[str, Any], *, profile_override: str | None = None) -> list[str]:
    """Build the argv list for a live ``hermes ... chat`` invocation.

    The command selects the scenario's profile (or the override), passes the
    scenario's toolset allowlist as a comma-joined ``-t`` value when it is
    non-empty, and supplies the agent prompt via ``-q``.
    """
    selected_profile = scenario_profile(scenario, profile_override)
    allowed = scenario_toolsets(scenario)
    toolset_args = ["-t", ",".join(allowed)] if allowed else []
    return [
        "hermes",
        "--profile",
        selected_profile,
        "chat",
        *toolset_args,
        "-q",
        build_agent_prompt(scenario),
    ]
def profile_config_path(profile: str) -> Path:
    """Return the ``config.yaml`` path for ``profile`` under the Hermes home.

    The home directory comes from ``HERMES_HOME`` (falling back to
    ``DEFAULT_HERMES_HOME``) with ``~`` expanded.
    """
    configured_home = os.environ.get("HERMES_HOME", str(DEFAULT_HERMES_HOME))
    home = Path(configured_home).expanduser()
    # Worker processes may run with HERMES_HOME pointed at a specific profile
    # directory (for example ~/.hermes/profiles/reviewer). Step back up to the
    # global Hermes home before resolving the profile under test.
    points_at_profile_dir = home.parent.name == "profiles"
    if points_at_profile_dir:
        home = home.parent.parent
    return home.joinpath("profiles", profile, "config.yaml")
def profile_model_metadata(profile: str) -> dict[str, str]:
    """Return non-secret provider/model metadata from a Hermes profile config.

    Values are taken from the ``model`` mapping of the profile's config file
    (``provider``, then ``default`` or ``model``). Anything missing falls back
    to the ``HERMES_PROVIDER`` / ``HERMES_MODEL`` environment variables and
    finally to ``"unknown"``.

    This is strictly best effort: an unreadable, unparsable, or unexpectedly
    shaped config never raises, it just yields the fallback values.

    Returns:
        A dict with ``provider``, ``model`` and ``profile_config_path`` keys.
    """
    provider = os.environ.get("HERMES_PROVIDER") or "unknown"
    model = os.environ.get("HERMES_MODEL") or "unknown"
    config_path = profile_config_path(profile)
    try:
        # Read directly instead of exists()-then-read so a file that vanishes
        # in between is handled by the same path; decode explicitly as UTF-8.
        config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - metadata should not fail an eval run
        config = None
    # Explicit shape checks replace reliance on a swallowed AttributeError.
    model_config = config.get("model") if isinstance(config, dict) else None
    if isinstance(model_config, dict):
        provider = str(model_config.get("provider") or provider)
        model = str(model_config.get("default") or model_config.get("model") or model)
    return {"provider": provider, "model": model, "profile_config_path": str(config_path)}
def result_row(
    scenario: dict[str, Any],
    *,
    profile: str,
    mode: str,
    status: str,
    output: str = "",
    transcript_path: str | None = None,
    error: str | None = None,
    model_metadata: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Build one JSON-serializable result record for a scenario.

    Scoring is all-or-nothing: the score equals the scenario's
    ``pass_threshold`` when there is output and every deterministic check
    passes, otherwise 0. A row only counts as passed when it also has
    ``status == "completed"``.

    Args:
        scenario: Scenario mapping (must contain id, title, dimension,
            target_profile and pass_threshold).
        profile: Profile the scenario was (or would be) run with.
        mode: Run mode label, e.g. ``"dry_run"`` or ``"live"``.
        status: ``"completed"``, ``"error"`` or ``"not_run"``.
        output: Agent output to run the deterministic checks against.
        transcript_path: Location of the saved transcript, if any.
        error: Error description; takes precedence in ``failure_summary``.
        model_metadata: Pre-resolved provider/model metadata; looked up from
            the profile config when omitted.

    Returns:
        The result row as a dict.
    """
    checks_config = scenario.get("deterministic_checks", {}) or {}
    checks = []
    if output:
        checks.extend(check_required_terms(output, checks_config.get("required_terms", [])))
        checks.extend(check_forbidden_terms(output, checks_config.get("forbidden_terms", [])))
    check_summary = summarize_checks(checks)
    threshold = scenario["pass_threshold"]
    score = threshold if output and check_summary["all_passed"] else 0
    passed = bool(output and score >= threshold and status == "completed")
    # Keep "not executed" for rows that really were not run; a run that
    # finished with empty stdout did execute and must say so.
    if error:
        failure_summary = error
    elif status == "not_run":
        failure_summary = "not executed"
    elif not output:
        failure_summary = "no output produced"
    elif not passed:
        failure_summary = "deterministic checks failed"
    else:
        failure_summary = ""
    metadata = model_metadata or profile_model_metadata(profile)
    return {
        "timestamp": utc_now(),
        "evaluator_version": "atlas_quality_v1",
        "mode": mode,
        "status": status,
        "profile": profile,
        "provider": metadata.get("provider", "unknown"),
        "model": metadata.get("model", "unknown"),
        "profile_config_path": metadata.get("profile_config_path"),
        "scenario_id": scenario["id"],
        "scenario_title": scenario["title"],
        "dimension": scenario["dimension"],
        "target_profile": scenario["target_profile"],
        "toolsets_enabled": scenario.get("allowed_toolsets", []),
        "score": score,
        "pass_threshold": threshold,
        "passed": passed,
        "failure_summary": failure_summary,
        "deterministic_checks": check_summary,
        "transcript_path": transcript_path,
        "followup_task_id": None,
    }
def write_jsonl(rows: list[dict[str, Any]], output_path: Path) -> None:
    """Write ``rows`` to ``output_path`` as JSON Lines, replacing any existing file.

    Parent directories are created as needed. Each row becomes one line of
    key-sorted JSON terminated by a newline; an empty ``rows`` list produces
    an empty file.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    encoded_rows = [json.dumps(row, sort_keys=True) for row in rows]
    payload = "".join(f"{encoded}\n" for encoded in encoded_rows)
    output_path.write_text(payload)
def append_results_note(rows: list[dict[str, Any]], note_path: Path, artifact_path: Path) -> None:
    """Append a human-readable summary of one eval run to a Markdown note.

    The note is created with a title header on first use. Each call appends a
    ``## <timestamp> - <STATUS>`` section where STATUS is ``PASS`` when every
    row passed, ``WARN`` when any row was not run, and ``FAIL`` otherwise.

    Args:
        rows: Result rows as produced by ``result_row``.
        note_path: Markdown file to append to (parents are created).
        artifact_path: JSONL artifact referenced from the summary.
    """
    note_path.parent.mkdir(parents=True, exist_ok=True)
    passed = sum(1 for row in rows if row["passed"])
    not_run = sum(1 for row in rows if row["status"] == "not_run")
    # Count every executed row that did not pass, so rows with status "error"
    # are not silently missing from the totals.
    failed = sum(1 for row in rows if row["status"] != "not_run" and not row["passed"])
    dimensions = sorted({row["dimension"] for row in rows})
    status = "PASS" if rows and passed == len(rows) else "WARN" if not_run else "FAIL"
    lines = [
        # Separator keeps the timestamp and the status from fusing together.
        f"\n## {utc_now()} - {status}",
        f"- Artifact: `{artifact_path}`",
        f"- Mode: `{rows[0]['mode'] if rows else 'none'}`",
        f"- Coverage: {', '.join(dimensions) if dimensions else 'none'}",
        f"- Counts: {passed} passed, {failed} failed, {not_run} not run",
        "- Actions: none; backlog creation is gated to blocker failures or two consecutive failures.",
    ]
    if rows:
        lines.append("- Scenarios: " + ", ".join(row["scenario_id"] for row in rows))
        # De-duplicate so each profile/model/toolset combination is listed once.
        profile_tracks = sorted(
            {
                f"{row['profile']} ({row.get('provider', 'unknown')}/{row.get('model', 'unknown')}; "
                f"toolsets: {', '.join(row.get('toolsets_enabled') or []) or 'none'})"
                for row in rows
            }
        )
        lines.append("- Profile/model/toolsets: " + "; ".join(profile_tracks))
    if not note_path.exists():
        note_path.write_text(
            "# Atlas Quality Eval Results\n\nDurable results log for Atlas and specialist profile quality evaluation runs.\n",
            encoding="utf-8",
        )
    # Explicit UTF-8 so the note reads the same regardless of process locale.
    with note_path.open("a", encoding="utf-8") as handle:
        handle.write("\n".join(lines) + "\n")
def default_output_path() -> Path:
    """Return ``<results dir>/<YYYY-MM-DD>.jsonl`` for today's local date."""
    today = dt.datetime.now()
    filename = f"{today:%Y-%m-%d}.jsonl"
    return DEFAULT_RESULTS_DIR / filename
def run_dry_run(
    *,
    scenarios_path: Path = DEFAULT_SCENARIOS,
    output_path: Path | None = None,
    profile: str | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    results_note: Path | None = None,
) -> list[dict[str, Any]]:
    """Validate the fixtures and record a ``not_run`` row per selected scenario.

    No agent is invoked. The rows are written as JSONL to ``output_path`` (or
    the dated default) and, when ``results_note`` is given, summarized there.

    Raises:
        SystemExit: If fixture validation reports any error.

    Returns:
        The result rows that were written.
    """
    problems = validate_scenario_file(scenarios_path)
    if problems:
        bullet_list = "\n".join(f"- {problem}" for problem in problems)
        raise SystemExit("Fixture validation failed:\n" + bullet_list)
    chosen = select_scenarios(
        load_scenarios(scenarios_path),
        ids=ids,
        tags=tags,
        dimensions=dimensions,
        limit=limit,
    )
    rows: list[dict[str, Any]] = []
    for scenario in chosen:
        run_profile = scenario_profile(scenario, profile)
        rows.append(result_row(scenario, profile=run_profile, mode="dry_run", status="not_run"))
    destination = output_path if output_path else default_output_path()
    write_jsonl(rows, destination)
    if results_note:
        append_results_note(rows, results_note, destination)
    return rows
def run_live(
    *,
    scenarios_path: Path = DEFAULT_SCENARIOS,
    output_path: Path | None = None,
    profile: str | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    tags: list[str] | None = None,
    dimensions: list[str] | None = None,
    results_note: Path | None = None,
    timeout_seconds: float = 600,
) -> list[dict[str, Any]]:
    """Run the selected scenarios against real Hermes agents and record results.

    Live execution is refused unless ``ATLAS_EVAL_ALLOW_LIVE=1`` is set. Each
    scenario is run as a ``hermes`` subprocess; stdout (plus stderr, when
    present) is saved as a transcript next to the JSONL artifact, and stdout
    is scored by ``result_row``. Per-scenario failures are persisted as
    ``status="error"`` rows instead of aborting the run.

    Args:
        scenarios_path: YAML fixture file.
        output_path: JSONL artifact path; defaults to the dated results file.
        profile: Optional profile override applied to every scenario.
        limit, ids, tags, dimensions: Selection filters (see ``select_scenarios``).
        results_note: Optional Markdown note to append a summary to.
        timeout_seconds: Per-scenario subprocess timeout (default 600).

    Raises:
        SystemExit: If live execution is not enabled or fixtures are invalid.

    Returns:
        The result rows that were written.
    """
    if os.environ.get("ATLAS_EVAL_ALLOW_LIVE") != "1":
        raise SystemExit("Live execution refused: set ATLAS_EVAL_ALLOW_LIVE=1 to invoke Hermes agents.")
    errors = validate_scenario_file(scenarios_path)
    if errors:
        raise SystemExit("Fixture validation failed:\n" + "\n".join(f"- {error}" for error in errors))
    selected = select_scenarios(load_scenarios(scenarios_path), ids=ids, tags=tags, dimensions=dimensions, limit=limit)
    rows: list[dict[str, Any]] = []
    output = output_path or default_output_path()
    transcript_dir = output.parent / "transcripts" / output.stem
    transcript_dir.mkdir(parents=True, exist_ok=True)
    for scenario in selected:
        scenario_run_profile = scenario_profile(scenario, profile)
        transcript_path = transcript_dir / f"{scenario['id']}.txt"
        command = build_hermes_command(scenario, profile_override=profile)
        try:
            completed = subprocess.run(
                command,
                text=True,
                capture_output=True,
                timeout=timeout_seconds,
                check=False,
            )
            transcript = completed.stdout + ("\nSTDERR:\n" + completed.stderr if completed.stderr else "")
            # Explicit UTF-8 so agent output with non-ASCII text is stored
            # the same way regardless of the process locale.
            transcript_path.write_text(transcript, encoding="utf-8")
            status = "completed" if completed.returncode == 0 else "error"
            error = None if completed.returncode == 0 else f"hermes exited {completed.returncode}"
            rows.append(
                result_row(
                    scenario,
                    profile=scenario_run_profile,
                    mode="live",
                    status=status,
                    output=completed.stdout,
                    transcript_path=str(transcript_path),
                    error=error,
                )
            )
        except subprocess.TimeoutExpired:
            # str(TimeoutExpired) embeds the whole command line, including the
            # full prompt; record a concise message instead.
            rows.append(
                result_row(
                    scenario,
                    profile=scenario_run_profile,
                    mode="live",
                    status="error",
                    error=f"hermes timed out after {timeout_seconds} seconds",
                )
            )
        except Exception as exc:  # noqa: BLE001 - persist eval failure as data
            rows.append(result_row(scenario, profile=scenario_run_profile, mode="live", status="error", error=str(exc)))
    write_jsonl(rows, output)
    if results_note:
        append_results_note(rows, results_note, output)
    return rows
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments for the eval runner.

    Selection flags (``--id``, ``--tag``, ``--dimension``) may be repeated and
    accumulate into ``ids``, ``tags`` and ``dimensions``. The three mode flags
    are plain booleans; choosing between them is left to the caller.
    """
    cli = argparse.ArgumentParser(description="Validate and run Atlas quality evaluation scenarios.")
    cli.add_argument("--scenarios", type=Path, default=DEFAULT_SCENARIOS)
    cli.add_argument(
        "--profile",
        default=None,
        help="Optional profile override for debugging; by default each scenario runs with its target_profile.",
    )
    cli.add_argument("--output", type=Path)
    cli.add_argument("--limit", type=int)
    # Repeatable selection filters, each collected into a list.
    for flag, destination in (("--id", "ids"), ("--tag", "tags"), ("--dimension", "dimensions")):
        cli.add_argument(flag, action="append", dest=destination)
    cli.add_argument(
        "--results-note",
        type=Path,
        default=None,
        help=f"Optional human-readable note to append (example: {DEFAULT_RESULTS_NOTE})",
    )
    # Execution-mode switches.
    for mode_flag in ("--validate-only", "--dry-run", "--execute-live"):
        cli.add_argument(mode_flag, action="store_true")
    return cli.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Mode precedence is ``--validate-only``, then ``--execute-live``, then a
    dry run (also the default when no mode flag is given).

    Args:
        argv: Argument list to parse; ``None`` means use ``sys.argv[1:]``.
            An explicit empty list is honored as "no arguments".

    Returns:
        1 when ``--validate-only`` finds errors, otherwise 0. The runners may
        raise ``SystemExit`` themselves on invalid fixtures or refused live runs.
    """
    # `argv or sys.argv[1:]` would wrongly fall back to the process arguments
    # when a caller passes [], so test for None explicitly.
    args = parse_args(sys.argv[1:] if argv is None else argv)
    if args.validate_only:
        errors = validate_scenario_file(args.scenarios)
        if errors:
            for error in errors:
                print(f"ERROR: {error}")
            return 1
        print(f"OK: {args.scenarios} contains valid Atlas quality scenarios")
        return 0
    if not args.execute_live and not args.dry_run:
        print("No execution mode selected; defaulting to --dry-run for safety.", file=sys.stderr)
    # Both runners accept the same keyword arguments.
    runner = run_live if args.execute_live else run_dry_run
    rows = runner(
        scenarios_path=args.scenarios,
        output_path=args.output,
        profile=args.profile,
        limit=args.limit,
        ids=args.ids,
        tags=args.tags,
        dimensions=args.dimensions,
        results_note=args.results_note,
    )
    passed = sum(1 for row in rows if row["passed"])
    print(f"Wrote {len(rows)} result rows ({passed} passed) to {args.output or default_output_path()}")
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())