#!/usr/bin/env python3
"""Local HTTP bridge that receives screenshots from a Chrome extension.

The server accepts ``POST /screenshot`` with a JSON body containing a PNG data
URL plus page metadata, saves the PNG / metadata / extracted content under the
output directory, optionally runs a hook command, and optionally asks OpenAI
to draft replies for the posts visible in the screenshot.
"""
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Optional

# Extracted page content larger than this (serialized) is replaced by a stub.
_MAX_CONTENT_FILE_BYTES = 2_000_000

# Only PNG data URLs are accepted; group 1 is the base64 payload.
_PNG_DATA_URL_RE = re.compile(r"^data:image/png;base64,(.*)$")

# Typographic characters mapped to plain ASCII equivalents.
_CHAR_REPLACEMENTS = {
    "\u201c": '"',  # left double quotation mark
    "\u201d": '"',  # right double quotation mark
    "\u201e": '"',  # double low-9 quotation mark
    "\u201f": '"',  # double high-reversed-9 quotation mark
    "\u2018": "'",  # left single quotation mark
    "\u2019": "'",  # right single quotation mark
    "\u201a": "'",  # single low-9 quotation mark
    "\u201b": "'",  # single high-reversed-9 quotation mark
    "\u2014": "-",  # em dash
    "\u2013": "-",  # en dash
    "\u2212": "-",  # minus sign
    "\u2022": "- ",  # bullet
    "\u2026": "...",  # horizontal ellipsis
}

# Exotic space-like characters that become a regular space.
_SPACE_LIKE_RE = re.compile(
    r"[\u00A0\u2000-\u200A\u202F\u205F\u3000\u1680\u180E\u2800\u3164\uFFA0]"
)

# Zero-width / directional / variation-selector characters that are removed.
_INVISIBLE_RE = re.compile(
    r"[\u200B\u200C\u200D\u200E\u200F\u202A-\u202E\u2060\u2061\u2066-\u2069\u206A-\u206F\u00AD\u034F\u115F\u1160\u17B4\u17B5\u180B-\u180D\uFE00-\uFE0F\uFEFF\u001C\u000C]"
)

_MULTI_SPACE_RE = re.compile(r"[ ]{2,}")
_MULTI_NEWLINE_RE = re.compile(r"\n{3,}")


def _slug(s: str, max_len: int = 80) -> str:
    """Return a lowercase, hyphen-separated, filesystem-safe slug of *s*.

    Non-string input is converted with ``str()``. The result is cut to
    *max_len* characters, never starts or ends with a hyphen, and falls back
    to ``"screenshot"`` when nothing usable remains.
    """
    text = str(s or "").strip().lower()
    text = re.sub(r"[^a-z0-9]+", "-", text).strip("-")
    # Truncation may land right after a separator; strip it again.
    text = text[: max(0, max_len)].strip("-")
    return text or "screenshot"


def _load_dotenv_if_present(project_root: Path) -> None:
    """
    Minimal .env loader:
    - supports KEY=VALUE
    - ignores blank lines and lines starting with '#'
    - does not support quotes/escapes

    Does nothing when OPENAI_API_KEY is already set. Existing environment
    variables are never overwritten. Loading is best-effort: an unreadable
    file or an unusable line is skipped silently.
    """
    if os.getenv("OPENAI_API_KEY"):
        return
    try:
        lines = (project_root / ".env").read_text("utf-8").splitlines()
    except (OSError, ValueError):
        # Missing/unreadable file or invalid UTF-8: nothing to load.
        return
    for line in lines:
        entry = line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        key = key.strip()
        value = value.strip()
        if not key or not value or key in os.environ:
            continue
        try:
            os.environ[key] = value
        except ValueError:
            # e.g. embedded NUL byte; skip this entry, keep loading the rest.
            continue


def _truncate(s: str, max_chars: int) -> str:
    """Return *s* limited to *max_chars* characters.

    When truncation happens the last kept character is replaced by an
    ellipsis (U+2026). A non-positive *max_chars* yields an empty string.
    """
    if len(s) <= max_chars:
        return s
    if max_chars <= 0:
        # Without this guard s[:-1] would return almost the whole string.
        return ""
    return s[: max_chars - 1] + "\u2026"


def _safe_json_dump(obj: object, max_chars: int) -> str:
    """Serialize *obj* as compact ASCII JSON, truncated to *max_chars*.

    NOTE: a truncated result is generally not valid JSON; it is only meant to
    be embedded in prompt text.
    """
    s = json.dumps(obj, ensure_ascii=True, separators=(",", ":"), sort_keys=False)
    return _truncate(s, max_chars)


def _ea_sanitize_text(text: object) -> str:
    """
    Port of fl_geo_sanitize_text(), plus lowercase output (no capitals).

    Notes:
    - stays ASCII in-code by using \\u escapes for unicode literals.
    - preserves newlines (normalizes excess blank lines).
    """
    if text is None:
        return ""
    s = str(text)
    if s == "":
        return ""

    # 1) Quick ASCII-level normalizations
    s = s.replace("\r", "").replace("\t", " ")

    # 2) Specific single-char replacements
    for src, dst in _CHAR_REPLACEMENTS.items():
        s = s.replace(src, dst)

    # 3) Regex-based replacements/removals
    s = _SPACE_LIKE_RE.sub(" ", s)
    s = s.replace("\u2028", "\n")  # LS
    s = s.replace("\u2029", "\n\n")  # PS
    s = _INVISIBLE_RE.sub("", s)
    # Invisible math
    s = s.replace("\u2062", "x").replace("\u2063", ",").replace("\u2064", "+")

    # 4) Collapse excessive spaces
    s = _MULTI_SPACE_RE.sub(" ", s)

    # 5) Collapse runs of 3+ newlines to two (i.e. at most one blank line)
    s = _MULTI_NEWLINE_RE.sub("\n\n", s)

    # Remove capitals: lowercase all text.
    return s.lower()


def _coerce_int(value: object, default: int) -> int:
    """Return ``int(value)``, or *default* when the conversion is impossible."""
    try:
        return int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        return default


def _sanitize_ai_payload(ai: dict, page_url: str, page_title: str) -> dict:
    """Normalize the model's JSON payload.

    - ``page_url`` / ``page_title`` are overwritten with the values from the
      request metadata (ground truth beats what the model echoed back).
    - ``notes`` and every text field of each post go through
      ``_ea_sanitize_text``.
    - Non-dict posts are dropped; an unusable ``index`` falls back to the
      post's position in the list.

    A non-dict *ai* is treated as an empty payload.
    """
    out = dict(ai) if isinstance(ai, dict) else {}
    out["page_url"] = page_url
    out["page_title"] = page_title
    out["notes"] = _ea_sanitize_text(out.get("notes", ""))

    posts = out.get("posts", [])
    if not isinstance(posts, list):
        posts = []

    out["posts"] = [
        {
            "index": _coerce_int(post.get("index", position), position),
            "post_text": _ea_sanitize_text(post.get("post_text", "")),
            "short_response": _ea_sanitize_text(post.get("short_response", "")),
            "medium_response": _ea_sanitize_text(post.get("medium_response", "")),
        }
        for position, post in enumerate(posts)
        if isinstance(post, dict)
    ]
    return out


def _response_schema(max_posts: int) -> dict:
    """Return the JSON schema the model output must follow.

    *max_posts* caps the length of the ``posts`` array.
    """
    return {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "page_url": {"type": "string"},
            "page_title": {"type": "string"},
            "posts": {
                "type": "array",
                "maxItems": max_posts,
                "items": {
                    "type": "object",
                    "additionalProperties": False,
                    "properties": {
                        "index": {"type": "integer"},
                        "post_text": {"type": "string"},
                        "short_response": {"type": "string"},
                        "medium_response": {"type": "string"},
                    },
                    "required": ["index", "post_text", "short_response", "medium_response"],
                },
            },
            "notes": {"type": "string"},
        },
        # OpenAI strict json_schema currently expects all top-level properties to be required.
        # If you don't have a value, return "" / [].
        "required": ["page_url", "page_title", "posts", "notes"],
    }


def _maybe_generate_ai(server, png_path: Path, meta: dict, content: object) -> dict:
    """Ask OpenAI to draft replies for the posts visible in the screenshot.

    Configuration is read from attributes on *server* (``ai_enabled``,
    ``ai_model``, ``ai_instructions``, ...). The parsed result is also written
    next to the PNG as ``<name>.ai.json``.

    Returns:
        On success: {"ok": True, "ai": parsed payload, "ai_path": str,
        "took_ms": int}. If the model output is not valid JSON, "ai" is
        {"error": "non_json_output", "raw": raw text} and "ok" is still True.
        On a precondition failure: {"ok": False, "error": str} with an
        optional "detail" string.

    Raises:
        Whatever the OpenAI client or the file system raises; the caller is
        expected to catch and report it.
    """
    if not getattr(server, "ai_enabled", False):
        return {"ok": False, "error": "ai_disabled"}

    project_root: Path = server.project_root  # type: ignore[attr-defined]
    _load_dotenv_if_present(project_root)
    if not os.getenv("OPENAI_API_KEY"):
        return {"ok": False, "error": "missing_openai_api_key"}

    try:
        from openai import OpenAI  # type: ignore
    except Exception as e:  # the SDK is optional; report instead of crashing
        return {"ok": False, "error": "missing_openai_sdk", "detail": str(e)}

    instructions_text = getattr(server, "ai_instructions", "")
    model = getattr(server, "ai_model", "gpt-5.2")
    max_posts = int(getattr(server, "ai_max_posts", 12))
    content_max_chars = int(getattr(server, "ai_content_max_chars", 120_000))
    image_detail = getattr(server, "ai_image_detail", "auto")
    max_output_tokens = int(getattr(server, "ai_max_output_tokens", 1400))

    page_url = str(meta.get("url") or "")
    page_title = str(meta.get("title") or "")

    user_payload = {
        "page_url": page_url,
        "page_title": page_title,
        "meta": meta,
        "content": content,
        "task": {
            "goal": "Draft replies to each distinct post currently visible on the page.",
            "definition_of_post": "A single feed item / post / story / comment root visible on-screen right now. If it's a single-article page, treat the main article as one post.",
            "output_requirements": {
                "short_response": "1-2 sentences, direct, useful, no fluff.",
                "medium_response": "3-6 sentences, more context, still concise.",
                "style": "Follow the system instructions for voice/tone. If unclear what the post says, be honest and ask a question instead of guessing.",
            },
        },
    }

    prompt_text = (
        "You will receive (1) a screenshot of the current viewport and (2) extracted visible page content.\n"
        "Identify each distinct post visible on the page and draft two reply options per post.\n"
        "Do not invent facts not present in the screenshot/content.\n"
        "Return JSON matching the provided schema. Include all top-level keys: page_url, page_title, posts, notes.\n"
        "If a value is unknown, use an empty string.\n\n"
        f"PAGE_DATA_JSON={_safe_json_dump(user_payload, content_max_chars)}"
    )

    b64 = base64.b64encode(png_path.read_bytes()).decode("ascii")
    image_data_url = f"data:image/png;base64,{b64}"

    t0 = time.monotonic()
    client = OpenAI()
    resp = client.responses.create(
        model=model,
        instructions=instructions_text,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": prompt_text},
                    {"type": "input_image", "image_url": image_data_url, "detail": image_detail},
                ],
            }
        ],
        text={
            "format": {
                "type": "json_schema",
                "name": "ea_post_responses",
                "description": "Draft short and medium replies for each visible post on the page.",
                "schema": _response_schema(max_posts),
                "strict": True,
            },
            "verbosity": "low",
        },
        max_output_tokens=max_output_tokens,
    )
    took_ms = int((time.monotonic() - t0) * 1000)

    raw = resp.output_text or ""
    try:
        parsed = json.loads(raw)
    except ValueError:
        # Keep the raw text so the caller can see what the model produced.
        parsed = {"error": "non_json_output", "raw": raw}

    if isinstance(parsed, dict) and "posts" in parsed:
        parsed = _sanitize_ai_payload(parsed, page_url=page_url, page_title=page_title)

    ai_path = png_path.with_suffix(".ai.json")
    ai_path.write_text(json.dumps(parsed, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")

    return {"ok": True, "ai": parsed, "ai_path": str(ai_path), "took_ms": took_ms}


def _unique_base(out_dir: Path, base: str) -> str:
    """Return *base*, or *base* with a ``-N`` suffix, such that no PNG with
    that name exists in *out_dir* yet (avoids overwriting a screenshot saved
    in the same second with the same title)."""
    candidate = base
    counter = 1
    while (out_dir / f"{candidate}.png").exists():
        counter += 1
        candidate = f"{base}-{counter}"
    return candidate


def _persist_content(content_path: Path, content: object) -> "tuple[object, Optional[str]]":
    """Write extracted page content to *content_path* (best-effort).

    Returns ``(content, written_path)``. *content* is replaced by a small
    error stub when its serialized form exceeds the size limit;
    *written_path* is ``None`` when nothing could be written.
    """
    try:
        raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
        # Prevent pathological payloads from creating huge files.
        # ensure_ascii=True makes the character count equal the byte count.
        if len(raw_content) > _MAX_CONTENT_FILE_BYTES:
            content = {
                "error": "content_too_large_truncated",
                "note": "Original extracted content exceeded 2MB.",
            }
            raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
        content_path.write_text(raw_content, encoding="utf-8")
    except (OSError, ValueError, TypeError, RecursionError):
        # Don't fail the whole request if content writing fails.
        return content, None
    return content, str(content_path)


class Handler(BaseHTTPRequestHandler):
    """HTTP handler: ``GET /health``, ``OPTIONS`` (CORS preflight) and
    ``POST /screenshot``.

    Runtime configuration (``out_dir``, ``project_root``, ``run_cmd``,
    ``ai_*``) is read from attributes set on the server object.
    """

    server_version = "LocalScreenshotBridge/0.1"

    # Upper bound for a request body; larger uploads are rejected with 413.
    max_body_bytes = 64 * 1024 * 1024

    def _send_cors_headers(self):
        # Chrome extension fetch() to localhost will preflight; allow it.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def _send_json(self, status: int, payload: dict):
        """Send *payload* as an ASCII-escaped JSON response with CORS headers."""
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):  # noqa: N802
        """Health check on ``/`` and ``/health``; 404 for anything else."""
        if self.path not in ("/", "/health"):
            self._send_json(404, {"ok": False, "error": "not_found"})
            return
        self._send_json(
            200,
            {
                "ok": True,
                "service": "local_screenshot_bridge",
                "out_dir": str(self.server.out_dir),  # type: ignore[attr-defined]
                "has_run_cmd": bool(getattr(self.server, "run_cmd", None)),
                "ai_enabled": bool(getattr(self.server, "ai_enabled", False)),
            },
        )

    def do_OPTIONS(self):  # noqa: N802
        """Answer CORS preflight requests."""
        self.send_response(204)
        self._send_cors_headers()
        self.end_headers()

    def do_POST(self):  # noqa: N802
        """Save a posted screenshot, then run the optional hook and AI step.

        Expects a JSON object with ``data_url`` (PNG data URL) and optional
        ``title``, ``url``, ``ts`` and ``content``. Responds with JSON
        describing the written files, the hook result and the AI result.
        """
        if self.path != "/screenshot":
            self._send_json(404, {"ok": False, "error": "not_found"})
            return

        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            self._send_json(400, {"ok": False, "error": "bad_content_length"})
            return
        if length < 0:
            # rfile.read(-1) would block until the client closes the socket.
            self._send_json(400, {"ok": False, "error": "bad_content_length"})
            return
        if length > self.max_body_bytes:
            self._send_json(413, {"ok": False, "error": "payload_too_large"})
            return

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw.decode("utf-8"))
        except (ValueError, RecursionError):
            self._send_json(400, {"ok": False, "error": "bad_json"})
            return
        if not isinstance(req, dict):
            # Valid JSON, but not the object this endpoint expects.
            self._send_json(400, {"ok": False, "error": "bad_json"})
            return

        data_url = req.get("data_url") or ""
        title = req.get("title") or ""
        page_url = req.get("url") or ""
        client_ts = req.get("ts") or ""
        content = req.get("content", None)

        m = _PNG_DATA_URL_RE.match(data_url) if isinstance(data_url, str) else None
        if not m:
            self._send_json(400, {"ok": False, "error": "expected_png_data_url"})
            return

        try:
            png_bytes = base64.b64decode(m.group(1), validate=True)
        except ValueError:  # binascii.Error is a ValueError subclass
            self._send_json(400, {"ok": False, "error": "bad_base64"})
            return

        now = datetime.now(timezone.utc)
        stamp = now.strftime("%Y%m%dT%H%M%SZ")
        out_dir: Path = self.server.out_dir  # type: ignore[attr-defined]

        try:
            out_dir.mkdir(parents=True, exist_ok=True)
            base = _unique_base(out_dir, f"{stamp}-{_slug(title)}")
            png_path = out_dir / f"{base}.png"
            meta_path = out_dir / f"{base}.json"
            content_path = out_dir / f"{base}.content.json"

            png_path.write_bytes(png_bytes)

            # Save extracted page content separately to keep the meta file small/handy.
            final_content_path = None
            if content is not None:
                content, final_content_path = _persist_content(content_path, content)

            meta_obj = {
                "title": title,
                "url": page_url,
                "client_ts": client_ts,
                "saved_utc": now.isoformat(),
                "png_path": str(png_path),
                "content_path": final_content_path,
            }
            meta_path.write_text(
                json.dumps(meta_obj, indent=2, ensure_ascii=True) + "\n",
                encoding="utf-8",
            )
        except OSError as e:
            self._send_json(500, {"ok": False, "error": "write_failed", "detail": str(e)})
            return

        run = getattr(self.server, "run_cmd", None)
        ran = None
        if run:
            try:
                # Pass content_path as a 3rd arg when available. This keeps hooks compatible with older 2-arg scripts.
                args = [str(png_path), str(meta_path)]
                if final_content_path:
                    args.append(final_content_path)
                proc = subprocess.run(
                    run + args,
                    cwd=str(self.server.project_root),  # type: ignore[attr-defined]
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
                ran = {
                    "cmd": run,
                    "exit_code": proc.returncode,
                    # Only the tail of the output is returned to the client.
                    "stdout": proc.stdout[-4000:],
                    "stderr": proc.stderr[-4000:],
                }
            except Exception as e:  # hook failures must not fail the request
                ran = {"cmd": run, "error": str(e)}

        ai_result = None
        if getattr(self.server, "ai_enabled", False):
            try:
                ai_result = _maybe_generate_ai(self.server, png_path, meta_obj, content)
            except Exception as e:  # report any SDK/network failure to the client
                ai_result = {"ok": False, "error": "ai_exception", "detail": str(e)}

        self._send_json(
            200,
            {
                "ok": True,
                "png_path": str(png_path),
                "meta_path": str(meta_path),
                "content_path": final_content_path,
                "ran": ran,
                "ai_result": ai_result,
            },
        )


def main(argv: list[str]) -> int:
    """Parse *argv*, start the HTTP server and serve until interrupted.

    Returns the process exit code (0 on Ctrl-C / normal shutdown).
    """
    p = argparse.ArgumentParser(description="Receive screenshots from a Chrome extension and save into this project.")
    p.add_argument("--port", type=int, default=8765)
    p.add_argument("--bind", default="127.0.0.1", help="Bind address (default: 127.0.0.1)")
    p.add_argument("--out-dir", default="screenshots", help="Output directory relative to project root")
    p.add_argument("--ai", action="store_true", help="Run OpenAI to generate reply suggestions and return them to the extension")
    p.add_argument("--ai-model", default=os.getenv("AI_EA_MODEL", "gpt-5.2"))
    # String defaults are converted by argparse via `type`, so a malformed
    # environment value yields a normal usage error instead of a traceback.
    p.add_argument("--ai-max-posts", type=int, default=os.getenv("AI_EA_MAX_POSTS", "12"))
    p.add_argument("--ai-content-max-chars", type=int, default=os.getenv("AI_EA_CONTENT_MAX_CHARS", "120000"))
    p.add_argument("--ai-image-detail", default=os.getenv("AI_EA_IMAGE_DETAIL", "auto"))
    p.add_argument("--ai-max-output-tokens", type=int, default=os.getenv("AI_EA_MAX_OUTPUT_TOKENS", "1400"))
    p.add_argument(
        "--run",
        nargs="+",
        default=None,
        help="Optional command to run after saving. Screenshot paths are appended as args: PNG then JSON.",
    )
    args = p.parse_args(argv)

    project_root = Path(__file__).resolve().parents[1]
    out_dir = (project_root / args.out_dir).resolve()

    ai_instructions = ""
    if args.ai:
        _load_dotenv_if_present(project_root)
        instructions_path = project_root / "AI_EA_INSTRUCTIONS.MD"
        if instructions_path.exists():
            ai_instructions = instructions_path.read_text("utf-8")

    httpd = HTTPServer((args.bind, args.port), Handler)
    # The handler reads its configuration from attributes on the server.
    httpd.project_root = project_root  # type: ignore[attr-defined]
    httpd.out_dir = out_dir  # type: ignore[attr-defined]
    httpd.run_cmd = args.run  # type: ignore[attr-defined]
    httpd.ai_enabled = bool(args.ai)  # type: ignore[attr-defined]
    httpd.ai_model = args.ai_model  # type: ignore[attr-defined]
    httpd.ai_max_posts = args.ai_max_posts  # type: ignore[attr-defined]
    httpd.ai_content_max_chars = args.ai_content_max_chars  # type: ignore[attr-defined]
    httpd.ai_image_detail = args.ai_image_detail  # type: ignore[attr-defined]
    httpd.ai_max_output_tokens = args.ai_max_output_tokens  # type: ignore[attr-defined]
    httpd.ai_instructions = ai_instructions  # type: ignore[attr-defined]

    print(f"Listening on http://{args.bind}:{args.port}/screenshot", file=sys.stderr)
    print(f"Saving screenshots to {out_dir}", file=sys.stderr)
    if args.ai:
        print(f"OpenAI enabled: model={args.ai_model} max_posts={args.ai_max_posts}", file=sys.stderr)
    if args.run:
        print(f"Will run: {' '.join(args.run)}   [content_path]", file=sys.stderr)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))