39227-vm/tools/local_screenshot_bridge.py
2026-02-10 16:17:02 +01:00

212 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
def _slug(s: str, max_len: int = 80) -> str:
s = (s or "").strip().lower()
s = re.sub(r"[^a-z0-9]+", "-", s)
s = s.strip("-")
if not s:
return "screenshot"
return s[:max_len]
class Handler(BaseHTTPRequestHandler):
    """HTTP handler that stores PNG screenshots POSTed as base64 data URLs.

    Routes:
        GET  ``/`` or ``/health``: JSON status document.
        OPTIONS (any path):        CORS preflight reply (204, no body).
        POST ``/screenshot``:      decode ``data_url``, write the PNG plus a JSON
                                   metadata file (and, if supplied, the
                                   extracted page ``content``) into
                                   ``server.out_dir``, then optionally run
                                   ``server.run_cmd`` with the PNG and metadata
                                   paths appended as arguments.

    The handler reads three attributes from the server object: ``out_dir``
    (Path), ``project_root`` (Path, used as the hook's working directory) and
    ``run_cmd`` (list of str, or None for "no hook").
    """

    server_version = "LocalScreenshotBridge/0.1"

    # Only PNG data URLs are accepted; group 1 is the base64 payload.
    _PNG_DATA_URL_RE = re.compile(r"^data:image/png;base64,(.*)$")

    # Serialized page content above this size is replaced by a small stub.
    _MAX_CONTENT_BYTES = 2_000_000

    def _send_cors_headers(self):
        """Emit the CORS headers shared by every response."""
        # Chrome extension fetch() to localhost will preflight; allow it.
        # NOTE(review): "*" also lets any web page reach this endpoint (and the
        # --run hook behind it); consider restricting to the extension origin.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def _send_json(self, status: int, payload: dict):
        """Send ``payload`` as an ASCII-escaped JSON body with ``status``."""
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(body)

    def _read_json_object(self):
        """Read the request body and parse it as a JSON object.

        Returns:
            The decoded dict, or None when the request was rejected; in that
            case a 400 response has already been sent.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            length = -1
        if length < 0:
            # A negative length would make rfile.read() block until the peer
            # closes the connection, so reject it like a malformed header.
            self._send_json(400, {"ok": False, "error": "bad_content_length"})
            return None

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw.decode("utf-8"))
        except (ValueError, RecursionError):
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            req = None
        if not isinstance(req, dict):
            # Valid JSON that is not an object (list, number, ...) is unusable
            # here; report it the same way as unparseable input.
            self._send_json(400, {"ok": False, "error": "bad_json"})
            return None
        return req

    @staticmethod
    def _unique_base(out_dir, base):
        """Return ``base``, or ``base-2``, ``base-3``, ... if its PNG exists.

        The timestamp has one-second resolution, so two captures of the same
        page within a second would otherwise overwrite each other.
        """
        candidate = base
        attempt = 1
        while (out_dir / f"{candidate}.png").exists():
            attempt += 1
            candidate = f"{base}-{attempt}"
        return candidate

    def _write_content(self, content_path, content) -> bool:
        """Best-effort write of extracted page content; True on success."""
        try:
            serialized = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
            # Prevent pathological payloads from creating huge files.
            # (ensure_ascii=True makes character count equal to byte count.)
            if len(serialized) > self._MAX_CONTENT_BYTES:
                stub = {
                    "error": "content_too_large_truncated",
                    "note": "Original extracted content exceeded 2MB.",
                }
                serialized = json.dumps(stub, ensure_ascii=True, indent=2) + "\n"
            content_path.write_text(serialized, encoding="utf-8")
        except Exception:
            # Deliberately broad: content is optional and must never fail the
            # whole request.
            return False
        return True

    def _run_hook(self, png_path, meta_path):
        """Run the optional post-save command.

        Returns:
            None when no command is configured; otherwise a dict with ``cmd``
            and either ``exit_code``/``stdout``/``stderr`` (each stream limited
            to its last 4000 characters) or ``error`` if the command could not
            be run.
        """
        run = getattr(self.server, "run_cmd", None)  # type: ignore[attr-defined]
        if not run:
            return None
        try:
            proc = subprocess.run(
                run + [str(png_path), str(meta_path)],
                cwd=str(self.server.project_root),  # type: ignore[attr-defined]
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except Exception as e:
            return {"cmd": run, "error": str(e)}
        return {
            "cmd": run,
            "exit_code": proc.returncode,
            "stdout": proc.stdout[-4000:],
            "stderr": proc.stderr[-4000:],
        }

    def do_GET(self):  # noqa: N802
        """Serve the health/status document on ``/`` and ``/health``."""
        if self.path not in ("/", "/health"):
            self._send_json(404, {"ok": False, "error": "not_found"})
            return
        self._send_json(
            200,
            {
                "ok": True,
                "service": "local_screenshot_bridge",
                "out_dir": str(self.server.out_dir),  # type: ignore[attr-defined]
                "has_run_cmd": bool(getattr(self.server, "run_cmd", None)),  # type: ignore[attr-defined]
            },
        )

    def do_OPTIONS(self):  # noqa: N802
        """Answer a CORS preflight request with 204 and the CORS headers."""
        self.send_response(204)
        self._send_cors_headers()
        self.end_headers()

    def do_POST(self):  # noqa: N802
        """Handle ``POST /screenshot``: validate, save files, run the hook.

        Responds with 404 for other paths, 400 for malformed requests
        (``bad_content_length``, ``bad_json``, ``expected_png_data_url``,
        ``bad_base64``), 500 ``write_failed`` if the files cannot be written,
        and 200 with the saved paths and hook result otherwise.
        """
        if self.path != "/screenshot":
            self._send_json(404, {"ok": False, "error": "not_found"})
            return

        req = self._read_json_object()
        if req is None:
            return  # Error response already sent.

        data_url = req.get("data_url") or ""
        title = req.get("title") or ""
        page_url = req.get("url") or ""
        client_ts = req.get("ts") or ""
        content = req.get("content", None)

        # The title feeds the file name; tolerate non-string JSON values.
        if not isinstance(title, str):
            title = str(title)

        match = self._PNG_DATA_URL_RE.match(data_url) if isinstance(data_url, str) else None
        if not match:
            self._send_json(400, {"ok": False, "error": "expected_png_data_url"})
            return
        try:
            png_bytes = base64.b64decode(match.group(1), validate=True)
        except ValueError:  # binascii.Error is a ValueError subclass.
            self._send_json(400, {"ok": False, "error": "bad_base64"})
            return

        now = datetime.now(timezone.utc)
        stamp = now.strftime("%Y%m%dT%H%M%SZ")
        out_dir: Path = self.server.out_dir  # type: ignore[attr-defined]

        try:
            # Directory creation is inside the try so a failure is reported as
            # a 500 rather than as an unhandled exception.
            out_dir.mkdir(parents=True, exist_ok=True)
            base = self._unique_base(out_dir, f"{stamp}-{_slug(title)}")
            png_path = out_dir / f"{base}.png"
            meta_path = out_dir / f"{base}.json"
            content_path = out_dir / f"{base}.content.json"

            png_path.write_bytes(png_bytes)

            # Save extracted page content separately to keep the meta file small/handy.
            wrote_content = content is not None and self._write_content(content_path, content)
            final_content_path = str(content_path) if wrote_content else None

            meta = {
                "title": title,
                "url": page_url,
                "client_ts": client_ts,
                "saved_utc": now.isoformat(),
                "png_path": str(png_path),
                "content_path": final_content_path,
            }
            meta_path.write_text(
                json.dumps(meta, indent=2, ensure_ascii=True) + "\n",
                encoding="utf-8",
            )
        except Exception as e:
            self._send_json(500, {"ok": False, "error": "write_failed", "detail": str(e)})
            return

        ran = self._run_hook(png_path, meta_path)

        self._send_json(
            200,
            {
                "ok": True,
                "png_path": str(png_path),
                "meta_path": str(meta_path),
                "content_path": final_content_path,
                "ran": ran,
            },
        )
def main(argv: list[str]) -> int:
    """Parse command-line options and serve until interrupted.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit status; 0 once the server has stopped, including after
        Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Receive screenshots from a Chrome extension and save into this project."
    )
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--bind", default="127.0.0.1", help="Bind address (default: 127.0.0.1)")
    parser.add_argument("--out-dir", default="screenshots", help="Output directory relative to project root")
    parser.add_argument(
        "--run",
        nargs="+",
        default=None,
        help="Optional command to run after saving. Screenshot paths are appended as args: PNG then JSON.",
    )
    args = parser.parse_args(argv)

    # The project root is the directory one level above this script's folder.
    project_root = Path(__file__).resolve().parents[1]
    out_dir = (project_root / args.out_dir).resolve()

    # The context manager calls server_close() on exit so the listening socket
    # is released even when serving stops because of an exception.
    with HTTPServer((args.bind, args.port), Handler) as httpd:
        # Handler reads these attributes from the server object.
        httpd.project_root = project_root  # type: ignore[attr-defined]
        httpd.out_dir = out_dir  # type: ignore[attr-defined]
        httpd.run_cmd = args.run  # type: ignore[attr-defined]

        print(f"Listening on http://{args.bind}:{args.port}/screenshot", file=sys.stderr)
        print(f"Saving screenshots to {out_dir}", file=sys.stderr)
        if args.run:
            print(f"Will run: {' '.join(args.run)} <png_path> <meta_path>", file=sys.stderr)

        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass  # Ctrl-C is the normal way to stop the bridge.
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))