feat(models): add litellm metadata sync and audit

This commit is contained in:
zap
2026-03-11 20:11:07 +00:00
parent 27211f9325
commit 9d641ba7ca
2 changed files with 555 additions and 66 deletions

314
scripts/sync-litellm-models.py Normal file → Executable file
View File

@@ -1,15 +1,20 @@
#!/usr/bin/env python3
import argparse
import json
import os
import re
import shutil
import sys
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
CONFIG_PATH = Path.home() / ".openclaw" / "openclaw.json"
WORKSPACE = Path.home() / ".openclaw" / "workspace"
METADATA_PATH = WORKSPACE / "models" / "litellm-official-metadata.json"
TIMEOUT = 12
FALLBACK_CONTEXT = 200000
FALLBACK_MAX_TOKENS = 8192
def die(msg: str, code: int = 1):
@@ -18,29 +23,79 @@ def die(msg: str, code: int = 1):
def normalize_base(url: str) -> str:
u = (url or "").rstrip("/")
if not u:
return u
return u
return (url or "").rstrip("/")
def fetch_models(base_url: str, api_key: str | None):
def load_json(path: Path) -> dict[str, Any]:
try:
return json.loads(path.read_text(encoding="utf-8"))
except FileNotFoundError:
die(f"File not found: {path}")
except json.JSONDecodeError as e:
die(f"Invalid JSON in {path}: {e}")
def resolve_json_pointer(doc: Any, pointer: str) -> Any:
if pointer in ("", "/"):
return doc
cur = doc
for raw_part in pointer.lstrip("/").split("/"):
part = raw_part.replace("~1", "/").replace("~0", "~")
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
raise KeyError(pointer)
return cur
def resolve_api_key(raw_api_key: Any, cfg: dict[str, Any]) -> str | None:
if isinstance(raw_api_key, str) and raw_api_key.strip():
return raw_api_key.strip()
if isinstance(raw_api_key, dict):
source = raw_api_key.get("source")
if source == "env":
name = raw_api_key.get("name") or raw_api_key.get("id")
if isinstance(name, str) and name:
return os.environ.get(name)
if source == "file":
provider_id = raw_api_key.get("provider") or ((cfg.get("secrets") or {}).get("defaults") or {}).get("file")
providers = (((cfg.get("secrets") or {}).get("providers") or {}))
provider = providers.get(provider_id) if isinstance(provider_id, str) else None
if isinstance(provider, dict) and provider.get("source") == "file":
path = provider.get("path")
pointer = raw_api_key.get("id")
if isinstance(path, str) and isinstance(pointer, str):
try:
secret_doc = load_json(Path(path))
value = resolve_json_pointer(secret_doc, pointer)
if isinstance(value, str) and value:
return value
except Exception:
pass
return (
os.environ.get("LITELLM_API_KEY")
or os.environ.get("OPENAI_API_KEY")
or None
)
def fetch_json(url: str, api_key: str | None):
req = urllib.request.Request(url, method="GET")
req.add_header("Accept", "application/json")
if api_key:
req.add_header("Authorization", f"Bearer {api_key}")
with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
return json.loads(resp.read().decode("utf-8", errors="replace"))
def fetch_models_and_info(base_url: str, api_key: str | None):
url = normalize_base(base_url)
if not url:
die("litellm.baseUrl is empty")
if not url.endswith("/v1"):
url = f"{url}/v1"
models_url = f"{url}/models"
req = urllib.request.Request(models_url, method="GET")
req.add_header("Accept", "application/json")
if api_key:
req.add_header("Authorization", f"Bearer {api_key}")
with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
payload = json.loads(resp.read().decode("utf-8", errors="replace"))
# OpenAI-style: {"data": [{"id": "..."}, ...]}
payload = fetch_json(f"{url}/models", api_key)
if isinstance(payload, dict) and isinstance(payload.get("data"), list):
rows = payload["data"]
elif isinstance(payload, list):
@@ -48,44 +103,153 @@ def fetch_models(base_url: str, api_key: str | None):
else:
die(f"Unexpected /models payload shape: {type(payload).__name__}")
ids = []
model_ids: list[str] = []
model_rows: dict[str, dict[str, Any]] = {}
for row in rows:
if isinstance(row, dict):
mid = row.get("id") or row.get("model")
else:
mid = None
if not isinstance(row, dict):
continue
mid = row.get("id") or row.get("model")
if isinstance(mid, str) and mid.strip():
ids.append(mid.strip())
mid = mid.strip()
if mid not in model_rows:
model_ids.append(mid)
model_rows[mid] = row
# stable unique preserve order
seen = set()
out = []
for mid in ids:
if mid not in seen:
seen.add(mid)
out.append(mid)
model_info = None
info_error = None
try:
model_info = fetch_json(f"{normalize_base(base_url)}/model/info", api_key)
except Exception as e: # best effort
info_error = str(e)
return model_ids, model_rows, model_info, info_error
def load_metadata(path: Path) -> dict[str, Any]:
if not path.exists():
return {"models": {}}
data = load_json(path)
models = data.get("models")
if not isinstance(models, dict):
die(f"Metadata file {path} must contain an object at key 'models'")
return data
def pick_model_info(model_id: str, model_rows: dict[str, dict[str, Any]], model_info: Any):
row = model_rows.get(model_id) or {}
info = None
if isinstance(model_info, dict):
if isinstance(model_info.get("data"), list):
for item in model_info["data"]:
if isinstance(item, dict) and item.get("model_name") == model_id:
info = item
break
if info is None and isinstance(model_info.get(model_id), dict):
info = model_info.get(model_id)
return row, info
def clean_input(value: Any) -> list[str] | None:
if isinstance(value, list):
out = [x for x in value if isinstance(x, str) and x]
return out or None
return None
def metadata_from_litellm(model_id: str, model_rows: dict[str, dict[str, Any]], model_info: Any) -> dict[str, Any]:
row, info = pick_model_info(model_id, model_rows, model_info)
out: dict[str, Any] = {}
for source in (row, info):
if not isinstance(source, dict):
continue
for src_key, dst_key in (
("context_window", "contextWindow"),
("max_input_tokens", "contextWindow"),
("max_output_tokens", "maxTokens"),
("max_tokens", "maxTokens"),
):
val = source.get(src_key)
if isinstance(val, int) and val > 0 and dst_key not in out:
out[dst_key] = val
inp = clean_input(source.get("input_types") or source.get("input"))
if inp and "input" not in out:
out["input"] = inp
reasoning = source.get("supports_reasoning")
if isinstance(reasoning, bool) and "reasoning" not in out:
out["reasoning"] = reasoning
if out:
out["source"] = "litellm-api"
return out
def merge_metadata(existing: dict[str, Any], official: dict[str, Any], litellm_meta: dict[str, Any], model_id: str) -> tuple[dict[str, Any], str]:
merged = dict(existing)
merged["id"] = model_id
merged.setdefault("name", existing.get("name") if isinstance(existing.get("name"), str) else model_id)
source_used = "existing-config"
for field in ("contextWindow", "maxTokens", "input", "reasoning"):
if field in official and official[field] not in (None, [], ""):
merged[field] = official[field]
source_used = official.get("source", "official-metadata")
elif field in litellm_meta and litellm_meta[field] not in (None, [], "") and field not in merged:
merged[field] = litellm_meta[field]
source_used = litellm_meta.get("source", "litellm-api")
elif field not in merged:
if field == "contextWindow":
merged[field] = FALLBACK_CONTEXT
source_used = "fallback-default"
elif field == "maxTokens":
merged[field] = FALLBACK_MAX_TOKENS
source_used = "fallback-default"
elif field == "input":
merged[field] = ["text"]
elif field == "reasoning":
merged[field] = False
merged["metadataSource"] = source_used
return merged, source_used
def build_sync_report(models: list[dict[str, Any]], official_meta: dict[str, Any]):
fallback = [m["id"] for m in models if m.get("metadataSource") == "fallback-default"]
from_official = [m["id"] for m in models if str(m.get("metadataSource", "")).startswith("official-")]
missing_official = [m["id"] for m in models if m["id"] not in official_meta]
return {
"total": len(models),
"officialCount": len(from_official),
"fallbackCount": len(fallback),
"fallbackModels": fallback,
"missingOfficialMetadata": missing_official,
}
def main():
if not CONFIG_PATH.exists():
die(f"Config not found: {CONFIG_PATH}")
raw = CONFIG_PATH.read_text(encoding="utf-8")
cfg = json.loads(raw)
parser = argparse.ArgumentParser(description="Sync LiteLLM model ids and metadata into OpenClaw config")
parser.add_argument("--config", type=Path, default=CONFIG_PATH)
parser.add_argument("--metadata", type=Path, default=METADATA_PATH)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--audit-only", action="store_true")
parser.add_argument("--json", action="store_true", help="Print report as JSON")
args = parser.parse_args()
cfg = load_json(args.config)
providers = (((cfg.get("models") or {}).get("providers") or {}))
litellm = providers.get("litellm")
if not isinstance(litellm, dict):
die("models.providers.litellm not found")
base_url = litellm.get("baseUrl")
api_key = litellm.get("apiKey") or os.environ.get("LITELLM_API_KEY")
model_ids = fetch_models(base_url, api_key)
api_key = resolve_api_key(litellm.get("apiKey"), cfg)
model_ids, model_rows, model_info, info_error = fetch_models_and_info(base_url, api_key)
if not model_ids:
die("No models returned from LiteLLM /v1/models")
meta_file = load_metadata(args.metadata)
official_models = meta_file.get("models", {})
if not isinstance(official_models, dict):
die("metadata models must be an object")
existing_models = litellm.get("models") if isinstance(litellm.get("models"), list) else []
existing_by_id = {
m.get("id"): m
@@ -95,39 +259,57 @@ def main():
new_models = []
for mid in model_ids:
if mid in existing_by_id:
m = dict(existing_by_id[mid])
m["id"] = mid
m.setdefault("name", mid)
m.setdefault("input", ["text"])
new_models.append(m)
else:
new_models.append({"id": mid, "name": mid, "input": ["text"]})
existing = dict(existing_by_id.get(mid, {}))
official = official_models.get(mid, {}) if isinstance(official_models.get(mid), dict) else {}
litellm_meta = metadata_from_litellm(mid, model_rows, model_info)
merged, _ = merge_metadata(existing, official, litellm_meta, mid)
new_models.append(merged)
litellm["models"] = new_models
report = build_sync_report(new_models, official_models)
if info_error:
report["modelInfoWarning"] = info_error
# Sync agents.defaults.models entries: keep non-litellm, rebuild litellm/* only.
defaults = ((cfg.get("agents") or {}).get("defaults") or {})
model_map = defaults.get("models") if isinstance(defaults.get("models"), dict) else {}
if not args.audit_only:
litellm["models"] = new_models
defaults = ((cfg.get("agents") or {}).get("defaults") or {})
model_map = defaults.get("models") if isinstance(defaults.get("models"), dict) else {}
preserved = {k: v for k, v in model_map.items() if not k.startswith("litellm/")}
for model in new_models:
key = f"litellm/{model['id']}"
entry = model_map.get(key, {}) if isinstance(model_map.get(key), dict) else {}
preserved[key] = entry
defaults["models"] = preserved
preserved = {k: v for k, v in model_map.items() if not k.startswith("litellm/")}
if not args.dry_run:
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
backup = args.config.with_suffix(f".json.bak-{ts}")
shutil.copy2(args.config, backup)
args.config.write_text(json.dumps(cfg, indent=2) + "\n", encoding="utf-8")
report["backup"] = str(backup)
report["updatedConfig"] = str(args.config)
# preserve any existing per-model settings for still-present models
for mid in model_ids:
key = f"litellm/{mid}"
preserved[key] = model_map.get(key, {})
defaults["models"] = preserved
# write backup + updated config
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
backup = CONFIG_PATH.with_suffix(f".json.bak-{ts}")
shutil.copy2(CONFIG_PATH, backup)
CONFIG_PATH.write_text(json.dumps(cfg, indent=2) + "\n", encoding="utf-8")
print(f"Synced {len(model_ids)} LiteLLM models")
print(f"Backup: {backup}")
if args.json:
print(json.dumps(report, indent=2))
else:
print(f"Synced {report['total']} LiteLLM models")
print(f"Official metadata: {report['officialCount']}")
print(f"Fallback metadata: {report['fallbackCount']}")
if report.get("missingOfficialMetadata"):
print("Missing official metadata:")
for mid in report["missingOfficialMetadata"][:30]:
print(f" - {mid}")
if len(report["missingOfficialMetadata"]) > 30:
print(f" ... +{len(report['missingOfficialMetadata']) - 30} more")
if report.get("fallbackModels"):
print("Still using fallback defaults:")
for mid in report["fallbackModels"][:30]:
print(f" - {mid}")
if len(report["fallbackModels"]) > 30:
print(f" ... +{len(report['fallbackModels']) - 30} more")
if report.get("modelInfoWarning"):
print(f"LiteLLM /model/info warning: {report['modelInfoWarning']}")
if report.get("backup"):
print(f"Backup: {report['backup']}")
if __name__ == "__main__":