#!/usr/bin/env python3
"""Local-only advisory gateway for OpenVINO NPU sidecars.

This service deliberately returns bounded advisory envelopes. It never routes,
writes memory, sends external messages, executes tools, restarts services, or
broadens document processing authority. Atlas/Hermes may use these outputs as
hints only.
"""

from __future__ import annotations

import argparse
import contextlib
import hashlib
import ipaddress
import json
import math
import os
import sqlite3
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Callable
from urllib.parse import urlparse

HOST = "127.0.0.1"
DOCKER_BRIDGE_HOST = "172.19.0.1"
PORT = 18830
CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify"
GENAI_URL = "http://127.0.0.1:18820/v1/worker/generate"
DOC_TRIAGE_URL = "http://127.0.0.1:18829/triage"
DEFAULT_LOG_DB = Path(os.environ.get("NPU_ADVISORY_LOG_DB", "/home/will/.local/state/openvino-advisory-gateway/events.sqlite"))
DEFAULT_ALLOWED_ROOT = Path("/home/will/lab/swarm/openvino-doc-image-triage-npu")
# os.pathsep-separated list of roots; empty entries are dropped.
DEFAULT_ALLOWED_ROOTS = [Path(p) for p in os.environ.get("NPU_ADVISORY_ALLOWED_ROOTS", str(DEFAULT_ALLOWED_ROOT)).split(os.pathsep) if p]
ALLOWED_GENAI_JOBS = {"title", "summary", "notification", "memory_candidate"}
# Copied into every response (and every error) so callers can see,
# machine-readably, that this gateway grants no authority of its own.
AUTHORITY = {
    "may_route": False,
    "may_write_memory": False,
    "may_send_external": False,
    "may_process_private_dirs": False,
    "may_execute_tools": False,
    "may_restart_services": False,
}


def validate_bind_host(host: str, *, allow_docker_bridge: bool = False) -> None:
    """Restrict service exposure to localhost or the explicitly approved Docker bridge bind.

    Raises:
        ValueError: if ``host`` is anything other than ``127.0.0.1`` or, when
            ``allow_docker_bridge`` is set, exactly ``DOCKER_BRIDGE_HOST``.
    """
    if host == "127.0.0.1":
        return
    if not allow_docker_bridge:
        raise ValueError("refusing non-local bind without --allow-docker-bridge")
    try:
        addr = ipaddress.ip_address(host)
    except ValueError as exc:
        raise ValueError("bind host must be a literal IP address") from exc
    # The property checks are defense in depth in case DOCKER_BRIDGE_HOST is
    # ever edited to something public, loopback, or a wildcard address.
    if host != DOCKER_BRIDGE_HOST or not (
        addr.version == 4 and addr.is_private and not addr.is_loopback and not addr.is_unspecified
    ):
        raise ValueError(f"Docker bridge bind must use approved bridge IP {DOCKER_BRIDGE_HOST}")


def sha256_text(text: str) -> str:
    """Return the hex SHA-256 digest of ``text`` encoded as UTF-8."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def http_post_json(url: str, payload: dict[str, Any], timeout_s: float = 20.0) -> dict[str, Any]:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON response.

    Non-2xx responses raise ``urllib.error.HTTPError``; network failures raise
    ``urllib.error.URLError``; a non-JSON body raises ``json.JSONDecodeError``.
    """
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return json.loads(resp.read().decode("utf-8"))


def http_get_json(url: str, timeout_s: float = 8.0) -> dict[str, Any]:
    """GET ``url`` and return its JSON body.

    A successful response whose body is not JSON is reported as
    ``{"ok": True, "raw_text": <first 120 chars>}`` so plain-text health
    endpoints still count as reachable.
    """
    with urllib.request.urlopen(url, timeout=timeout_s) as resp:
        body = resp.read().decode("utf-8")
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return {"ok": True, "raw_text": body[:120]}


def _as_delta(value: Any) -> int | None:
    """Coerce a JSON busy-time counter to ``int``, or ``None`` if unusable."""
    # bool subclasses int, but JSON true/false is not a busy-time measurement
    # and must never satisfy the NPU proof.
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    # json.loads accepts Infinity/NaN; int() would raise on those.
    if isinstance(value, float) and math.isfinite(value):
        return int(value)
    return None


def _npu_delta_from(result: dict[str, Any], fallback: int | None = None) -> int | None:
    """Return the first numeric NPU busy delta reported by a sidecar response.

    Checks ``npu_busy_delta_us`` then ``sysfs_npu_busy_delta_us``; returns
    ``fallback`` when neither holds a usable number.
    """
    if not isinstance(result, dict):
        return fallback
    for key in ("npu_busy_delta_us", "sysfs_npu_busy_delta_us"):
        value = _as_delta(result.get(key))
        if value is not None:
            return value
    return fallback


def _doc_triage_npu_delta(result: dict[str, Any]) -> int | None:
    """Return the largest per-page embedding NPU busy delta in a triage response.

    Reads ``result["result"]["pages"][i]["needs_attention"]["embedding"]["npu_busy_delta_us"]``
    and skips any level that is missing or of the wrong type. Returns ``None``
    when no page reports a usable delta.
    """
    inner = result.get("result") if isinstance(result, dict) else None
    pages = inner.get("pages") if isinstance(inner, dict) else None
    if not isinstance(pages, list):
        return None
    best: int | None = None
    for page in pages:
        attention = page.get("needs_attention") if isinstance(page, dict) else None
        embedding = attention.get("embedding") if isinstance(attention, dict) else None
        if not isinstance(embedding, dict):
            continue
        delta = _as_delta(embedding.get("npu_busy_delta_us"))
        if delta is not None:
            best = delta if best is None else max(best, delta)
    return best


def build_envelope(
    *,
    service: str,
    operation: str,
    result: dict[str, Any],
    mode: str = "advisory",
    input_scope: str,
    npu_busy_delta_us: int | None,
    trace_id: str | None = None,
    warnings: list[str] | None = None,
) -> dict[str, Any]:
    """Wrap a sidecar result in the ``openvino_advisory_v1`` response schema.

    ``npu_proof.ok`` is true only when a positive integer busy delta was
    observed. ``warnings`` is copied so later appends do not mutate the
    caller's list.
    """
    npu_ok = bool(isinstance(npu_busy_delta_us, int) and npu_busy_delta_us > 0)
    return {
        "ok": True,
        "schema": "openvino_advisory_v1",
        "service": service,
        "operation": operation,
        "mode": mode,
        "trace_id": trace_id,
        "input_scope": input_scope,
        "result": result,
        "npu_proof": {"required": True, "ok": npu_ok, "npu_busy_delta_us": npu_busy_delta_us},
        "authority": dict(AUTHORITY),
        "warnings": list(warnings) if warnings else [],
    }


class AdvisoryLogger:
    """Metadata-only SQLite event log for advisory envelopes.

    Only hashes/refs and envelope metadata are stored; ``raw_payload`` is
    always written as NULL. Each call opens its own short-lived connection,
    so the logger can be shared by the threaded HTTP server.
    """

    def __init__(self, db_path: str | Path = DEFAULT_LOG_DB):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init()

    def _connect(self) -> contextlib.closing[sqlite3.Connection]:
        # sqlite3.Connection's own context manager only commits/rolls back;
        # closing() guarantees the handle is released as well.
        return contextlib.closing(sqlite3.connect(self.db_path))

    def _init(self) -> None:
        """Create the ``advisory_events`` table if it does not exist."""
        with self._connect() as con:
            with con:  # commit on success, roll back on error
                con.execute(
                    """
                    CREATE TABLE IF NOT EXISTS advisory_events (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        created_at REAL NOT NULL,
                        service TEXT NOT NULL,
                        operation TEXT NOT NULL,
                        mode TEXT NOT NULL,
                        input_scope TEXT NOT NULL,
                        input_ref TEXT NOT NULL,
                        npu_busy_delta_us INTEGER,
                        ok INTEGER NOT NULL,
                        raw_payload TEXT
                    )
                    """
                )

    def log(self, envelope: dict[str, Any], *, input_ref: str) -> None:
        """Append one row describing ``envelope``; ``input_ref`` identifies the input without its content."""
        proof = envelope.get("npu_proof") or {}
        with self._connect() as con:
            with con:
                con.execute(
                    """
                    INSERT INTO advisory_events(created_at, service, operation, mode, input_scope, input_ref, npu_busy_delta_us, ok, raw_payload)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL)
                    """,
                    (
                        time.time(),
                        str(envelope.get("service")),
                        str(envelope.get("operation")),
                        str(envelope.get("mode")),
                        str(envelope.get("input_scope")),
                        input_ref,
                        proof.get("npu_busy_delta_us"),
                        1 if envelope.get("ok") else 0,
                    ),
                )


def classify_text(
    text: str,
    *,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 20.0,
) -> dict[str, Any]:
    """Ask the classifier sidecar for labels (dry run, no evidence) and wrap them in a shadow envelope.

    Raises:
        ValueError: if ``text`` is not a non-blank string.
    """
    if not isinstance(text, str) or not text.strip():
        raise ValueError("text must be a non-empty string")
    payload = {"id": trace_id or "advisory", "text": text, "options": {"include_evidence": False, "dry_run": True}}
    result = http_post_json(CLASSIFIER_URL, payload, timeout_s)
    envelope = build_envelope(
        service="classifier",
        operation="classify",
        mode="shadow",
        input_scope="explicit_text",
        trace_id=trace_id,
        result={"labels": result.get("labels", {}), "model": result.get("model"), "service_mode": result.get("mode", "dry_run")},
        npu_busy_delta_us=_npu_delta_from(result),
    )
    if logger:
        logger.log(envelope, input_ref="text:sha256:" + sha256_text(text))
    return envelope


def generate_bounded(
    job: str,
    text: str,
    *,
    max_new_tokens: int | None = None,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 180.0,
) -> dict[str, Any]:
    """Request a draft for an allow-listed generation ``job`` and wrap it in a draft envelope.

    Raises:
        ValueError: for a job outside ``ALLOWED_GENAI_JOBS``, blank input, or a
            ``max_new_tokens`` that is not a positive integer.
    """
    if job not in ALLOWED_GENAI_JOBS:
        raise ValueError("unsupported advisory generation job")
    if not isinstance(text, str) or not text.strip():
        raise ValueError("input must be a non-empty string")
    payload: dict[str, Any] = {"job": job, "input": text}
    if max_new_tokens is not None:
        # Comes straight from the HTTP body; only forward a sane integer.
        if isinstance(max_new_tokens, bool) or not isinstance(max_new_tokens, int) or max_new_tokens < 1:
            raise ValueError("max_new_tokens must be a positive integer")
        payload["max_new_tokens"] = max_new_tokens
    result = http_post_json(GENAI_URL, payload, timeout_s)
    envelope = build_envelope(
        service="genai",
        operation=f"generate:{job}",
        mode="draft",
        input_scope="explicit_text",
        trace_id=trace_id,
        result={"draft_text": result.get("text", ""), "json": result.get("json"), "timing_ms": result.get("timing_ms"), "final_authority": False},
        npu_busy_delta_us=_npu_delta_from(result),
    )
    if logger:
        logger.log(envelope, input_ref="text:sha256:" + sha256_text(text))
    return envelope


def _resolve_allowed(
    path: str,
    allowed_roots: list[str] | str | None,
    configured_roots: list[Path] | None = None,
) -> tuple[Path, list[Path]]:
    """Resolve ``path`` and check it is an existing file under a permitted root.

    Requested roots (a list, or a single string) may only narrow the configured
    roots, never widen them. Symlinks are resolved before every containment check.

    Returns:
        ``(resolved_file, effective_roots)``.

    Raises:
        ValueError: on any containment or existence failure.
    """
    configured = [p.expanduser().resolve() for p in (configured_roots or DEFAULT_ALLOWED_ROOTS)]
    if not configured:
        raise ValueError("at least one configured allowed root is required")
    # A bare JSON string would otherwise be iterated character by character.
    if isinstance(allowed_roots, str):
        allowed_roots = [allowed_roots]
    requested = [Path(p).expanduser().resolve() for p in (allowed_roots or [str(p) for p in configured])]
    if not requested:
        raise ValueError("at least one requested allowed root is required")
    for root in requested:
        if not any(root == base or root.is_relative_to(base) for base in configured):
            raise ValueError("requested allowed root is outside configured roots")
    roots = requested
    candidate = Path(path).expanduser().resolve()
    if not any(candidate == root or candidate.is_relative_to(root) for root in roots):
        raise ValueError("path must be inside an allowed root")
    if not candidate.exists() or not candidate.is_file():
        raise ValueError("path must be an existing file")
    return candidate, roots


def triage_file(
    path: str,
    *,
    allowed_roots: list[str] | None = None,
    configured_roots: list[Path] | None = None,
    trace_id: str | None = None,
    http_post_json: Callable[[str, dict[str, Any], float], dict[str, Any]] = http_post_json,
    logger: AdvisoryLogger | None = None,
    timeout_s: float = 60.0,
) -> dict[str, Any]:
    """Send an allow-listed file to the doc-triage sidecar (max 3 pages) and wrap the result.

    Only a hash of the resolved path is logged, never file contents.
    """
    candidate, roots = _resolve_allowed(path, allowed_roots, configured_roots)
    payload = {"path": str(candidate), "options": {"allowed_roots": [str(r) for r in roots], "max_pages": 3}}
    result = http_post_json(DOC_TRIAGE_URL, payload, timeout_s)
    delta = _doc_triage_npu_delta(result)
    envelope = build_envelope(
        service="doc_triage",
        operation="triage_file",
        mode="reviewable_artifact",
        input_scope="explicit_file",
        trace_id=trace_id,
        result={"triage": result.get("result"), "final_authority": False},
        npu_busy_delta_us=delta,
    )
    if logger:
        envelope["warnings"].append("metadata-only log; raw file contents are not logged")
        logger.log(envelope, input_ref="file:sha256path:" + sha256_text(str(candidate)))
    return envelope


def health(*, http_get_json: Callable[[str, float], dict[str, Any]] = http_get_json) -> dict[str, Any]:
    """Probe each sidecar's ``/healthz``; overall ``ok`` is false if any probe raises."""
    deps = {
        "classifier": "http://127.0.0.1:18819/healthz",
        "genai": "http://127.0.0.1:18820/healthz",
        "doc_triage": "http://127.0.0.1:18829/healthz",
    }
    out: dict[str, Any] = {"ok": True, "service": "openvino-advisory-gateway", "mode": "advisory_only", "authority": dict(AUTHORITY), "dependencies": {}}
    for name, url in deps.items():
        try:
            data = http_get_json(url, 8.0)
            out["dependencies"][name] = {"ok": bool(data.get("ok", data.get("status") == "ok")), "service": data.get("service"), "device": data.get("device")}
        except Exception as exc:  # any failure just marks this dependency unhealthy
            out["ok"] = False
            out["dependencies"][name] = {"ok": False, "error": str(exc)}
    return out


def _read_json(handler: BaseHTTPRequestHandler, max_bytes: int = 256 * 1024) -> dict[str, Any]:
    """Read and decode the request body as a JSON object (``{}`` when empty).

    Raises:
        ValueError: for a missing/invalid/negative Content-Length beyond
            ``max_bytes``, malformed JSON, or a JSON value that is not an object.
    """
    try:
        length = int(handler.headers.get("Content-Length", "0"))
    except (TypeError, ValueError) as exc:
        raise ValueError("invalid Content-Length header") from exc
    # A negative length would make rfile.read() block until the client hangs up.
    if length < 0:
        raise ValueError("invalid Content-Length header")
    if length > max_bytes:
        raise ValueError("request JSON too large")
    raw = handler.rfile.read(length)
    if not raw:
        return {}
    data = json.loads(raw.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("request JSON must be an object")
    return data


def make_handler(logger: AdvisoryLogger, configured_roots: list[Path]):
    """Build the request handler class bound to ``logger`` and ``configured_roots``."""

    class Handler(BaseHTTPRequestHandler):
        server_version = "openvino-advisory-gateway/0.1"

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002 - stdlib override name
            # Do not log request bodies or private paths.
            print(f"{self.client_address[0]} {format % args}")

        def send_json(self, status: int, payload: Any) -> None:
            body = json.dumps(payload, indent=2, sort_keys=True).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self) -> None:  # noqa: N802
            if urlparse(self.path).path in ("/", "/health", "/healthz"):
                self.send_json(200, health())
                return
            self.send_json(404, {"ok": False, "error": "not_found"})

        def do_POST(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            try:
                payload = _read_json(self)
                if path == "/v1/advisory/classify":
                    self.send_json(200, classify_text(str(payload.get("text", "")), trace_id=payload.get("trace_id"), logger=logger))
                    return
                if path == "/v1/advisory/generate":
                    self.send_json(200, generate_bounded(str(payload.get("job", "summary")), str(payload.get("input", "")), max_new_tokens=payload.get("max_new_tokens"), trace_id=payload.get("trace_id"), logger=logger))
                    return
                if path == "/v1/advisory/triage":
                    self.send_json(200, triage_file(str(payload.get("path", "")), allowed_roots=payload.get("allowed_roots"), configured_roots=configured_roots, trace_id=payload.get("trace_id"), logger=logger))
                    return
                self.send_json(404, {"ok": False, "error": "not_found"})
            except Exception as exc:  # every failure is reported to the caller as a 400
                self.send_json(400, {"ok": False, "error": type(exc).__name__, "message": str(exc), "authority": dict(AUTHORITY)})

    return Handler


def main(argv: list[str] | None = None) -> int:
    """Parse CLI options, validate the bind address, and serve until interrupted."""
    parser = argparse.ArgumentParser(description="Local-only OpenVINO NPU advisory gateway")
    parser.add_argument("--host", default=os.environ.get("NPU_ADVISORY_HOST", HOST))
    parser.add_argument("--port", type=int, default=int(os.environ.get("NPU_ADVISORY_PORT", str(PORT))))
    parser.add_argument("--log-db", default=str(DEFAULT_LOG_DB))
    parser.add_argument("--allowed-root", action="append", dest="allowed_roots", default=None, help="Configured file root allowed for advisory doc/image triage. May be repeated.")
    parser.add_argument(
        "--allow-docker-bridge",
        action="store_true",
        default=os.environ.get("NPU_ADVISORY_ALLOW_DOCKER_BRIDGE", "").lower() in {"1", "true", "yes"},
        help="Permit binding to a private Docker bridge IP instead of 127.0.0.1.",
    )
    args = parser.parse_args(argv)
    try:
        validate_bind_host(args.host, allow_docker_bridge=args.allow_docker_bridge)
    except ValueError as exc:
        raise SystemExit(str(exc)) from exc
    configured_roots = [Path(p).expanduser().resolve() for p in (args.allowed_roots or DEFAULT_ALLOWED_ROOTS)]
    logger = AdvisoryLogger(args.log_db)
    server = ThreadingHTTPServer((args.host, args.port), make_handler(logger, configured_roots))
    print(json.dumps({"service": "openvino-advisory-gateway", "host": args.host, "port": args.port, "mode": "advisory_only"}), flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop a foreground run
    finally:
        server.server_close()  # release the listening socket
    return 0


if __name__ == "__main__":
    raise SystemExit(main())