Files
swarm-master/openvino-doc-image-triage-npu/server.py
T
2026-06-04 11:41:55 -07:00

179 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""Stdlib localhost HTTP wrapper for the triage prototype.
Endpoints:
- GET /healthz
- GET /models
- POST /triage JSON: {"path":"/local/file", "options": {...}}
- POST /triage/batch JSON: {"paths":["/local/file"], "options": {...}}
The server binds to 127.0.0.1 by default and accepts only local file paths under
configured allowed roots. It never uploads document/image contents externally.
"""
from __future__ import annotations
import argparse
import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from triage import DEFAULT_EMBED_URL, TriageOptions, read_npu_busy, triage_batch, triage_file
def _roots_within_configured(requested_roots: list[Any], configured_roots: list[Path]) -> list[Path]:
"""Return request roots only when they narrow the startup allowlist."""
narrowed: list[Path] = []
configured = [root.expanduser().resolve() for root in configured_roots]
for raw in requested_roots:
candidate = Path(str(raw)).expanduser().resolve()
if any(candidate == root or candidate.is_relative_to(root) for root in configured):
narrowed.append(candidate)
else:
raise ValueError("requested allowed_roots must be within configured allowed roots")
return narrowed
def _validated_embedding_url(raw_url: Any) -> str:
"""Allow only the configured local loopback embeddings service."""
url = str(raw_url)
parsed = urlparse(url)
host = parsed.hostname or ""
if (
parsed.scheme == "http"
and host in {"127.0.0.1", "localhost", "::1"}
and (parsed.port or 80) == 18817
and parsed.path == "/v1/embeddings"
and not parsed.username
and not parsed.password
):
return url
raise ValueError("embedding_url override must target the configured local loopback embeddings service")
def make_options(payload: dict[str, Any], default_roots: list[Path]) -> TriageOptions:
opts = payload.get("options") or {}
requested_roots = opts.get("allowed_roots", [])
if requested_roots:
if not isinstance(requested_roots, list):
raise ValueError("allowed_roots must be a list")
roots = _roots_within_configured(requested_roots, default_roots)
else:
roots = default_roots
embedding_url = DEFAULT_EMBED_URL
if "embedding_url" in opts:
embedding_url = _validated_embedding_url(opts["embedding_url"])
return TriageOptions(
max_pages=int(opts.get("max_pages", 3)),
include_ocr_text=bool(opts.get("include_ocr_text", False)),
dry_run=bool(opts.get("dry_run", False)),
use_embeddings=bool(opts.get("use_embeddings", True)),
embedding_url=embedding_url,
allowed_roots=roots,
include_full_path=bool(opts.get("include_full_path", False)),
)
class Handler(BaseHTTPRequestHandler):
server_version = "openvino-doc-image-triage-npu/0.1"
def _json(self, status: int, body: dict[str, Any]) -> None:
data = json.dumps(body, sort_keys=True).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, format: str, *args: Any) -> None:
# Do not log request bodies, OCR text, or file paths.
return
@property
def allowed_roots(self) -> list[Path]:
return self.server.allowed_roots # type: ignore[attr-defined]
def do_GET(self) -> None: # noqa: N802
if self.path in ("/", "/healthz", "/health"):
self._json(200, {
"ok": True,
"service": "openvino-doc-image-triage-npu",
"bind_policy": "localhost-default",
"npu_busy_time_us": read_npu_busy(),
"npu_busy_check_enabled": True,
"allowed_roots": [str(p) for p in self.allowed_roots],
"privacy": {"external_uploads": False, "raw_text_logged": False},
})
return
if self.path == "/models":
self._json(200, {
"models": [
{
"stage": "needs_attention_embedding",
"model": "bge-base-en-v1.5-int8-ov via local :18817",
"target_device": "NPU",
"verification": "sysfs npu_busy_time_us before/after embedding call",
},
{
"stage": "image_category_classification",
"model": "rule-based fallback in prototype v1",
"target_device": "CPU",
"npu_status": "not configured; future static-shape MobileNet/EfficientNet/ResNet OV IR",
},
{"stage": "ocr_text_extraction", "model": "optional local sidecar/PDF text", "target_device": "CPU"},
]
})
return
self._json(404, {"ok": False, "error": "not_found"})
def _read_payload(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
if length > 512 * 1024:
raise ValueError("request JSON too large")
raw = self.rfile.read(length)
if not raw:
return {}
return json.loads(raw.decode())
def do_POST(self) -> None: # noqa: N802
try:
payload = self._read_payload()
options = make_options(payload, self.allowed_roots)
if self.path == "/triage":
path = payload.get("path")
if not path:
self._json(400, {"ok": False, "error": "missing_path"})
return
self._json(200, {"ok": True, "result": triage_file(path, options)})
return
if self.path == "/triage/batch":
paths = payload.get("paths") or []
if not isinstance(paths, list) or not paths:
self._json(400, {"ok": False, "error": "missing_paths"})
return
self._json(200, triage_batch([str(p) for p in paths], options))
return
self._json(404, {"ok": False, "error": "not_found"})
except Exception as exc:
self._json(400, {"ok": False, "error": type(exc).__name__, "message": str(exc)})
def main() -> int:
parser = argparse.ArgumentParser(description="Local-only doc/image triage HTTP server")
parser.add_argument("--host", default=os.environ.get("DOC_IMAGE_TRIAGE_HOST", "127.0.0.1"))
parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18820")))
parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; may repeat")
args = parser.parse_args()
roots = [Path(p).expanduser().resolve() for p in args.allowed_root] or [Path.cwd().resolve()]
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.allowed_roots = roots # type: ignore[attr-defined]
print(json.dumps({"service": "openvino-doc-image-triage-npu", "host": args.host, "port": args.port, "allowed_roots": [str(p) for p in roots]}), flush=True)
httpd.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())