38567-vm/aiea/tools/local_screenshot_bridge.py
2026-02-18 14:31:16 +00:00

838 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
import traceback
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Optional
_LOG_LEVELS = {"debug": 10, "info": 20, "error": 40, "quiet": 100}
def _log(server, level: str, msg: str) -> None:
"""
Minimal structured-ish logging to stderr.
server.log_level: debug|info|error|quiet (default: info)
"""
try:
configured = getattr(server, "log_level", "info") if server is not None else "info"
configured_rank = _LOG_LEVELS.get(str(configured).lower(), _LOG_LEVELS["info"])
level_rank = _LOG_LEVELS.get(str(level).lower(), _LOG_LEVELS["info"])
if level_rank < configured_rank:
return
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
print(f"[{ts}] {level.upper()}: {msg}", file=sys.stderr)
except Exception:
# Never let logging break the handler.
return
def _slug(s: str, max_len: int = 80) -> str:
s = (s or "").strip().lower()
s = re.sub(r"[^a-z0-9]+", "-", s)
s = s.strip("-")
if not s:
return "screenshot"
return s[:max_len]
def _load_dotenv_if_present(project_root: Path) -> None:
"""
Minimal .env loader:
- supports KEY=VALUE
- ignores blank lines and lines starting with '#'
- does not support quotes/escapes
"""
if os.getenv("OPENAI_API_KEY"):
return
p = project_root / ".env"
if not p.exists():
return
try:
for line in p.read_text("utf-8").splitlines():
s = line.strip()
if not s or s.startswith("#") or "=" not in s:
continue
k, v = s.split("=", 1)
k = k.strip()
v = v.strip()
if k and v and k not in os.environ:
os.environ[k] = v
except Exception:
return
def _truncate(s: str, max_chars: int) -> str:
if len(s) <= max_chars:
return s
return s[: max_chars - 1] + "\u2026"
def _safe_json_dump(obj: object, max_chars: int) -> str:
s = json.dumps(obj, ensure_ascii=True, separators=(",", ":"), sort_keys=False)
return _truncate(s, max_chars)
def _maybe_dump_model(obj: object) -> Optional[dict]:
"""
Best-effort conversion of SDK model objects (pydantic-ish) to plain dicts.
"""
if obj is None:
return None
for attr in ("model_dump", "dict", "to_dict"):
fn = getattr(obj, attr, None)
if callable(fn):
try:
out = fn()
return out if isinstance(out, dict) else None
except Exception:
continue
return None
def _extract_json_text(raw: str) -> Optional[str]:
"""
Best-effort extraction of a JSON object from a text response.
Some providers occasionally wrap JSON in code fences or add preamble text.
We keep this conservative; if it can't be extracted, we return None.
"""
if not raw:
return None
# ```json ... ``` or any fenced block: ```...```
# We capture the whole fenced body (not just {...}) so nested braces don't break extraction.
m = re.search(r"```(?:[a-z0-9_+-]+)?\s*(.*?)\s*```", raw, flags=re.DOTALL | re.IGNORECASE)
if m:
return m.group(1).strip()
# First {...last} span
start = raw.find("{")
end = raw.rfind("}")
if start != -1 and end != -1 and end > start:
return raw[start : end + 1].strip()
return None
def _ai_error_payload(
*,
error: str,
detail: str = "",
raw: str = "",
model: str = "",
response_id: str = "",
status: str = "",
incomplete_reason: str = "",
usage: Optional[dict] = None,
took_ms: Optional[int] = None,
ai_path: Optional[Path] = None,
) -> dict:
payload: dict = {"ok": False, "error": error}
if detail:
payload["detail"] = detail
if raw:
payload["raw_preview"] = _truncate(raw, 2000)
if model:
payload["model"] = model
if response_id:
payload["response_id"] = response_id
if status:
payload["status"] = status
if incomplete_reason:
payload["incomplete_reason"] = incomplete_reason
if usage:
payload["usage"] = usage
if isinstance(took_ms, int):
payload["took_ms"] = took_ms
if ai_path is not None:
payload["ai_path"] = str(ai_path)
return payload
def _is_valid_ai_json(parsed: object) -> bool:
if not isinstance(parsed, dict):
return False
posts = parsed.get("posts", None)
return isinstance(posts, list)
def _ea_sanitize_text(text: object) -> str:
"""
Port of fl_geo_sanitize_text(), plus lowercase output (no capitals).
Notes:
- stays ASCII in-code by using \\u escapes for unicode literals.
- preserves newlines (normalizes excess blank lines).
"""
if text is None:
return ""
s = str(text)
if s == "":
return ""
# 1) Quick ASCII-level normalizations
s = s.replace("\r", "").replace("\t", " ")
# 2) Specific single-char replacements
replacements = {
"\u201c": '"', # “
"\u201d": '"', # ”
"\u201e": '"', # „
"\u201f": '"', # ‟
"\u2018": "'", #
"\u2019": "'", #
"\u201a": "'", #
"\u201b": "'", #
"\u2014": "-", # —
"\u2013": "-", #
"\u2212": "-", #
"\u2022": "- ", # •
"\u2026": "...", # …
}
for k, v in replacements.items():
s = s.replace(k, v)
# 3) Regex-based replacements/removals
s = re.sub(
r"[\u00A0\u2000-\u200A\u202F\u205F\u3000\u1680\u180E\u2800\u3164\uFFA0]",
" ",
s,
)
s = s.replace("\u2028", "\n") # LS
s = s.replace("\u2029", "\n\n") # PS
s = re.sub(
r"[\u200B\u200C\u200D\u200E\u200F\u202A-\u202E\u2060\u2061\u2066-\u2069\u206A-\u206F\u00AD\u034F\u115F\u1160\u17B4\u17B5\u180B-\u180D\uFE00-\uFE0F\uFEFF\u001C\u000C]",
"",
s,
)
# Invisible math
s = s.replace("\u2062", "x").replace("\u2063", ",").replace("\u2064", "+")
# 4) Collapse excessive spaces
s = re.sub(r"[ ]{2,}", " ", s)
# 5) Normalize multiple blank lines to at most two
s = re.sub(r"\n{3,}", "\n\n", s)
# Remove capitals: lowercase all text.
return s.lower()
def _sanitize_ai_payload(ai: dict, page_url: str, page_title: str) -> dict:
    """Normalize an AI response for the strict schema.

    Forces ground-truth page_url/page_title from the saved metadata and runs
    every text field through _ea_sanitize_text(). Non-dict posts entries are
    dropped; a missing/invalid posts value becomes []. A missing per-post
    index defaults to the post's position in the list.
    """
    out = dict(ai) if isinstance(ai, dict) else {}
    out["page_url"] = page_url
    out["page_title"] = page_title
    out["notes"] = _ea_sanitize_text(out.get("notes", ""))

    raw_posts = out.get("posts", [])
    if not isinstance(raw_posts, list):
        raw_posts = []

    text_fields = (
        "post_text",
        "improved_short",
        "improved_medium",
        "critical_short",
        "critical_medium",
        "suggested_short",
        "suggested_medium",
    )
    sanitized = []
    for position, post in enumerate(raw_posts):
        if not isinstance(post, dict):
            continue
        entry = {"index": int(post.get("index", position))}
        for field in text_fields:
            entry[field] = _ea_sanitize_text(post.get(field, ""))
        sanitized.append(entry)
    out["posts"] = sanitized
    return out
def _response_schema(max_posts: int) -> dict:
return {
"type": "object",
"additionalProperties": False,
"properties": {
"page_url": {"type": "string"},
"page_title": {"type": "string"},
"posts": {
"type": "array",
"maxItems": max_posts,
"items": {
"type": "object",
"additionalProperties": False,
"properties": {
"index": {"type": "integer"},
"post_text": {"type": "string"},
"improved_short": {"type": "string"},
"improved_medium": {"type": "string"},
"critical_short": {"type": "string"},
"critical_medium": {"type": "string"},
"suggested_short": {"type": "string"},
"suggested_medium": {"type": "string"},
},
"required": [
"index",
"post_text",
"improved_short",
"improved_medium",
"critical_short",
"critical_medium",
"suggested_short",
"suggested_medium",
],
},
},
"notes": {"type": "string"},
},
# OpenAI strict json_schema currently expects all top-level properties to be required.
# If you don't have a value, return "" / [].
"required": ["page_url", "page_title", "posts", "notes"],
}
def _maybe_generate_ai(server, png_path: Path, meta: dict, content: object) -> dict:
    """
    Ask OpenAI (Responses API) for structured reply drafts for a capture.

    Args:
        server: HTTPServer instance carrying config attributes set in main()
            (ai_enabled, ai_model, ai_max_posts, ai_instructions, ...).
        png_path: saved screenshot; also used to derive the ".ai.json" path.
        meta: saved capture metadata (url/title/extra_instructions/...).
        content: extracted page content from the extension (may be None).

    Returns:
        { "ok": True, "ai": <obj>, "ai_path": <str>, "took_ms": <int> }
        or { "ok": False, "error": <str>, "detail": <str?> }
    """
    # Feature gate: server must have been started with --ai.
    if not getattr(server, "ai_enabled", False):
        return {"ok": False, "error": "ai_disabled"}
    project_root: Path = server.project_root  # type: ignore[attr-defined]
    # Lazily (re)load .env so a key added after startup is still picked up.
    _load_dotenv_if_present(project_root)
    if not os.getenv("OPENAI_API_KEY"):
        _log(server, "error", "AI disabled: missing OPENAI_API_KEY")
        return {"ok": False, "error": "missing_openai_api_key"}
    try:
        # Imported lazily so the bridge works without the SDK when --ai is off.
        from openai import OpenAI  # type: ignore
    except Exception as e:
        _log(server, "error", f"AI disabled: missing openai sdk ({type(e).__name__}: {e})")
        return {"ok": False, "error": "missing_openai_sdk", "detail": str(e)}
    # Tunables (CLI flags / env vars), with safe fallbacks.
    instructions_text = getattr(server, "ai_instructions", "")
    model = getattr(server, "ai_model", "gpt-5.2")
    max_posts = int(getattr(server, "ai_max_posts", 12))
    content_max_chars = int(getattr(server, "ai_content_max_chars", 120_000))
    image_detail = getattr(server, "ai_image_detail", "auto")
    max_output_tokens = int(getattr(server, "ai_max_output_tokens", 1400))
    page_url = str(meta.get("url") or "")
    page_title = str(meta.get("title") or "")
    extra_instructions = str(meta.get("extra_instructions") or "").strip()
    _log(
        server,
        "debug",
        f"AI request: model={model} max_posts={max_posts} image_detail={image_detail} max_output_tokens={max_output_tokens} url={_truncate(page_url, 140)}",
    )
    # Task payload embedded verbatim in the prompt as JSON.
    user_payload = {
        "page_url": page_url,
        "page_title": page_title,
        "meta": meta,
        "content": content,
        "task": {
            "goal": "Draft replies to each distinct post currently visible on the page.",
            "definition_of_post": "A single feed item / post / story / comment root visible on-screen right now. If it's a single-article page, treat the main article as one post.",
            "output_requirements": {
                # "ironic": "Lightly ironic, laughing at us humans (not cruel).",
                "improved": "Proofread whatever is given in extra_instructions [EXTRA_INSTRUCTIONS]. Proofreading-style improvements, preserving the original words as much as possible. Improving in medium version.",
                "critical": "Bold/critical: politely questions the premise or assumptions.",
                "suggested": "Best style you think fits (helpful/witty/clarifying/etc).",
                "short": "direct, useful, no fluff. if X(twitter): 1-2 sentences max, if Reddit: 3-6 sentences max.",
                "medium": "more context, still concise. if X(twitter): 3-6 sentences max, if Reddit: 6-12 sentences max.",
                "style": "Follow the system instructions for voice/tone. Apply EXTRA_INSTRUCTIONS to all responses. If unclear what the post says, be honest and ask a question instead of guessing.",
            },
        },
    }
    prompt_text = (
        "You will receive (1) a screenshot of the current viewport and (2) extracted visible page content.\n"
        "Identify each distinct post visible on the page and draft SIX reply options per post:\n"
        "- improved_short, improved_medium\n"
        "- critical_short, critical_medium\n"
        "- suggested_short, suggested_medium\n"
        "All six must follow the system instructions and EXTRA_INSTRUCTIONS.\n"
        "Do not invent facts not present in the screenshot/content.\n"
        "Return JSON matching the provided schema. Include all top-level keys: page_url, page_title, posts, notes.\n"
        "If a value is unknown, use an empty string.\n\n"
        + (f"EXTRA_INSTRUCTIONS={extra_instructions}\n\n" if extra_instructions else "")
        + f"PAGE_DATA_JSON={_safe_json_dump(user_payload, content_max_chars)}"
    )
    # Screenshot is inlined as a base64 data URL (no separate upload step).
    b64 = base64.b64encode(png_path.read_bytes()).decode("ascii")
    image_data_url = f"data:image/png;base64,{b64}"
    t0 = time.monotonic()
    client = OpenAI()
    resp = None
    took_ms = None
    try:
        resp = client.responses.create(
            model=model,
            instructions=instructions_text,
            input=[
                {
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": prompt_text},
                        {"type": "input_image", "image_url": image_data_url, "detail": image_detail},
                    ],
                }
            ],
            # Strict JSON-schema output so the extension can rely on the shape.
            text={
                "format": {
                    "type": "json_schema",
                    "name": "ea_post_responses",
                    "description": "Draft short and medium replies for each visible post on the page.",
                    "schema": _response_schema(max_posts),
                    "strict": True,
                },
                "verbosity": "low",
            },
            max_output_tokens=max_output_tokens,
        )
        took_ms = int((time.monotonic() - t0) * 1000)
    except Exception as e:
        took_ms = int((time.monotonic() - t0) * 1000)
        tb = traceback.format_exc(limit=8)
        detail = f"{type(e).__name__}: {e}"
        # Some OpenAI exceptions have request id / status in attrs; include if present.
        rid = getattr(e, "request_id", "") or getattr(getattr(e, "response", None), "headers", {}).get("x-request-id", "")
        sc = getattr(e, "status_code", None)
        if rid:
            detail = f"{detail} (request_id={rid})"
        if isinstance(sc, int):
            detail = f"{detail} (status={sc})"
        _log(server, "error", f"AI exception after {took_ms}ms: {detail}")
        _log(server, "debug", f"AI traceback:\n{tb}")
        # Still write a debug file for the screenshot for later inspection.
        ai_path = png_path.with_suffix(".ai.json")
        debug_obj = {
            "error": "ai_exception",
            "detail": detail,
            "took_ms": took_ms,
            "model": str(model),
            # Tracebacks persisted only at debug level to keep files small.
            "traceback": tb if getattr(server, "log_level", "info") == "debug" else "",
        }
        try:
            ai_path.write_text(json.dumps(debug_obj, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
        except Exception:
            # Best-effort only; the error payload below is still returned.
            pass
        return _ai_error_payload(
            error="ai_exception",
            detail=detail,
            model=str(model),
            response_id="",
            status="",
            incomplete_reason="",
            usage=None,
            took_ms=took_ms,
            ai_path=ai_path,
        )
    # Response bookkeeping for logs and error payloads.
    response_id = str(getattr(resp, "id", "") or "")
    status = str(getattr(resp, "status", "") or "")
    incomplete_reason = ""
    try:
        incomplete_details = getattr(resp, "incomplete_details", None)
        if incomplete_details is not None:
            incomplete_reason = str(getattr(incomplete_details, "reason", "") or "")
    except Exception:
        incomplete_reason = ""
    usage = _maybe_dump_model(getattr(resp, "usage", None))
    _log(
        server,
        "debug",
        f"AI response meta: status={status or '?'} incomplete_reason={incomplete_reason or '(none)'} usage={_safe_json_dump(usage or {}, 800)} response_id={response_id}",
    )
    raw = ""
    try:
        # output_text can raise on some SDK versions/response shapes.
        raw = resp.output_text or ""
    except Exception:
        raw = ""
    _log(server, "debug", f"AI output_text chars={len(raw)} response_id={response_id}")
    # Parse: strict JSON first, then conservative extraction heuristics.
    parsed: object
    parse_error = ""
    parse_detail = ""
    try:
        parsed = json.loads(raw)
    except Exception as e:
        # Try a couple conservative extraction heuristics.
        candidate = _extract_json_text(raw)
        if candidate:
            try:
                parsed = json.loads(candidate)
            except Exception as e2:
                parse_error = "non_json_output"
                parse_detail = f"{type(e2).__name__}: {e2}"
                parsed = {"error": "non_json_output", "raw": raw}
        else:
            parse_error = "non_json_output"
            parse_detail = f"{type(e).__name__}: {e}"
            parsed = {"error": "non_json_output", "raw": raw}
    if parse_error:
        _log(server, "error", f"AI output parse failed: {parse_error} ({parse_detail}) response_id={response_id}")
    if isinstance(parsed, dict) and "posts" in parsed:
        # Force ground-truth url/title and sanitize all text fields.
        parsed = _sanitize_ai_payload(parsed, page_url=page_url, page_title=page_title)
    ai_path = png_path.with_suffix(".ai.json")
    try:
        # When generation fails, persist helpful metadata along with the raw text.
        if parse_error and isinstance(parsed, dict) and "raw" in parsed:
            parsed = {
                **parsed,
                "response_id": response_id,
                "model": str(model),
                "status": status,
                "incomplete_reason": incomplete_reason,
                "usage": usage or {},
            }
        ai_path.write_text(json.dumps(parsed, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
    except Exception as e:
        _log(server, "error", f"Failed writing AI file {ai_path}: {type(e).__name__}: {e}")
    # If we didn't get schema-conformant output, treat it as an error so the popup doesn't show "unknown".
    if not _is_valid_ai_json(parsed):
        err = parse_error or (parsed.get("error", "") if isinstance(parsed, dict) else "") or "invalid_ai_output"
        detail = parse_detail or "AI returned JSON that does not match the expected schema (missing posts[])."
        if incomplete_reason:
            # Truncation/filter reasons are actionable; surface them as the error.
            err = incomplete_reason if incomplete_reason in ("max_output_tokens", "content_filter") else err
            if incomplete_reason == "max_output_tokens":
                detail = (
                    f"AI response was truncated (incomplete_reason=max_output_tokens). "
                    f"Try increasing --ai-max-output-tokens (currently {max_output_tokens}) or decreasing --ai-max-posts (currently {max_posts}). "
                    f"Original parse error: {parse_detail or '(none)'}"
                )
            else:
                detail = f"AI response incomplete (incomplete_reason={incomplete_reason}). Parse error: {parse_detail or '(none)'}"
        _log(server, "error", f"AI output invalid: error={err} response_id={response_id} ai_path={ai_path}")
        return _ai_error_payload(
            error=str(err),
            detail=str(detail),
            raw=raw,
            model=str(model),
            response_id=response_id,
            status=status,
            incomplete_reason=incomplete_reason,
            usage=usage,
            took_ms=took_ms,
            ai_path=ai_path,
        )
    # Success path: schema-conformant payload was produced and saved.
    posts_count = len(parsed.get("posts", [])) if isinstance(parsed, dict) else 0
    _log(server, "info", f"AI ok: posts={posts_count} took_ms={took_ms} response_id={response_id} ai_path={ai_path}")
    return {
        "ok": True,
        "ai": parsed,
        "ai_path": str(ai_path),
        "took_ms": took_ms,
        "response_id": response_id,
        "model": str(model),
        "status": status,
        "incomplete_reason": incomplete_reason,
        "usage": usage,
    }
class Handler(BaseHTTPRequestHandler):
    """HTTP endpoint for the Chrome extension.

    Routes:
        GET /, /health      -> service status JSON.
        OPTIONS *           -> CORS preflight response.
        POST /screenshot    -> save PNG + metadata (+ optional extracted
                               content), run the optional hook command, and
                               optionally generate AI reply drafts.

    Configuration is read from attributes set on the server instance in
    main() (out_dir, run_cmd, ai_enabled, project_root, log_level, ...).
    """

    server_version = "LocalScreenshotBridge/0.1"

    def log_message(self, fmt: str, *args):  # noqa: N802
        # Route default HTTP request logs through our logger (and respect --log-level).
        try:
            msg = fmt % args
        except Exception:
            msg = fmt
        try:
            ip = self.client_address[0] if getattr(self, "client_address", None) else "?"
        except Exception:
            ip = "?"
        _log(self.server, "debug", f"HTTP {ip} {msg}")  # type: ignore[arg-type]

    def _send_json(self, status: int, payload: dict):
        # Serialize `payload` and send it with permissive CORS headers.
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        # Chrome extension fetch() to localhost will preflight; allow it.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):  # noqa: N802
        # Health/status probe for "/" and "/health" only.
        if self.path not in ("/", "/health"):
            self._send_json(404, {"ok": False, "error": "not_found"})
            return
        self._send_json(
            200,
            {
                "ok": True,
                "service": "local_screenshot_bridge",
                "out_dir": str(self.server.out_dir),  # type: ignore[attr-defined]
                "has_run_cmd": bool(getattr(self.server, "run_cmd", None)),  # type: ignore[attr-defined]
                "ai_enabled": bool(getattr(self.server, "ai_enabled", False)),  # type: ignore[attr-defined]
            },
        )

    def do_OPTIONS(self):  # noqa: N802
        # CORS preflight: empty 204 with the allow headers.
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def do_POST(self):  # noqa: N802
        """Handle POST /screenshot: persist the capture, run hook, maybe run AI."""
        if self.path != "/screenshot":
            self._send_json(404, {"ok": False, "error": "not_found"})
            return
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            self._send_json(400, {"ok": False, "error": "bad_content_length"})
            return
        raw = self.rfile.read(length)
        try:
            req = json.loads(raw.decode("utf-8"))
        except Exception:
            self._send_json(400, {"ok": False, "error": "bad_json"})
            return
        # Request fields; everything except data_url is optional.
        # NOTE(review): a non-dict JSON body (e.g. a list) would raise on
        # .get() and produce an unhandled 500 — confirm the extension always
        # sends an object.
        data_url = req.get("data_url") or ""
        title = req.get("title") or ""
        page_url = req.get("url") or ""
        client_ts = req.get("ts") or ""
        content = req.get("content", None)
        extra_instructions = req.get("extra_instructions") or ""
        extra_s = str(extra_instructions).strip()
        if extra_s:
            # One-line, clipped preview of extra instructions for the log.
            extra_s = _truncate(extra_s.replace("\r", " ").replace("\n", " "), 140)
            extra_part = f" extra={extra_s}"
        else:
            extra_part = ""
        _log(
            self.server,  # type: ignore[arg-type]
            "info",
            f"Capture received: title={_truncate(str(title), 80)} url={_truncate(str(page_url), 140)} content={'yes' if content is not None else 'no'}{extra_part}",
        )
        # Only PNG data URLs are accepted.
        m = re.match(r"^data:image/png;base64,(.*)$", data_url)
        if not m:
            self._send_json(400, {"ok": False, "error": "expected_png_data_url"})
            return
        try:
            png_bytes = base64.b64decode(m.group(1), validate=True)
        except Exception:
            self._send_json(400, {"ok": False, "error": "bad_base64"})
            return
        # Timestamped, slugged base name shared by all output files.
        now = datetime.now(timezone.utc)
        stamp = now.strftime("%Y%m%dT%H%M%SZ")
        base = f"{stamp}-{_slug(title)}"
        out_dir: Path = self.server.out_dir  # type: ignore[attr-defined]
        out_dir.mkdir(parents=True, exist_ok=True)
        png_path = out_dir / f"{base}.png"
        meta_path = out_dir / f"{base}.json"
        content_path = out_dir / f"{base}.content.json"
        try:
            png_path.write_bytes(png_bytes)
            # Save extracted page content separately to keep the meta file small/handy.
            wrote_content = False
            if content is not None:
                try:
                    raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
                    # Prevent pathological payloads from creating huge files.
                    if len(raw_content.encode("utf-8")) > 2_000_000:
                        content = {
                            "error": "content_too_large_truncated",
                            "note": "Original extracted content exceeded 2MB.",
                        }
                        raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
                    content_path.write_text(raw_content, encoding="utf-8")
                    wrote_content = True
                except Exception:
                    _log(self.server, "error", f"Failed writing content file {content_path}")  # type: ignore[arg-type]
                    # Don't fail the whole request if content writing fails.
                    wrote_content = False
            final_content_path = str(content_path) if wrote_content else None
            meta_path.write_text(
                json.dumps(
                    {
                        "title": title,
                        "url": page_url,
                        "client_ts": client_ts,
                        "saved_utc": now.isoformat(),
                        "png_path": str(png_path),
                        "content_path": final_content_path,
                        "extra_instructions": extra_instructions,
                    },
                    indent=2,
                    ensure_ascii=True,
                )
                + "\n",
                encoding="utf-8",
            )
        except Exception as e:
            self._send_json(500, {"ok": False, "error": "write_failed", "detail": str(e)})
            return
        # In-memory copy of the metadata dict for the AI step (same shape as the file).
        meta_obj = {
            "title": title,
            "url": page_url,
            "client_ts": client_ts,
            "saved_utc": now.isoformat(),
            "png_path": str(png_path),
            "content_path": final_content_path,
            "extra_instructions": extra_instructions,
        }
        _log(
            self.server,  # type: ignore[arg-type]
            "info",
            f"Saved: png={png_path} meta={meta_path} content={final_content_path or '(none)'}",
        )
        run = getattr(self.server, "run_cmd", None)  # type: ignore[attr-defined]
        ran = None
        if run:
            try:
                # Pass content_path as a 3rd arg when available. This keeps hooks compatible with older 2-arg scripts.
                args = [str(png_path), str(meta_path)]
                if final_content_path:
                    args.append(final_content_path)
                proc = subprocess.run(
                    run + args,
                    cwd=str(self.server.project_root),  # type: ignore[attr-defined]
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
                # Keep only the tail of hook output to bound response size.
                ran = {
                    "cmd": run,
                    "exit_code": proc.returncode,
                    "stdout": proc.stdout[-4000:],
                    "stderr": proc.stderr[-4000:],
                }
                if proc.returncode != 0:
                    _log(self.server, "error", f"Hook failed: exit={proc.returncode} cmd={' '.join(run)}")  # type: ignore[arg-type]
            except Exception as e:
                ran = {"cmd": run, "error": str(e)}
                _log(self.server, "error", f"Hook exception: {type(e).__name__}: {e}")  # type: ignore[arg-type]
        ai_result = None
        if getattr(self.server, "ai_enabled", False):  # type: ignore[attr-defined]
            try:
                ai_result = _maybe_generate_ai(self.server, png_path, meta_obj, content)
            except Exception as e:
                # _maybe_generate_ai handles its own errors; this is a last-resort guard.
                detail = f"{type(e).__name__}: {e}"
                _log(self.server, "error", f"AI exception (outer): {detail}")  # type: ignore[arg-type]
                _log(self.server, "debug", f"AI outer traceback:\n{traceback.format_exc(limit=8)}")  # type: ignore[arg-type]
                ai_result = {"ok": False, "error": "ai_exception", "detail": detail}
        self._send_json(
            200,
            {
                "ok": True,
                "png_path": str(png_path),
                "meta_path": str(meta_path),
                "content_path": final_content_path,
                "ran": ran,
                "ai_result": ai_result,
            },
        )
def main(argv: list[str]) -> int:
    """CLI entry point: parse flags, configure the HTTP server, serve forever.

    Returns 0 on Ctrl-C; serve_forever() otherwise never returns.
    """
    parser = argparse.ArgumentParser(
        description="Receive screenshots from a Chrome extension and save into this project."
    )
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--bind", default="127.0.0.1", help="Bind address (default: 127.0.0.1)")
    parser.add_argument("--out-dir", default="screenshots", help="Output directory relative to project root")
    parser.add_argument(
        "--ai",
        action="store_true",
        help="Run OpenAI to generate reply suggestions and return them to the extension",
    )
    parser.add_argument(
        "--log-level",
        default=os.getenv("AIEA_LOG_LEVEL", "info"),
        choices=sorted(_LOG_LEVELS.keys()),
        help="Logging verbosity: debug|info|error|quiet (env: AIEA_LOG_LEVEL)",
    )
    parser.add_argument("--ai-model", default=os.getenv("AI_EA_MODEL", "gpt-5.2"))
    parser.add_argument("--ai-max-posts", type=int, default=int(os.getenv("AI_EA_MAX_POSTS", "12")))
    parser.add_argument("--ai-content-max-chars", type=int, default=int(os.getenv("AI_EA_CONTENT_MAX_CHARS", "120000")))
    parser.add_argument("--ai-image-detail", default=os.getenv("AI_EA_IMAGE_DETAIL", "auto"))
    parser.add_argument("--ai-max-output-tokens", type=int, default=int(os.getenv("AI_EA_MAX_OUTPUT_TOKENS", "1400")))
    parser.add_argument(
        "--run",
        nargs="+",
        default=None,
        help="Optional command to run after saving. Args appended: <png_path> <meta_path> [content_path].",
    )
    opts = parser.parse_args(argv)

    project_root = Path(__file__).resolve().parents[1]
    out_dir = (project_root / opts.out_dir).resolve()
    if opts.ai:
        # Pick up OPENAI_API_KEY (and friends) from the project .env, if any.
        _load_dotenv_if_present(project_root)
    instructions_path = project_root / "AI_EA_INSTRUCTIONS.MD"
    ai_instructions = instructions_path.read_text("utf-8") if instructions_path.exists() else ""

    httpd = HTTPServer((opts.bind, opts.port), Handler)
    # Handler reads all of its configuration off the server instance.
    for attr, value in (
        ("project_root", project_root),
        ("out_dir", out_dir),
        ("run_cmd", opts.run),
        ("ai_enabled", bool(opts.ai)),
        ("log_level", opts.log_level),
        ("ai_model", opts.ai_model),
        ("ai_max_posts", opts.ai_max_posts),
        ("ai_content_max_chars", opts.ai_content_max_chars),
        ("ai_image_detail", opts.ai_image_detail),
        ("ai_max_output_tokens", opts.ai_max_output_tokens),
        ("ai_instructions", ai_instructions),
    ):
        setattr(httpd, attr, value)

    print(f"Listening on http://{opts.bind}:{opts.port}/screenshot", file=sys.stderr)
    print(f"Saving screenshots to {out_dir}", file=sys.stderr)
    print(f"Log level: {opts.log_level}", file=sys.stderr)
    if opts.ai:
        print(f"OpenAI enabled: model={opts.ai_model} max_posts={opts.ai_max_posts}", file=sys.stderr)
    if opts.run:
        print(f"Will run: {' '.join(opts.run)} <png_path> <meta_path> [content_path]", file=sys.stderr)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        return 0
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code.
    raise SystemExit(main(sys.argv[1:]))