135 lines
3.8 KiB
Python
135 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sys
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
CONFIG_PATH = Path.home() / ".openclaw" / "openclaw.json"
|
|
TIMEOUT = 12
|
|
|
|
|
|
def die(msg: str, code: int = 1):
|
|
print(f"ERROR: {msg}", file=sys.stderr)
|
|
raise SystemExit(code)
|
|
|
|
|
|
def normalize_base(url: str) -> str:
|
|
u = (url or "").rstrip("/")
|
|
if not u:
|
|
return u
|
|
return u
|
|
|
|
|
|
def fetch_models(base_url: str, api_key: str | None):
|
|
url = normalize_base(base_url)
|
|
if not url:
|
|
die("litellm.baseUrl is empty")
|
|
if not url.endswith("/v1"):
|
|
url = f"{url}/v1"
|
|
models_url = f"{url}/models"
|
|
|
|
req = urllib.request.Request(models_url, method="GET")
|
|
req.add_header("Accept", "application/json")
|
|
if api_key:
|
|
req.add_header("Authorization", f"Bearer {api_key}")
|
|
|
|
with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
|
|
payload = json.loads(resp.read().decode("utf-8", errors="replace"))
|
|
|
|
# OpenAI-style: {"data": [{"id": "..."}, ...]}
|
|
if isinstance(payload, dict) and isinstance(payload.get("data"), list):
|
|
rows = payload["data"]
|
|
elif isinstance(payload, list):
|
|
rows = payload
|
|
else:
|
|
die(f"Unexpected /models payload shape: {type(payload).__name__}")
|
|
|
|
ids = []
|
|
for row in rows:
|
|
if isinstance(row, dict):
|
|
mid = row.get("id") or row.get("model")
|
|
else:
|
|
mid = None
|
|
if isinstance(mid, str) and mid.strip():
|
|
ids.append(mid.strip())
|
|
|
|
# stable unique preserve order
|
|
seen = set()
|
|
out = []
|
|
for mid in ids:
|
|
if mid not in seen:
|
|
seen.add(mid)
|
|
out.append(mid)
|
|
return out
|
|
|
|
|
|
def main():
|
|
if not CONFIG_PATH.exists():
|
|
die(f"Config not found: {CONFIG_PATH}")
|
|
|
|
raw = CONFIG_PATH.read_text(encoding="utf-8")
|
|
cfg = json.loads(raw)
|
|
|
|
providers = (((cfg.get("models") or {}).get("providers") or {}))
|
|
litellm = providers.get("litellm")
|
|
if not isinstance(litellm, dict):
|
|
die("models.providers.litellm not found")
|
|
|
|
base_url = litellm.get("baseUrl")
|
|
api_key = litellm.get("apiKey") or os.environ.get("LITELLM_API_KEY")
|
|
|
|
model_ids = fetch_models(base_url, api_key)
|
|
if not model_ids:
|
|
die("No models returned from LiteLLM /v1/models")
|
|
|
|
existing_models = litellm.get("models") if isinstance(litellm.get("models"), list) else []
|
|
existing_by_id = {
|
|
m.get("id"): m
|
|
for m in existing_models
|
|
if isinstance(m, dict) and isinstance(m.get("id"), str)
|
|
}
|
|
|
|
new_models = []
|
|
for mid in model_ids:
|
|
if mid in existing_by_id:
|
|
m = dict(existing_by_id[mid])
|
|
m["id"] = mid
|
|
m.setdefault("name", mid)
|
|
m.setdefault("input", ["text"])
|
|
new_models.append(m)
|
|
else:
|
|
new_models.append({"id": mid, "name": mid, "input": ["text"]})
|
|
|
|
litellm["models"] = new_models
|
|
|
|
# Sync agents.defaults.models entries: keep non-litellm, rebuild litellm/* only.
|
|
defaults = ((cfg.get("agents") or {}).get("defaults") or {})
|
|
model_map = defaults.get("models") if isinstance(defaults.get("models"), dict) else {}
|
|
|
|
preserved = {k: v for k, v in model_map.items() if not k.startswith("litellm/")}
|
|
|
|
# preserve any existing per-model settings for still-present models
|
|
for mid in model_ids:
|
|
key = f"litellm/{mid}"
|
|
preserved[key] = model_map.get(key, {})
|
|
|
|
defaults["models"] = preserved
|
|
|
|
# write backup + updated config
|
|
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
backup = CONFIG_PATH.with_suffix(f".json.bak-{ts}")
|
|
shutil.copy2(CONFIG_PATH, backup)
|
|
|
|
CONFIG_PATH.write_text(json.dumps(cfg, indent=2) + "\n", encoding="utf-8")
|
|
|
|
print(f"Synced {len(model_ids)} LiteLLM models")
|
|
print(f"Backup: {backup}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|