Files
swarm-master/openvino-advisory-gateway/gateway.py
T
2026-06-04 16:03:52 -07:00

351 lines
14 KiB
Python

#!/usr/bin/env python3
"""Local-only advisory gateway for OpenVINO NPU sidecars.
This service deliberately returns bounded advisory envelopes. It never routes,
writes memory, sends external messages, executes tools, restarts services, or
broadens document processing authority. Atlas/Hermes may use these outputs as
hints only.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sqlite3
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Callable
from urllib.parse import urlparse
# Bind address and port of the gateway itself.
HOST = "127.0.0.1"
PORT = 18830

# Endpoints of the sidecar services this gateway forwards requests to.
CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify"
GENAI_URL = "http://127.0.0.1:18820/v1/worker/generate"
DOC_TRIAGE_URL = "http://127.0.0.1:18829/triage"

# SQLite file receiving advisory event metadata; overridable via NPU_ADVISORY_LOG_DB.
DEFAULT_LOG_DB = Path(
    os.environ.get(
        "NPU_ADVISORY_LOG_DB",
        "/home/will/.local/state/openvino-advisory-gateway/events.sqlite",
    )
)

# File roots permitted for triage. NPU_ADVISORY_ALLOWED_ROOTS is an
# os.pathsep-separated list; empty entries are dropped.
DEFAULT_ALLOWED_ROOT = Path("/home/will/lab/swarm/openvino-doc-image-triage-npu")
DEFAULT_ALLOWED_ROOTS = [
    Path(entry)
    for entry in filter(
        None,
        os.environ.get("NPU_ADVISORY_ALLOWED_ROOTS", str(DEFAULT_ALLOWED_ROOT)).split(os.pathsep),
    )
]

# Generation jobs the gateway is willing to forward.
ALLOWED_GENAI_JOBS = {"title", "summary", "notification", "memory_candidate"}

# Authority block copied into every envelope: every capability is denied.
AUTHORITY = {
    "may_route": False,
    "may_write_memory": False,
    "may_send_external": False,
    "may_process_private_dirs": False,
    "may_execute_tools": False,
    "may_restart_services": False,
}
def sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def http_post_json(url: str, payload: dict[str, Any], timeout_s: float = 20.0) -> dict[str, Any]:
    """POST *payload* to *url* as JSON and return the decoded JSON response.

    Args:
        url: Target endpoint.
        payload: Object serialized with ``json.dumps`` as the request body.
        timeout_s: Timeout in seconds handed to ``urllib.request.urlopen``.

    Returns:
        The response body parsed as JSON.
    """
    encoded_body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=encoded_body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        raw = response.read()
    return json.loads(raw.decode("utf-8"))
def http_get_json(url: str, timeout_s: float = 8.0) -> dict[str, Any]:
    """GET *url* and return its body parsed as JSON.

    A body that is not valid JSON is not treated as an error: it is reported
    as ``{"ok": True, "raw_text": <first 120 characters>}``.

    Args:
        url: Endpoint to fetch.
        timeout_s: Timeout in seconds handed to ``urllib.request.urlopen``.
    """
    with urllib.request.urlopen(url, timeout=timeout_s) as response:
        text = response.read().decode("utf-8")
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Non-JSON reply: keep only a short prefix of it.
        return {"ok": True, "raw_text": text[:120]}
    return parsed
def _npu_delta_from(result: dict[str, Any], fallback: int | None = None) -> int | None:
    """Extract the NPU busy-time delta reported in a sidecar response.

    ``npu_busy_delta_us`` is checked first, then ``sysfs_npu_busy_delta_us``.
    The first key holding a usable number wins.

    Args:
        result: Decoded sidecar response.
        fallback: Value returned when neither key holds a usable number.

    Returns:
        The delta as an ``int`` (floats are truncated), or *fallback*.
    """
    for key in ("npu_busy_delta_us", "sysfs_npu_busy_delta_us"):
        value = result.get(key)
        # bool is a subclass of int; a JSON `true` must not be reported as a
        # delta of 1 (which would later count as a positive NPU proof).
        if isinstance(value, bool):
            continue
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            try:
                return int(value)
            except (ValueError, OverflowError):
                # NaN / infinity cannot be converted; try the next key.
                continue
    return fallback
def _doc_triage_npu_delta(result: dict[str, Any]) -> int | None:
    """Return the largest per-page NPU busy delta found in a doc-triage response.

    The value is read from
    ``result["result"]["pages"][i]["needs_attention"]["embedding"]["npu_busy_delta_us"]``.
    Any level that is missing or has an unexpected type is skipped instead of
    raising, so a malformed sidecar reply yields ``None`` (no NPU proof).

    Returns:
        ``None`` when no page carries an integer delta; otherwise the maximum
        delta seen, floored at 0.
    """
    if not isinstance(result, dict):
        return None
    body = result.get("result")
    pages = body.get("pages") if isinstance(body, dict) else None
    if not isinstance(pages, (list, tuple)):
        return None

    best: int | None = None
    for page in pages:
        if not isinstance(page, dict):
            continue
        attention = page.get("needs_attention")
        embedding = attention.get("embedding") if isinstance(attention, dict) else None
        if not isinstance(embedding, dict):
            continue
        delta = embedding.get("npu_busy_delta_us")
        # bool is a subclass of int; a JSON `true` is not a measurement.
        if isinstance(delta, int) and not isinstance(delta, bool):
            best = max(best or 0, delta)
    return best
def build_envelope(
    *,
    service: str,
    operation: str,
    result: dict[str, Any],
    mode: str = "advisory",
    input_scope: str,
    npu_busy_delta_us: int | None,
    trace_id: str | None = None,
    warnings: list[str] | None = None,
) -> dict[str, Any]:
    """Assemble an ``openvino_advisory_v1`` response envelope.

    Args:
        service: Name of the sidecar that produced *result*.
        operation: Operation label recorded in the envelope.
        result: Advisory payload, embedded as-is.
        mode: Envelope mode string (defaults to ``"advisory"``).
        input_scope: Label describing what kind of input was processed.
        npu_busy_delta_us: NPU busy delta reported for the call, if any.
        trace_id: Optional caller-supplied trace identifier.
        warnings: Optional warning strings to include.

    Returns:
        The envelope dict. ``npu_proof.ok`` is true only when
        *npu_busy_delta_us* is an ``int`` greater than zero. ``authority`` and
        ``warnings`` are fresh copies, so mutating the envelope never touches
        the module-level ``AUTHORITY`` or the caller's list.
    """
    npu_ok = isinstance(npu_busy_delta_us, int) and npu_busy_delta_us > 0
    return {
        "ok": True,
        "schema": "openvino_advisory_v1",
        "service": service,
        "operation": operation,
        "mode": mode,
        "trace_id": trace_id,
        "input_scope": input_scope,
        "result": result,
        "npu_proof": {"required": True, "ok": npu_ok, "npu_busy_delta_us": npu_busy_delta_us},
        "authority": dict(AUTHORITY),
        # Copy: callers append to envelope["warnings"] after construction.
        "warnings": list(warnings) if warnings else [],
    }
class AdvisoryLogger:
    """Append-only SQLite log of advisory event metadata.

    Only envelope metadata and an opaque input reference are stored; the
    ``raw_payload`` column is always written as NULL.
    """

    def __init__(self, db_path: str | Path = DEFAULT_LOG_DB):
        """Create the parent directory and the ``advisory_events`` table if missing."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def _execute(self, sql: str, params: tuple[Any, ...] = ()) -> None:
        """Run one statement in its own transaction and close the connection.

        ``with connection:`` only commits or rolls back; it does not close the
        connection, so the close is done explicitly to avoid leaking one
        connection per logged event.
        """
        con = sqlite3.connect(self.db_path)
        try:
            with con:
                con.execute(sql, params)
        finally:
            con.close()

    def _init(self) -> None:
        """Create the ``advisory_events`` table when it does not exist yet."""
        self._execute(
            """
            CREATE TABLE IF NOT EXISTS advisory_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at REAL NOT NULL,
                service TEXT NOT NULL,
                operation TEXT NOT NULL,
                mode TEXT NOT NULL,
                input_scope TEXT NOT NULL,
                input_ref TEXT NOT NULL,
                npu_busy_delta_us INTEGER,
                ok INTEGER NOT NULL,
                raw_payload TEXT
            )
            """
        )

    def log(self, envelope: dict[str, Any], *, input_ref: str) -> None:
        """Record one envelope's metadata.

        Args:
            envelope: Envelope dict; ``service``, ``operation``, ``mode``,
                ``input_scope``, ``ok`` and ``npu_proof.npu_busy_delta_us``
                are read from it.
            input_ref: Opaque reference to the input, stored verbatim.
        """
        proof = envelope.get("npu_proof") or {}
        self._execute(
            """
            INSERT INTO advisory_events(created_at, service, operation, mode, input_scope, input_ref,
                                        npu_busy_delta_us, ok, raw_payload)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL)
            """,
            (
                time.time(),
                str(envelope.get("service")),
                str(envelope.get("operation")),
                str(envelope.get("mode")),
                str(envelope.get("input_scope")),
                input_ref,
                proof.get("npu_busy_delta_us"),
                1 if envelope.get("ok") else 0,
            ),
        )
def classify_text(
    text: str,
    *,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 20.0,
) -> dict[str, Any]:
    """Ask the classifier sidecar for labels and wrap them in a shadow-mode envelope.

    Args:
        text: Text to classify; must be a non-blank string.
        trace_id: Optional identifier, also sent as the request ``id``
            (``"advisory"`` when absent).
        http_post_json: Transport callable, injectable for testing.
        logger: When given, the envelope metadata is logged with a SHA-256
            reference to *text* (not the text itself).
        timeout_s: Timeout passed to the transport.

    Raises:
        ValueError: If *text* is not a non-blank string.
    """
    if not (isinstance(text, str) and text.strip()):
        raise ValueError("text must be a non-empty string")

    request_body = {
        "id": trace_id or "advisory",
        "text": text,
        "options": {"include_evidence": False, "dry_run": True},
    }
    upstream = http_post_json(CLASSIFIER_URL, request_body, timeout_s)

    advisory = {
        "labels": upstream.get("labels", {}),
        "model": upstream.get("model"),
        "service_mode": upstream.get("mode", "dry_run"),
    }
    envelope = build_envelope(
        service="classifier",
        operation="classify",
        mode="shadow",
        input_scope="explicit_text",
        trace_id=trace_id,
        result=advisory,
        npu_busy_delta_us=_npu_delta_from(upstream),
    )
    if logger:
        digest = sha256_text(text)
        logger.log(envelope, input_ref="text:sha256:" + digest)
    return envelope
def generate_bounded(
    job: str,
    text: str,
    *,
    max_new_tokens: int | None = None,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 180.0,
) -> dict[str, Any]:
    """Request a draft from the generation sidecar and wrap it in a draft-mode envelope.

    Args:
        job: One of ``ALLOWED_GENAI_JOBS``.
        text: Input text; must be a non-blank string.
        max_new_tokens: Optional generation limit forwarded to the sidecar.
            Must be a positive integer when given.
        trace_id: Optional identifier recorded in the envelope.
        http_post_json: Transport callable, injectable for testing.
        logger: When given, the envelope metadata is logged with a SHA-256
            reference to *text* (not the text itself).
        timeout_s: Timeout passed to the transport.

    Raises:
        ValueError: If *job* is not allowed, *text* is not a non-blank string,
            or *max_new_tokens* is not a positive integer.
    """
    if job not in ALLOWED_GENAI_JOBS:
        raise ValueError("unsupported advisory generation job")
    if not isinstance(text, str) or not text.strip():
        raise ValueError("input must be a non-empty string")

    payload: dict[str, Any] = {"job": job, "input": text}
    if max_new_tokens is not None:
        # The value arrives from the request body; never forward anything but
        # a positive int (bool is excluded explicitly as it subclasses int).
        if isinstance(max_new_tokens, bool) or not isinstance(max_new_tokens, int) or max_new_tokens <= 0:
            raise ValueError("max_new_tokens must be a positive integer")
        payload["max_new_tokens"] = max_new_tokens

    result = http_post_json(GENAI_URL, payload, timeout_s)
    envelope = build_envelope(
        service="genai",
        operation=f"generate:{job}",
        mode="draft",
        input_scope="explicit_text",
        trace_id=trace_id,
        result={
            "draft_text": result.get("text", ""),
            "json": result.get("json"),
            "timing_ms": result.get("timing_ms"),
            "final_authority": False,
        },
        npu_busy_delta_us=_npu_delta_from(result),
    )
    if logger:
        logger.log(envelope, input_ref="text:sha256:" + sha256_text(text))
    return envelope
def _resolve_allowed(path: str, allowed_roots: list[str] | None, configured_roots: list[Path] | None = None) -> tuple[Path, list[Path]]:
    """Resolve *path* and confirm it is an existing file inside an allowed root.

    Args:
        path: File path supplied by the caller.
        allowed_roots: Roots requested by the caller; when empty or ``None``
            the configured roots are used. Each must equal or lie under a
            configured root.
        configured_roots: Roots configured for the service; falls back to
            ``DEFAULT_ALLOWED_ROOTS`` when empty or ``None``.

    Returns:
        ``(resolved_file, effective_roots)``, all as resolved absolute paths.

    Raises:
        ValueError: If no roots are available, a requested root escapes the
            configured roots, the file is outside every effective root, or it
            is not an existing regular file.
    """

    def _within(child: Path, parent: Path) -> bool:
        return child == parent or child.is_relative_to(parent)

    bases = [base.expanduser().resolve() for base in (configured_roots or DEFAULT_ALLOWED_ROOTS)]
    if not bases:
        raise ValueError("at least one configured allowed root is required")

    root_inputs = allowed_roots or [str(base) for base in bases]
    roots = [Path(item).expanduser().resolve() for item in root_inputs]
    if not roots:
        raise ValueError("at least one requested allowed root is required")

    # A caller may narrow the configured roots but never widen them.
    for root in roots:
        if not any(_within(root, base) for base in bases):
            raise ValueError("requested allowed root is outside configured roots")

    candidate = Path(path).expanduser().resolve()
    if not any(_within(candidate, root) for root in roots):
        raise ValueError("path must be inside an allowed root")
    if not (candidate.exists() and candidate.is_file()):
        raise ValueError("path must be an existing file")
    return candidate, roots
def triage_file(
    path: str,
    *,
    allowed_roots: list[str] | None = None,
    configured_roots: list[Path] | None = None,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 60.0,
) -> dict[str, Any]:
    """Send one allowed file to the doc-triage sidecar and wrap the reply.

    The path is validated with ``_resolve_allowed`` before anything is sent.
    The sidecar is asked to process at most 3 pages.

    Args:
        path: File to triage.
        allowed_roots: Optional narrower roots requested by the caller.
        configured_roots: Roots configured for the service.
        trace_id: Optional identifier recorded in the envelope.
        http_post_json: Transport callable, injectable for testing.
        logger: When given, a metadata-only warning is added to the envelope
            and the event is logged with a SHA-256 of the resolved path.
        timeout_s: Timeout passed to the transport.

    Raises:
        ValueError: Propagated from ``_resolve_allowed``.
    """
    resolved, effective_roots = _resolve_allowed(path, allowed_roots, configured_roots)
    resolved_str = str(resolved)

    request_body = {
        "path": resolved_str,
        "options": {
            "allowed_roots": [str(root) for root in effective_roots],
            "max_pages": 3,
        },
    }
    upstream = http_post_json(DOC_TRIAGE_URL, request_body, timeout_s)
    busy_delta = _doc_triage_npu_delta(upstream)

    envelope = build_envelope(
        service="doc_triage",
        operation="triage_file",
        mode="reviewable_artifact",
        input_scope="explicit_file",
        trace_id=trace_id,
        result={"triage": upstream.get("result"), "final_authority": False},
        npu_busy_delta_us=busy_delta,
    )
    if logger:
        envelope["warnings"].append("metadata-only log; raw file contents are not logged")
        path_digest = sha256_text(resolved_str)
        logger.log(envelope, input_ref="file:sha256path:" + path_digest)
    return envelope
def health(*, http_get_json: Callable[[str, float], dict[str, Any]] = http_get_json) -> dict[str, Any]:
    """Probe the three sidecar ``/healthz`` endpoints and summarize the result.

    Args:
        http_get_json: Transport callable, injectable for testing.

    Returns:
        A dict with the gateway identity, the authority block and a
        ``dependencies`` map. Top-level ``ok`` is false when any dependency
        is unreachable, raises, or itself reports that it is not ok.
    """
    deps = {
        "classifier": "http://127.0.0.1:18819/healthz",
        "genai": "http://127.0.0.1:18820/healthz",
        "doc_triage": "http://127.0.0.1:18829/healthz",
    }
    out: dict[str, Any] = {
        "ok": True,
        "service": "openvino-advisory-gateway",
        "mode": "advisory_only",
        "authority": dict(AUTHORITY),
        "dependencies": {},
    }
    for name, url in deps.items():
        try:
            data = http_get_json(url, 8.0)
            # A dependency may report health as {"ok": ...} or {"status": "ok"}.
            dep_ok = bool(data.get("ok", data.get("status") == "ok"))
            out["dependencies"][name] = {"ok": dep_ok, "service": data.get("service"), "device": data.get("device")}
        except Exception as exc:  # best-effort probe: any failure marks the dependency down
            dep_ok = False
            out["dependencies"][name] = {"ok": False, "error": str(exc)}
        if not dep_ok:
            # Keep the summary consistent with the per-dependency status.
            out["ok"] = False
    return out
def _read_json(handler: BaseHTTPRequestHandler, max_bytes: int = 256 * 1024) -> dict[str, Any]:
    """Read and decode the JSON object sent as the request body.

    Args:
        handler: Request handler providing ``headers`` and ``rfile``.
        max_bytes: Largest accepted ``Content-Length``.

    Returns:
        The decoded JSON object, or ``{}`` when the body is empty.

    Raises:
        ValueError: If ``Content-Length`` is not a non-negative integer within
            *max_bytes*, or the body is not valid JSON or not a JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    length = int(handler.headers.get("Content-Length", "0"))
    # A negative size would make rfile.read() read until EOF, bypassing the
    # size cap and blocking on an open connection.
    if length < 0:
        raise ValueError("invalid Content-Length")
    if length > max_bytes:
        raise ValueError("request JSON too large")
    raw = handler.rfile.read(length)
    if not raw:
        return {}
    payload = json.loads(raw.decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("request JSON must be an object")
    return payload
def make_handler(logger: AdvisoryLogger, configured_roots: list[Path]):
    """Build the HTTP request-handler class bound to *logger* and *configured_roots*.

    Routes:
        GET  ``/``, ``/health``, ``/healthz``  -> ``health()``
        POST ``/v1/advisory/classify``          -> ``classify_text``
        POST ``/v1/advisory/generate``          -> ``generate_bounded``
        POST ``/v1/advisory/triage``            -> ``triage_file``

    Unknown paths answer 404. Any exception raised while handling a POST is
    answered with HTTP 400 and the authority block.
    """

    def _string_field(payload: dict[str, Any], key: str, default: str = "") -> str:
        """Return ``payload[key]`` (or *default*) and insist that it is a string.

        Coercing with ``str()`` would turn JSON ``null`` into the text
        ``"None"`` and objects into their repr, which the downstream
        non-empty-string checks would then accept.
        """
        value = payload.get(key, default)
        if not isinstance(value, str):
            raise ValueError(f"{key} must be a string")
        return value

    class Handler(BaseHTTPRequestHandler):
        server_version = "openvino-advisory-gateway/0.1"

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002 - stdlib override name
            # Do not log request bodies or private paths.
            print(f"{self.client_address[0]} {format % args}")

        def send_json(self, status: int, payload: Any) -> None:
            """Send *payload* as an indented, key-sorted JSON response."""
            body = json.dumps(payload, indent=2, sort_keys=True).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self) -> None:  # noqa: N802
            if urlparse(self.path).path in ("/", "/health", "/healthz"):
                self.send_json(200, health())
                return
            self.send_json(404, {"ok": False, "error": "not_found"})

        def do_POST(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            try:
                payload = _read_json(self)
                if not isinstance(payload, dict):
                    raise ValueError("request JSON must be an object")
                trace_id = payload.get("trace_id")
                if path == "/v1/advisory/classify":
                    envelope = classify_text(_string_field(payload, "text"), trace_id=trace_id, logger=logger)
                    self.send_json(200, envelope)
                    return
                if path == "/v1/advisory/generate":
                    envelope = generate_bounded(
                        _string_field(payload, "job", "summary"),
                        _string_field(payload, "input"),
                        max_new_tokens=payload.get("max_new_tokens"),
                        trace_id=trace_id,
                        logger=logger,
                    )
                    self.send_json(200, envelope)
                    return
                if path == "/v1/advisory/triage":
                    envelope = triage_file(
                        _string_field(payload, "path"),
                        allowed_roots=payload.get("allowed_roots"),
                        configured_roots=configured_roots,
                        trace_id=trace_id,
                        logger=logger,
                    )
                    self.send_json(200, envelope)
                    return
                self.send_json(404, {"ok": False, "error": "not_found"})
            except Exception as exc:  # request boundary: every failure becomes a 400 reply
                self.send_json(400, {"ok": False, "error": type(exc).__name__, "message": str(exc), "authority": dict(AUTHORITY)})

    return Handler
def main(argv: list[str] | None = None) -> int:
    """Parse command-line options and serve the gateway until interrupted.

    Defaults for ``--host`` and ``--port`` come from ``NPU_ADVISORY_HOST`` and
    ``NPU_ADVISORY_PORT``. Any host other than ``127.0.0.1`` is refused.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` once ``serve_forever`` returns.

    Raises:
        SystemExit: If a non-local bind address is requested.
    """
    env = os.environ
    parser = argparse.ArgumentParser(description="Local-only OpenVINO NPU advisory gateway")
    parser.add_argument("--host", default=env.get("NPU_ADVISORY_HOST", HOST))
    parser.add_argument("--port", type=int, default=int(env.get("NPU_ADVISORY_PORT", str(PORT))))
    parser.add_argument("--log-db", default=str(DEFAULT_LOG_DB))
    parser.add_argument(
        "--allowed-root",
        action="append",
        dest="allowed_roots",
        default=None,
        help="Configured file root allowed for advisory doc/image triage. May be repeated.",
    )
    options = parser.parse_args(argv)

    if options.host != "127.0.0.1":
        raise SystemExit("refusing non-local bind")

    root_inputs = options.allowed_roots or DEFAULT_ALLOWED_ROOTS
    configured_roots = [Path(entry).expanduser().resolve() for entry in root_inputs]
    handler_cls = make_handler(AdvisoryLogger(options.log_db), configured_roots)
    server = ThreadingHTTPServer((options.host, options.port), handler_cls)

    # One-line JSON startup banner on stdout.
    banner = {
        "service": "openvino-advisory-gateway",
        "host": options.host,
        "port": options.port,
        "mode": "advisory_only",
    }
    print(json.dumps(banner), flush=True)
    server.serve_forever()
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())