35606-vm/ai/local_ai_api.py
2025-11-10 03:28:25 +00:00

283 lines
9.2 KiB
Python

"""
LocalAIApi — lightweight Python client for the Flatlogic AI proxy.
Usage (inside the Django workspace):
from ai.local_ai_api import LocalAIApi
response = LocalAIApi.create_response({
"input": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Summarise this text in two sentences."},
],
"text": {"format": {"type": "json_object"}},
})
if response.get("success"):
data = LocalAIApi.decode_json_from_response(response)
# ...
The helper automatically injects the project UUID header and falls back to
reading executor/.env if environment variables are missing.
"""
from __future__ import annotations
import json
import os
import ssl
from typing import Any, Dict, Iterable, Optional
from urllib import error as urlerror
from urllib import request as urlrequest
__all__ = [
"LocalAIApi",
"create_response",
"request",
"decode_json_from_response",
]
_CONFIG_CACHE: Optional[Dict[str, Any]] = None
class LocalAIApi:
"""Static helpers mirroring the PHP implementation."""
@staticmethod
def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return create_response(params, options or {})
@staticmethod
def request(path: Optional[str] = None, payload: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return request(path, payload or {}, options or {})
@staticmethod
def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
return decode_json_from_response(response)
def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Signature compatible with the OpenAI Responses API."""
options = options or {}
payload = dict(params)
if not isinstance(payload.get("input"), list) or not payload["input"]:
return {
"success": False,
"error": "input_missing",
"message": 'Parameter "input" is required and must be a non-empty list.',
}
cfg = _config()
if not payload.get("model"):
payload["model"] = cfg["default_model"]
return request(options.get("path"), payload, options)
def request(path: Optional[str], payload: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Perform a raw request to the AI proxy."""
cfg = _config()
options = options or {}
resolved_path = path or options.get("path") or cfg["responses_path"]
if not resolved_path:
return {
"success": False,
"error": "project_id_missing",
"message": "PROJECT_ID is not defined; cannot resolve AI proxy endpoint.",
}
project_uuid = cfg["project_uuid"]
if not project_uuid:
return {
"success": False,
"error": "project_uuid_missing",
"message": "PROJECT_UUID is not defined; aborting AI request.",
}
if "project_uuid" not in payload and project_uuid:
payload["project_uuid"] = project_uuid
url = _build_url(resolved_path, cfg["base_url"])
timeout = int(options.get("timeout", cfg["timeout"]))
verify_tls = options.get("verify_tls", cfg["verify_tls"])
headers: Dict[str, str] = {
"Content-Type": "application/json",
"Accept": "application/json",
cfg["project_header"]: project_uuid,
}
extra_headers = options.get("headers")
if isinstance(extra_headers, Iterable):
for header in extra_headers:
if isinstance(header, str) and ":" in header:
name, value = header.split(":", 1)
headers[name.strip()] = value.strip()
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urlrequest.Request(url, data=body, method="POST")
for name, value in headers.items():
req.add_header(name, value)
context = None
if not verify_tls:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
try:
with urlrequest.urlopen(req, timeout=timeout, context=context) as resp:
status = resp.getcode()
response_body = resp.read().decode("utf-8", errors="replace")
except urlerror.HTTPError as exc:
status = exc.getcode()
response_body = exc.read().decode("utf-8", errors="replace")
except Exception as exc: # pylint: disable=broad-except
return {
"success": False,
"error": "request_failed",
"message": str(exc),
}
decoded = None
if response_body:
try:
decoded = json.loads(response_body)
except json.JSONDecodeError:
decoded = None
if 200 <= status < 300:
return {
"success": True,
"status": status,
"data": decoded if decoded is not None else response_body,
}
error_message = "AI proxy request failed"
if isinstance(decoded, dict):
error_message = decoded.get("error") or decoded.get("message") or error_message
elif response_body:
error_message = response_body
return {
"success": False,
"status": status,
"error": error_message,
"response": decoded if decoded is not None else response_body,
}
def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Attempt to decode JSON emitted by the model (handles markdown fences)."""
text = _extract_text(response)
if text == "":
return None
try:
decoded = json.loads(text)
if isinstance(decoded, dict):
return decoded
except json.JSONDecodeError:
pass
stripped = text.strip()
if stripped.startswith("```json"):
stripped = stripped[7:]
if stripped.endswith("```"):
stripped = stripped[:-3]
stripped = stripped.strip()
if stripped and stripped != text:
try:
decoded = json.loads(stripped)
if isinstance(decoded, dict):
return decoded
except json.JSONDecodeError:
return None
return None
def _extract_text(response: Dict[str, Any]) -> str:
payload = response.get("data") if response.get("success") else response.get("response")
if isinstance(payload, dict):
output = payload.get("output")
if isinstance(output, list):
combined = ""
for item in output:
content = item.get("content") if isinstance(item, dict) else None
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "output_text" and block.get("text"):
combined += str(block["text"])
if combined:
return combined
choices = payload.get("choices")
if isinstance(choices, list) and choices:
message = choices[0].get("message")
if isinstance(message, dict) and message.get("content"):
return str(message["content"])
if isinstance(payload, str):
return payload
return ""
def _config() -> Dict[str, Any]:
global _CONFIG_CACHE # noqa: PLW0603
if _CONFIG_CACHE is not None:
return _CONFIG_CACHE
_ensure_env_loaded()
base_url = os.getenv("AI_PROXY_BASE_URL", "https://flatlogic.com")
project_id = os.getenv("PROJECT_ID") or None
responses_path = os.getenv("AI_RESPONSES_PATH")
if not responses_path and project_id:
responses_path = f"/projects/{project_id}/ai-request"
_CONFIG_CACHE = {
"base_url": base_url,
"responses_path": responses_path,
"project_id": project_id,
"project_uuid": os.getenv("PROJECT_UUID"),
"project_header": os.getenv("AI_PROJECT_HEADER", "project-uuid"),
"default_model": os.getenv("AI_DEFAULT_MODEL", "gpt-5"),
"timeout": int(os.getenv("AI_TIMEOUT", "30")),
"verify_tls": os.getenv("AI_VERIFY_TLS", "true").lower() not in {"0", "false", "no"},
}
return _CONFIG_CACHE
def _build_url(path: str, base_url: str) -> str:
trimmed = path.strip()
if trimmed.startswith("http://") or trimmed.startswith("https://"):
return trimmed
if trimmed.startswith("/"):
return f"{base_url}{trimmed}"
return f"{base_url}/{trimmed}"
def _ensure_env_loaded() -> None:
"""Populate os.environ from executor/.env if variables are missing."""
if os.getenv("PROJECT_UUID") and os.getenv("PROJECT_ID"):
return
env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".env"))
if not os.path.exists(env_path):
return
try:
with open(env_path, "r", encoding="utf-8") as handle:
for line in handle:
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('\'"')
if key and not os.getenv(key):
os.environ[key] = value
except OSError:
pass