838 lines
32 KiB
Python
838 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import base64
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import traceback
|
||
from datetime import datetime, timezone
|
||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
|
||
_LOG_LEVELS = {"debug": 10, "info": 20, "error": 40, "quiet": 100}
|
||
|
||
|
||
def _log(server, level: str, msg: str) -> None:
    """Write a timestamped line to stderr when `level` clears the server's threshold.

    server.log_level: debug|info|error|quiet (default: info)

    Unknown level names fall back to "info". Any internal failure is
    swallowed so logging can never break the HTTP handler.
    """
    try:
        threshold_name = "info" if server is None else getattr(server, "log_level", "info")
        threshold = _LOG_LEVELS.get(str(threshold_name).lower(), _LOG_LEVELS["info"])
        severity = _LOG_LEVELS.get(str(level).lower(), _LOG_LEVELS["info"])
        if severity >= threshold:
            stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            print(f"[{stamp}] {level.upper()}: {msg}", file=sys.stderr)
    except Exception:
        # Never let logging break the handler.
        return
|
||
|
||
|
||
def _slug(s: str, max_len: int = 80) -> str:
|
||
s = (s or "").strip().lower()
|
||
s = re.sub(r"[^a-z0-9]+", "-", s)
|
||
s = s.strip("-")
|
||
if not s:
|
||
return "screenshot"
|
||
return s[:max_len]
|
||
|
||
def _load_dotenv_if_present(project_root: Path) -> None:
|
||
"""
|
||
Minimal .env loader:
|
||
- supports KEY=VALUE
|
||
- ignores blank lines and lines starting with '#'
|
||
- does not support quotes/escapes
|
||
"""
|
||
if os.getenv("OPENAI_API_KEY"):
|
||
return
|
||
|
||
p = project_root / ".env"
|
||
if not p.exists():
|
||
return
|
||
|
||
try:
|
||
for line in p.read_text("utf-8").splitlines():
|
||
s = line.strip()
|
||
if not s or s.startswith("#") or "=" not in s:
|
||
continue
|
||
k, v = s.split("=", 1)
|
||
k = k.strip()
|
||
v = v.strip()
|
||
if k and v and k not in os.environ:
|
||
os.environ[k] = v
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def _truncate(s: str, max_chars: int) -> str:
|
||
if len(s) <= max_chars:
|
||
return s
|
||
return s[: max_chars - 1] + "\u2026"
|
||
|
||
|
||
def _safe_json_dump(obj: object, max_chars: int) -> str:
    """Serialize `obj` as compact ASCII JSON, capped at `max_chars` characters."""
    compact = json.dumps(obj, ensure_ascii=True, separators=(",", ":"), sort_keys=False)
    return _truncate(compact, max_chars)
|
||
|
||
|
||
def _maybe_dump_model(obj: object) -> Optional[dict]:
|
||
"""
|
||
Best-effort conversion of SDK model objects (pydantic-ish) to plain dicts.
|
||
"""
|
||
if obj is None:
|
||
return None
|
||
for attr in ("model_dump", "dict", "to_dict"):
|
||
fn = getattr(obj, attr, None)
|
||
if callable(fn):
|
||
try:
|
||
out = fn()
|
||
return out if isinstance(out, dict) else None
|
||
except Exception:
|
||
continue
|
||
return None
|
||
|
||
|
||
def _extract_json_text(raw: str) -> Optional[str]:
|
||
"""
|
||
Best-effort extraction of a JSON object from a text response.
|
||
|
||
Some providers occasionally wrap JSON in code fences or add preamble text.
|
||
We keep this conservative; if it can't be extracted, we return None.
|
||
"""
|
||
if not raw:
|
||
return None
|
||
|
||
# ```json ... ``` or any fenced block: ```...```
|
||
# We capture the whole fenced body (not just {...}) so nested braces don't break extraction.
|
||
m = re.search(r"```(?:[a-z0-9_+-]+)?\s*(.*?)\s*```", raw, flags=re.DOTALL | re.IGNORECASE)
|
||
if m:
|
||
return m.group(1).strip()
|
||
|
||
# First {...last} span
|
||
start = raw.find("{")
|
||
end = raw.rfind("}")
|
||
if start != -1 and end != -1 and end > start:
|
||
return raw[start : end + 1].strip()
|
||
|
||
return None
|
||
|
||
|
||
def _ai_error_payload(
    *,
    error: str,
    detail: str = "",
    raw: str = "",
    model: str = "",
    response_id: str = "",
    status: str = "",
    incomplete_reason: str = "",
    usage: Optional[dict] = None,
    took_ms: Optional[int] = None,
    ai_path: Optional[Path] = None,
) -> dict:
    """Build the standard {"ok": False, "error": ...} dict, skipping blank fields.

    `raw` is stored as a 2000-char preview, `took_ms` only when it is an
    actual int, and `ai_path` as a string.
    """
    payload: dict = {"ok": False, "error": error}
    if detail:
        payload["detail"] = detail
    if raw:
        payload["raw_preview"] = _truncate(raw, 2000)
    # Plain truthy string fields, kept in the original insertion order.
    for key, value in (
        ("model", model),
        ("response_id", response_id),
        ("status", status),
        ("incomplete_reason", incomplete_reason),
    ):
        if value:
            payload[key] = value
    if usage:
        payload["usage"] = usage
    if isinstance(took_ms, int):
        payload["took_ms"] = took_ms
    if ai_path is not None:
        payload["ai_path"] = str(ai_path)
    return payload
|
||
|
||
|
||
def _is_valid_ai_json(parsed: object) -> bool:
|
||
if not isinstance(parsed, dict):
|
||
return False
|
||
posts = parsed.get("posts", None)
|
||
return isinstance(posts, list)
|
||
|
||
|
||
def _ea_sanitize_text(text: object) -> str:
    """
    Port of fl_geo_sanitize_text(), plus lowercase output (no capitals).

    Coerces any value to str, normalizes typographic punctuation to ASCII,
    strips invisible/zero-width characters, collapses repeated spaces and
    blank lines, and lowercases the result. Order of the steps matters.

    Notes:
    - stays ASCII in-code by using \\u escapes for unicode literals.
    - preserves newlines (normalizes excess blank lines).
    """
    # None becomes "" rather than the string "None".
    if text is None:
        return ""
    s = str(text)
    if s == "":
        return ""

    # 1) Quick ASCII-level normalizations: drop CRs, turn tabs into spaces.
    s = s.replace("\r", "").replace("\t", " ")

    # 2) Specific single-char replacements (smart quotes, dashes, bullet, ellipsis).
    replacements = {
        "\u201c": '"',  # “
        "\u201d": '"',  # ”
        "\u201e": '"',  # „
        "\u201f": '"',  # ‟
        "\u2018": "'",  # ‘
        "\u2019": "'",  # ’
        "\u201a": "'",  # ‚
        "\u201b": "'",  # ‛
        "\u2014": "-",  # —
        "\u2013": "-",  # –
        "\u2212": "-",  # −
        "\u2022": "- ",  # •
        "\u2026": "...",  # …
    }
    for k, v in replacements.items():
        s = s.replace(k, v)

    # 3) Regex-based replacements/removals
    # Exotic space characters (NBSP, en/em spaces, braille blank, etc.) -> plain space.
    s = re.sub(
        r"[\u00A0\u2000-\u200A\u202F\u205F\u3000\u1680\u180E\u2800\u3164\uFFA0]",
        " ",
        s,
    )
    s = s.replace("\u2028", "\n")  # LS
    s = s.replace("\u2029", "\n\n")  # PS

    # Zero-width, joiner, bidi-control and variation-selector characters: remove outright.
    s = re.sub(
        r"[\u200B\u200C\u200D\u200E\u200F\u202A-\u202E\u2060\u2061\u2066-\u2069\u206A-\u206F\u00AD\u034F\u115F\u1160\u17B4\u17B5\u180B-\u180D\uFE00-\uFE0F\uFEFF\u001C\u000C]",
        "",
        s,
    )

    # Invisible math operators -> visible ASCII equivalents.
    s = s.replace("\u2062", "x").replace("\u2063", ",").replace("\u2064", "+")

    # 4) Collapse excessive spaces (runs of two or more; newlines untouched here)
    s = re.sub(r"[ ]{2,}", " ", s)

    # 5) Normalize multiple blank lines to at most two
    s = re.sub(r"\n{3,}", "\n\n", s)

    # Remove capitals: lowercase all text.
    return s.lower()
|
||
|
||
|
||
def _sanitize_ai_payload(ai: dict, page_url: str, page_title: str) -> dict:
    """Normalize an AI reply dict before it is persisted / returned.

    The strict schema requires page_url/page_title; those are overwritten
    with the ground-truth values from meta. All text fields are passed
    through _ea_sanitize_text, non-dict post entries are dropped, and each
    kept entry is rebuilt with exactly the expected keys.
    """
    out = dict(ai) if isinstance(ai, dict) else {}
    out["page_url"] = page_url
    out["page_title"] = page_title
    out["notes"] = _ea_sanitize_text(out.get("notes", ""))

    raw_posts = out.get("posts", [])
    if not isinstance(raw_posts, list):
        raw_posts = []

    text_fields = (
        "post_text",
        "improved_short",
        "improved_medium",
        "critical_short",
        "critical_medium",
        "suggested_short",
        "suggested_medium",
    )
    sanitized = []
    for position, post in enumerate(raw_posts):
        if not isinstance(post, dict):
            continue
        # Missing "index" falls back to the entry's position in the list.
        entry = {"index": int(post.get("index", position))}
        for field in text_fields:
            entry[field] = _ea_sanitize_text(post.get(field, ""))
        sanitized.append(entry)

    out["posts"] = sanitized
    return out
|
||
|
||
|
||
def _response_schema(max_posts: int) -> dict:
|
||
return {
|
||
"type": "object",
|
||
"additionalProperties": False,
|
||
"properties": {
|
||
"page_url": {"type": "string"},
|
||
"page_title": {"type": "string"},
|
||
"posts": {
|
||
"type": "array",
|
||
"maxItems": max_posts,
|
||
"items": {
|
||
"type": "object",
|
||
"additionalProperties": False,
|
||
"properties": {
|
||
"index": {"type": "integer"},
|
||
"post_text": {"type": "string"},
|
||
"improved_short": {"type": "string"},
|
||
"improved_medium": {"type": "string"},
|
||
"critical_short": {"type": "string"},
|
||
"critical_medium": {"type": "string"},
|
||
"suggested_short": {"type": "string"},
|
||
"suggested_medium": {"type": "string"},
|
||
},
|
||
"required": [
|
||
"index",
|
||
"post_text",
|
||
"improved_short",
|
||
"improved_medium",
|
||
"critical_short",
|
||
"critical_medium",
|
||
"suggested_short",
|
||
"suggested_medium",
|
||
],
|
||
},
|
||
},
|
||
"notes": {"type": "string"},
|
||
},
|
||
# OpenAI strict json_schema currently expects all top-level properties to be required.
|
||
# If you don't have a value, return "" / [].
|
||
"required": ["page_url", "page_title", "posts", "notes"],
|
||
}
|
||
|
||
|
||
def _maybe_generate_ai(server, png_path: Path, meta: dict, content: object) -> dict:
    """
    Call OpenAI to draft replies for the posts visible in the screenshot.

    Reads configuration off `server` attributes (ai_enabled, ai_model,
    ai_max_posts, ...), sends the PNG plus extracted page content to the
    Responses API with a strict JSON schema, writes the (parsed or failed)
    result next to the screenshot as <name>.ai.json, and returns a status
    dict. Never raises: every failure path returns {"ok": False, ...}.

    Returns:
      { "ok": True, "ai": <obj>, "ai_path": <str>, "took_ms": <int> }
      or { "ok": False, "error": <str>, "detail": <str?> }
    """
    # Preconditions: feature flag, API key (possibly from .env), SDK available.
    if not getattr(server, "ai_enabled", False):
        return {"ok": False, "error": "ai_disabled"}

    project_root: Path = server.project_root  # type: ignore[attr-defined]
    _load_dotenv_if_present(project_root)

    if not os.getenv("OPENAI_API_KEY"):
        _log(server, "error", "AI disabled: missing OPENAI_API_KEY")
        return {"ok": False, "error": "missing_openai_api_key"}

    try:
        from openai import OpenAI  # type: ignore
    except Exception as e:
        _log(server, "error", f"AI disabled: missing openai sdk ({type(e).__name__}: {e})")
        return {"ok": False, "error": "missing_openai_sdk", "detail": str(e)}

    # Tunables attached to the server in main(); defaults mirror the CLI defaults.
    instructions_text = getattr(server, "ai_instructions", "")
    model = getattr(server, "ai_model", "gpt-5.2")
    max_posts = int(getattr(server, "ai_max_posts", 12))
    content_max_chars = int(getattr(server, "ai_content_max_chars", 120_000))
    image_detail = getattr(server, "ai_image_detail", "auto")
    max_output_tokens = int(getattr(server, "ai_max_output_tokens", 1400))

    page_url = str(meta.get("url") or "")
    page_title = str(meta.get("title") or "")
    extra_instructions = str(meta.get("extra_instructions") or "").strip()

    _log(
        server,
        "debug",
        f"AI request: model={model} max_posts={max_posts} image_detail={image_detail} max_output_tokens={max_output_tokens} url={_truncate(page_url, 140)}",
    )

    # Everything the model needs besides the image, serialized into the prompt.
    user_payload = {
        "page_url": page_url,
        "page_title": page_title,
        "meta": meta,
        "content": content,
        "task": {
            "goal": "Draft replies to each distinct post currently visible on the page.",
            "definition_of_post": "A single feed item / post / story / comment root visible on-screen right now. If it's a single-article page, treat the main article as one post.",
            "output_requirements": {
                # "ironic": "Lightly ironic, laughing at us humans (not cruel).",
                "improved": "Proofread whatever is given in extra_instructions [EXTRA_INSTRUCTIONS]. Proofreading-style improvements, preserving the original words as much as possible. Improving in medium version.",
                "critical": "Bold/critical: politely questions the premise or assumptions.",
                "suggested": "Best style you think fits (helpful/witty/clarifying/etc).",
                "short": "direct, useful, no fluff. if X(twitter): 1-2 sentences max, if Reddit: 3-6 sentences max.",
                "medium": "more context, still concise. if X(twitter): 3-6 sentences max, if Reddit: 6-12 sentences max.",
                "style": "Follow the system instructions for voice/tone. Apply EXTRA_INSTRUCTIONS to all responses. If unclear what the post says, be honest and ask a question instead of guessing.",
            },
        },
    }

    prompt_text = (
        "You will receive (1) a screenshot of the current viewport and (2) extracted visible page content.\n"
        "Identify each distinct post visible on the page and draft SIX reply options per post:\n"
        "- improved_short, improved_medium\n"
        "- critical_short, critical_medium\n"
        "- suggested_short, suggested_medium\n"
        "All six must follow the system instructions and EXTRA_INSTRUCTIONS.\n"
        "Do not invent facts not present in the screenshot/content.\n"
        "Return JSON matching the provided schema. Include all top-level keys: page_url, page_title, posts, notes.\n"
        "If a value is unknown, use an empty string.\n\n"
        + (f"EXTRA_INSTRUCTIONS={extra_instructions}\n\n" if extra_instructions else "")
        + f"PAGE_DATA_JSON={_safe_json_dump(user_payload, content_max_chars)}"
    )

    # Embed the screenshot as a data URL (Responses API input_image format).
    b64 = base64.b64encode(png_path.read_bytes()).decode("ascii")
    image_data_url = f"data:image/png;base64,{b64}"

    t0 = time.monotonic()
    client = OpenAI()
    resp = None
    took_ms = None
    try:
        resp = client.responses.create(
            model=model,
            instructions=instructions_text,
            input=[
                {
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": prompt_text},
                        {"type": "input_image", "image_url": image_data_url, "detail": image_detail},
                    ],
                }
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": "ea_post_responses",
                    "description": "Draft short and medium replies for each visible post on the page.",
                    "schema": _response_schema(max_posts),
                    "strict": True,
                },
                "verbosity": "low",
            },
            max_output_tokens=max_output_tokens,
        )
        took_ms = int((time.monotonic() - t0) * 1000)
    except Exception as e:
        # API/transport failure: log, persist a debug .ai.json, and return an error payload.
        took_ms = int((time.monotonic() - t0) * 1000)
        tb = traceback.format_exc(limit=8)
        detail = f"{type(e).__name__}: {e}"
        # Some OpenAI exceptions have request id / status in attrs; include if present.
        rid = getattr(e, "request_id", "") or getattr(getattr(e, "response", None), "headers", {}).get("x-request-id", "")
        sc = getattr(e, "status_code", None)
        if rid:
            detail = f"{detail} (request_id={rid})"
        if isinstance(sc, int):
            detail = f"{detail} (status={sc})"
        _log(server, "error", f"AI exception after {took_ms}ms: {detail}")
        _log(server, "debug", f"AI traceback:\n{tb}")
        # Still write a debug file for the screenshot for later inspection.
        ai_path = png_path.with_suffix(".ai.json")
        debug_obj = {
            "error": "ai_exception",
            "detail": detail,
            "took_ms": took_ms,
            "model": str(model),
            # Full traceback only at debug level to keep the file small otherwise.
            "traceback": tb if getattr(server, "log_level", "info") == "debug" else "",
        }
        try:
            ai_path.write_text(json.dumps(debug_obj, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
        except Exception:
            pass
        return _ai_error_payload(
            error="ai_exception",
            detail=detail,
            model=str(model),
            response_id="",
            status="",
            incomplete_reason="",
            usage=None,
            took_ms=took_ms,
            ai_path=ai_path,
        )

    # Pull response metadata defensively (SDK object shapes vary by version).
    response_id = str(getattr(resp, "id", "") or "")
    status = str(getattr(resp, "status", "") or "")
    incomplete_reason = ""
    try:
        incomplete_details = getattr(resp, "incomplete_details", None)
        if incomplete_details is not None:
            incomplete_reason = str(getattr(incomplete_details, "reason", "") or "")
    except Exception:
        incomplete_reason = ""
    usage = _maybe_dump_model(getattr(resp, "usage", None))

    _log(
        server,
        "debug",
        f"AI response meta: status={status or '?'} incomplete_reason={incomplete_reason or '(none)'} usage={_safe_json_dump(usage or {}, 800)} response_id={response_id}",
    )

    raw = ""
    try:
        raw = resp.output_text or ""
    except Exception:
        raw = ""
    _log(server, "debug", f"AI output_text chars={len(raw)} response_id={response_id}")

    # Parse the model output: direct json.loads first, then conservative extraction.
    parsed: object
    parse_error = ""
    parse_detail = ""
    try:
        parsed = json.loads(raw)
    except Exception as e:
        # Try a couple conservative extraction heuristics.
        candidate = _extract_json_text(raw)
        if candidate:
            try:
                parsed = json.loads(candidate)
            except Exception as e2:
                parse_error = "non_json_output"
                parse_detail = f"{type(e2).__name__}: {e2}"
                parsed = {"error": "non_json_output", "raw": raw}
        else:
            parse_error = "non_json_output"
            parse_detail = f"{type(e).__name__}: {e}"
            parsed = {"error": "non_json_output", "raw": raw}

    if parse_error:
        _log(server, "error", f"AI output parse failed: {parse_error} ({parse_detail}) response_id={response_id}")

    # Sanitize schema-shaped output (forces ground-truth url/title, cleans text).
    if isinstance(parsed, dict) and "posts" in parsed:
        parsed = _sanitize_ai_payload(parsed, page_url=page_url, page_title=page_title)

    ai_path = png_path.with_suffix(".ai.json")
    try:
        # When generation fails, persist helpful metadata along with the raw text.
        if parse_error and isinstance(parsed, dict) and "raw" in parsed:
            parsed = {
                **parsed,
                "response_id": response_id,
                "model": str(model),
                "status": status,
                "incomplete_reason": incomplete_reason,
                "usage": usage or {},
            }
        ai_path.write_text(json.dumps(parsed, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
    except Exception as e:
        _log(server, "error", f"Failed writing AI file {ai_path}: {type(e).__name__}: {e}")

    # If we didn't get schema-conformant output, treat it as an error so the popup doesn't show "unknown".
    if not _is_valid_ai_json(parsed):
        err = parse_error or (parsed.get("error", "") if isinstance(parsed, dict) else "") or "invalid_ai_output"
        detail = parse_detail or "AI returned JSON that does not match the expected schema (missing posts[])."
        if incomplete_reason:
            # Surface truncation/filtering as the primary error with actionable advice.
            err = incomplete_reason if incomplete_reason in ("max_output_tokens", "content_filter") else err
            if incomplete_reason == "max_output_tokens":
                detail = (
                    f"AI response was truncated (incomplete_reason=max_output_tokens). "
                    f"Try increasing --ai-max-output-tokens (currently {max_output_tokens}) or decreasing --ai-max-posts (currently {max_posts}). "
                    f"Original parse error: {parse_detail or '(none)'}"
                )
            else:
                detail = f"AI response incomplete (incomplete_reason={incomplete_reason}). Parse error: {parse_detail or '(none)'}"
        _log(server, "error", f"AI output invalid: error={err} response_id={response_id} ai_path={ai_path}")
        return _ai_error_payload(
            error=str(err),
            detail=str(detail),
            raw=raw,
            model=str(model),
            response_id=response_id,
            status=status,
            incomplete_reason=incomplete_reason,
            usage=usage,
            took_ms=took_ms,
            ai_path=ai_path,
        )

    posts_count = len(parsed.get("posts", [])) if isinstance(parsed, dict) else 0
    _log(server, "info", f"AI ok: posts={posts_count} took_ms={took_ms} response_id={response_id} ai_path={ai_path}")
    return {
        "ok": True,
        "ai": parsed,
        "ai_path": str(ai_path),
        "took_ms": took_ms,
        "response_id": response_id,
        "model": str(model),
        "status": status,
        "incomplete_reason": incomplete_reason,
        "usage": usage,
    }
|
||
|
||
|
||
class Handler(BaseHTTPRequestHandler):
    """HTTP handler for the local screenshot bridge.

    Endpoints:
      GET  / or /health  -> service status JSON
      OPTIONS *          -> CORS preflight response
      POST /screenshot   -> save PNG + metadata (+ optional extracted content),
                            optionally run a hook command and/or AI drafting

    Configuration is read from attributes attached to ``self.server`` in
    main() (out_dir, project_root, run_cmd, ai_enabled, log_level, ...).
    """

    # Reported in the Server response header.
    server_version = "LocalScreenshotBridge/0.1"

    def log_message(self, fmt: str, *args):  # noqa: N802
        """Route default HTTP request logs through _log (respects --log-level)."""
        # Route default HTTP request logs through our logger (and respect --log-level).
        try:
            msg = fmt % args
        except Exception:
            msg = fmt
        try:
            ip = self.client_address[0] if getattr(self, "client_address", None) else "?"
        except Exception:
            ip = "?"
        _log(self.server, "debug", f"HTTP {ip} {msg}")  # type: ignore[arg-type]

    def _send_json(self, status: int, payload: dict):
        """Send `payload` as ASCII JSON with permissive CORS headers."""
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        # Chrome extension fetch() to localhost will preflight; allow it.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):  # noqa: N802
        """Status endpoint: only "/" and "/health" are recognized."""
        if self.path not in ("/", "/health"):
            self._send_json(404, {"ok": False, "error": "not_found"})
            return
        self._send_json(
            200,
            {
                "ok": True,
                "service": "local_screenshot_bridge",
                "out_dir": str(self.server.out_dir),  # type: ignore[attr-defined]
                "has_run_cmd": bool(getattr(self.server, "run_cmd", None)),  # type: ignore[attr-defined]
                "ai_enabled": bool(getattr(self.server, "ai_enabled", False)),  # type: ignore[attr-defined]
            },
        )

    def do_OPTIONS(self):  # noqa: N802
        """CORS preflight: empty 204 with permissive headers."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def do_POST(self):  # noqa: N802
        """Accept a capture at POST /screenshot.

        Request JSON fields: data_url (PNG data URL, required), title, url,
        ts, content (extracted page content, optional), extra_instructions.
        Side effects: writes <stamp>-<slug>.png / .json / .content.json into
        out_dir, then optionally runs the --run hook and the AI step.
        Responds 200 with saved paths plus hook ("ran") and AI results.
        """
        if self.path != "/screenshot":
            self._send_json(404, {"ok": False, "error": "not_found"})
            return

        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            self._send_json(400, {"ok": False, "error": "bad_content_length"})
            return

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw.decode("utf-8"))
        except Exception:
            self._send_json(400, {"ok": False, "error": "bad_json"})
            return

        # All fields default to "" / None so a sparse request still works.
        data_url = req.get("data_url") or ""
        title = req.get("title") or ""
        page_url = req.get("url") or ""
        client_ts = req.get("ts") or ""
        content = req.get("content", None)
        extra_instructions = req.get("extra_instructions") or ""

        # Single-line preview of extra_instructions for the info log.
        extra_s = str(extra_instructions).strip()
        if extra_s:
            extra_s = _truncate(extra_s.replace("\r", " ").replace("\n", " "), 140)
            extra_part = f" extra={extra_s}"
        else:
            extra_part = ""
        _log(
            self.server,  # type: ignore[arg-type]
            "info",
            f"Capture received: title={_truncate(str(title), 80)} url={_truncate(str(page_url), 140)} content={'yes' if content is not None else 'no'}{extra_part}",
        )

        # Only PNG data URLs are accepted; base64 is decoded strictly.
        m = re.match(r"^data:image/png;base64,(.*)$", data_url)
        if not m:
            self._send_json(400, {"ok": False, "error": "expected_png_data_url"})
            return

        try:
            png_bytes = base64.b64decode(m.group(1), validate=True)
        except Exception:
            self._send_json(400, {"ok": False, "error": "bad_base64"})
            return

        # File names: UTC timestamp + slug of the page title.
        now = datetime.now(timezone.utc)
        stamp = now.strftime("%Y%m%dT%H%M%SZ")
        base = f"{stamp}-{_slug(title)}"

        out_dir: Path = self.server.out_dir  # type: ignore[attr-defined]
        out_dir.mkdir(parents=True, exist_ok=True)
        png_path = out_dir / f"{base}.png"
        meta_path = out_dir / f"{base}.json"
        content_path = out_dir / f"{base}.content.json"

        try:
            png_path.write_bytes(png_bytes)

            # Save extracted page content separately to keep the meta file small/handy.
            wrote_content = False
            if content is not None:
                try:
                    raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
                    # Prevent pathological payloads from creating huge files.
                    if len(raw_content.encode("utf-8")) > 2_000_000:
                        content = {
                            "error": "content_too_large_truncated",
                            "note": "Original extracted content exceeded 2MB.",
                        }
                        raw_content = json.dumps(content, ensure_ascii=True, indent=2) + "\n"
                    content_path.write_text(raw_content, encoding="utf-8")
                    wrote_content = True
                except Exception:
                    _log(self.server, "error", f"Failed writing content file {content_path}")  # type: ignore[arg-type]
                    # Don't fail the whole request if content writing fails.
                    wrote_content = False

            final_content_path = str(content_path) if wrote_content else None

            meta_path.write_text(
                json.dumps(
                    {
                        "title": title,
                        "url": page_url,
                        "client_ts": client_ts,
                        "saved_utc": now.isoformat(),
                        "png_path": str(png_path),
                        "content_path": final_content_path,
                        "extra_instructions": extra_instructions,
                    },
                    indent=2,
                    ensure_ascii=True,
                )
                + "\n",
                encoding="utf-8",
            )
        except Exception as e:
            self._send_json(500, {"ok": False, "error": "write_failed", "detail": str(e)})
            return

        # In-memory copy of what was written to meta_path (fed to the AI step).
        meta_obj = {
            "title": title,
            "url": page_url,
            "client_ts": client_ts,
            "saved_utc": now.isoformat(),
            "png_path": str(png_path),
            "content_path": final_content_path,
            "extra_instructions": extra_instructions,
        }
        _log(
            self.server,  # type: ignore[arg-type]
            "info",
            f"Saved: png={png_path} meta={meta_path} content={final_content_path or '(none)'}",
        )

        # Optional post-save hook command (configured via --run).
        run = getattr(self.server, "run_cmd", None)  # type: ignore[attr-defined]
        ran = None
        if run:
            try:
                # Pass content_path as a 3rd arg when available. This keeps hooks compatible with older 2-arg scripts.
                args = [str(png_path), str(meta_path)]
                if final_content_path:
                    args.append(final_content_path)
                proc = subprocess.run(
                    run + args,
                    cwd=str(self.server.project_root),  # type: ignore[attr-defined]
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
                # Keep only the tail of hook output so the response stays small.
                ran = {
                    "cmd": run,
                    "exit_code": proc.returncode,
                    "stdout": proc.stdout[-4000:],
                    "stderr": proc.stderr[-4000:],
                }
                if proc.returncode != 0:
                    _log(self.server, "error", f"Hook failed: exit={proc.returncode} cmd={' '.join(run)}")  # type: ignore[arg-type]
            except Exception as e:
                ran = {"cmd": run, "error": str(e)}
                _log(self.server, "error", f"Hook exception: {type(e).__name__}: {e}")  # type: ignore[arg-type]

        # Optional AI drafting; failures are reported in the response, never raised.
        ai_result = None
        if getattr(self.server, "ai_enabled", False):  # type: ignore[attr-defined]
            try:
                ai_result = _maybe_generate_ai(self.server, png_path, meta_obj, content)
            except Exception as e:
                detail = f"{type(e).__name__}: {e}"
                _log(self.server, "error", f"AI exception (outer): {detail}")  # type: ignore[arg-type]
                _log(self.server, "debug", f"AI outer traceback:\n{traceback.format_exc(limit=8)}")  # type: ignore[arg-type]
                ai_result = {"ok": False, "error": "ai_exception", "detail": detail}

        self._send_json(
            200,
            {
                "ok": True,
                "png_path": str(png_path),
                "meta_path": str(meta_path),
                "content_path": final_content_path,
                "ran": ran,
                "ai_result": ai_result,
            },
        )
|
||
|
||
|
||
def main(argv: list[str]) -> int:
    """CLI entry point: parse arguments, configure the HTTP server, and serve.

    Returns a process exit code (0 on Ctrl-C / clean shutdown).

    Fixes: the listening socket is now closed via server_close() on shutdown
    (it was leaked on KeyboardInterrupt), and the function always returns an
    int as its annotation promises.
    """
    p = argparse.ArgumentParser(description="Receive screenshots from a Chrome extension and save into this project.")
    p.add_argument("--port", type=int, default=8765)
    p.add_argument("--bind", default="127.0.0.1", help="Bind address (default: 127.0.0.1)")
    p.add_argument("--out-dir", default="screenshots", help="Output directory relative to project root")
    p.add_argument("--ai", action="store_true", help="Run OpenAI to generate reply suggestions and return them to the extension")
    p.add_argument(
        "--log-level",
        default=os.getenv("AIEA_LOG_LEVEL", "info"),
        choices=sorted(_LOG_LEVELS.keys()),
        help="Logging verbosity: debug|info|error|quiet (env: AIEA_LOG_LEVEL)",
    )
    p.add_argument("--ai-model", default=os.getenv("AI_EA_MODEL", "gpt-5.2"))
    p.add_argument("--ai-max-posts", type=int, default=int(os.getenv("AI_EA_MAX_POSTS", "12")))
    p.add_argument("--ai-content-max-chars", type=int, default=int(os.getenv("AI_EA_CONTENT_MAX_CHARS", "120000")))
    p.add_argument("--ai-image-detail", default=os.getenv("AI_EA_IMAGE_DETAIL", "auto"))
    p.add_argument("--ai-max-output-tokens", type=int, default=int(os.getenv("AI_EA_MAX_OUTPUT_TOKENS", "1400")))
    p.add_argument(
        "--run",
        nargs="+",
        default=None,
        help="Optional command to run after saving. Args appended: <png_path> <meta_path> [content_path].",
    )
    args = p.parse_args(argv)

    # The project root is the parent of this script's directory.
    project_root = Path(__file__).resolve().parents[1]
    out_dir = (project_root / args.out_dir).resolve()
    if args.ai:
        # Load OPENAI_API_KEY (and friends) from .env before the first request.
        _load_dotenv_if_present(project_root)
    instructions_path = project_root / "AI_EA_INSTRUCTIONS.MD"
    ai_instructions = instructions_path.read_text("utf-8") if instructions_path.exists() else ""

    httpd = HTTPServer((args.bind, args.port), Handler)
    # Handler reads its configuration off the server instance (stdlib
    # HTTPServer has no user-config slot, so attributes are attached directly).
    httpd.project_root = project_root  # type: ignore[attr-defined]
    httpd.out_dir = out_dir  # type: ignore[attr-defined]
    httpd.run_cmd = args.run  # type: ignore[attr-defined]
    httpd.ai_enabled = bool(args.ai)  # type: ignore[attr-defined]
    httpd.log_level = args.log_level  # type: ignore[attr-defined]
    httpd.ai_model = args.ai_model  # type: ignore[attr-defined]
    httpd.ai_max_posts = args.ai_max_posts  # type: ignore[attr-defined]
    httpd.ai_content_max_chars = args.ai_content_max_chars  # type: ignore[attr-defined]
    httpd.ai_image_detail = args.ai_image_detail  # type: ignore[attr-defined]
    httpd.ai_max_output_tokens = args.ai_max_output_tokens  # type: ignore[attr-defined]
    httpd.ai_instructions = ai_instructions  # type: ignore[attr-defined]

    print(f"Listening on http://{args.bind}:{args.port}/screenshot", file=sys.stderr)
    print(f"Saving screenshots to {out_dir}", file=sys.stderr)
    print(f"Log level: {args.log_level}", file=sys.stderr)
    if args.ai:
        print(f"OpenAI enabled: model={args.ai_model} max_posts={args.ai_max_posts}", file=sys.stderr)
    if args.run:
        print(f"Will run: {' '.join(args.run)} <png_path> <meta_path> [content_path]", file=sys.stderr)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Fix: release the listening socket on shutdown (was leaked on Ctrl-C).
        httpd.server_close()
    return 0
|
||
|
||
|
||
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
|