283 lines
9.2 KiB
Python
283 lines
9.2 KiB
Python
"""
|
|
LocalAIApi — lightweight Python client for the Flatlogic AI proxy.
|
|
|
|
Usage (inside the Django workspace):
|
|
|
|
from ai.local_ai_api import LocalAIApi
|
|
|
|
response = LocalAIApi.create_response({
|
|
"input": [
|
|
{"role": "system", "content": "You are a helpful assistant."},
|
|
{"role": "user", "content": "Summarise this text in two sentences."},
|
|
],
|
|
"text": {"format": {"type": "json_object"}},
|
|
})
|
|
|
|
if response.get("success"):
|
|
data = LocalAIApi.decode_json_from_response(response)
|
|
# ...
|
|
|
|
The helper automatically injects the project UUID header and falls back to
|
|
reading executor/.env if environment variables are missing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import ssl
|
|
from typing import Any, Dict, Iterable, Optional
|
|
from urllib import error as urlerror
|
|
from urllib import request as urlrequest
|
|
|
|
__all__ = [
    "LocalAIApi",
    "create_response",
    "request",
    "decode_json_from_response",
]


# Memoised configuration built lazily by _config(); None until first use.
_CONFIG_CACHE: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class LocalAIApi:
    """Static helpers mirroring the PHP implementation."""

    @staticmethod
    def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Delegate to the module-level create_response()."""
        opts = options if options else {}
        return create_response(params, opts)

    @staticmethod
    def request(path: Optional[str] = None, payload: Optional[Dict[str, Any]] = None,
                options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Delegate to the module-level request()."""
        body = payload if payload else {}
        opts = options if options else {}
        return request(path, body, opts)

    @staticmethod
    def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Delegate to the module-level decode_json_from_response()."""
        return decode_json_from_response(response)
|
|
|
|
|
|
def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Signature compatible with the OpenAI Responses API."""
    opts = options or {}
    body = dict(params)  # never mutate the caller's dict

    # "input" must be a non-empty list of messages.
    messages = body.get("input")
    if not (isinstance(messages, list) and messages):
        return {
            "success": False,
            "error": "input_missing",
            "message": 'Parameter "input" is required and must be a non-empty list.',
        }

    cfg = _config()
    if not body.get("model"):
        body["model"] = cfg["default_model"]

    return request(opts.get("path"), body, opts)
|
|
|
|
|
|
def request(path: Optional[str], payload: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Perform a raw request to the AI proxy.

    Parameters:
        path: Endpoint path or absolute URL; falls back to ``options["path"]``
            and then the configured ``responses_path``.
        payload: JSON-serialisable request body. The caller's dict is copied,
            never mutated.
        options: Optional overrides — ``path``, ``timeout``, ``verify_tls``
            and ``headers`` (a mapping, or an iterable of ``"Name: value"``
            strings).

    Returns:
        A dict with ``success`` plus ``status``/``data`` on success, or
        ``error`` details on failure. Network failures never raise; they are
        reported as ``{"success": False, "error": "request_failed", ...}``.
    """
    cfg = _config()
    options = options or {}

    resolved_path = path or options.get("path") or cfg["responses_path"]
    if not resolved_path:
        return {
            "success": False,
            "error": "project_id_missing",
            "message": "PROJECT_ID is not defined; cannot resolve AI proxy endpoint.",
        }

    project_uuid = cfg["project_uuid"]
    if not project_uuid:
        return {
            "success": False,
            "error": "project_uuid_missing",
            "message": "PROJECT_UUID is not defined; aborting AI request.",
        }

    # Fix: work on a copy — the original injected "project_uuid" into the
    # caller's dict, a surprising side effect.
    payload = dict(payload)
    payload.setdefault("project_uuid", project_uuid)

    url = _build_url(resolved_path, cfg["base_url"])
    timeout = int(options.get("timeout", cfg["timeout"]))
    verify_tls = options.get("verify_tls", cfg["verify_tls"])

    headers: Dict[str, str] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        cfg["project_header"]: project_uuid,
    }
    extra_headers = options.get("headers")
    if isinstance(extra_headers, dict):
        # Generalisation: accept a plain mapping; previously a dict fell into
        # the Iterable branch and its keys were silently ignored.
        for name, value in extra_headers.items():
            headers[str(name).strip()] = str(value).strip()
    elif isinstance(extra_headers, Iterable):
        for header in extra_headers:
            if isinstance(header, str) and ":" in header:
                name, value = header.split(":", 1)
                headers[name.strip()] = value.strip()

    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urlrequest.Request(url, data=body, method="POST")
    for name, value in headers.items():
        req.add_header(name, value)

    # Only build a permissive SSL context when verification is explicitly
    # disabled; context=None means urlopen's default (verified) behaviour.
    context = None
    if not verify_tls:
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    try:
        with urlrequest.urlopen(req, timeout=timeout, context=context) as resp:
            status = resp.getcode()
            response_body = resp.read().decode("utf-8", errors="replace")
    except urlerror.HTTPError as exc:
        # HTTP error responses still carry a body worth surfacing.
        status = exc.getcode()
        response_body = exc.read().decode("utf-8", errors="replace")
    except Exception as exc:  # pylint: disable=broad-except
        return {
            "success": False,
            "error": "request_failed",
            "message": str(exc),
        }

    decoded = None
    if response_body:
        try:
            decoded = json.loads(response_body)
        except json.JSONDecodeError:
            decoded = None

    if 200 <= status < 300:
        return {
            "success": True,
            "status": status,
            "data": decoded if decoded is not None else response_body,
        }

    # Prefer a structured error/message from the proxy; fall back to raw body.
    error_message = "AI proxy request failed"
    if isinstance(decoded, dict):
        error_message = decoded.get("error") or decoded.get("message") or error_message
    elif response_body:
        error_message = response_body

    return {
        "success": False,
        "status": status,
        "error": error_message,
        "response": decoded if decoded is not None else response_body,
    }
|
|
|
|
|
|
def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Attempt to decode JSON emitted by the model (handles markdown fences).

    Returns the decoded object, or ``None`` when the response carries no
    text, the text is not valid JSON, or the JSON top level is not a dict.
    """
    text = _extract_text(response)
    if text == "":
        return None

    decoded = _try_load_object(text)
    if decoded is not None:
        return decoded

    # Fall back to stripping a markdown code fence. Generalised: the original
    # only handled a "```json" opener; bare "```" fences are now accepted too.
    stripped = text.strip()
    if stripped.startswith("```json"):
        stripped = stripped[7:]
    elif stripped.startswith("```"):
        stripped = stripped[3:]
    if stripped.endswith("```"):
        stripped = stripped[:-3]
    stripped = stripped.strip()
    if stripped and stripped != text:
        return _try_load_object(stripped)
    return None


def _try_load_object(text: str) -> Optional[Dict[str, Any]]:
    """Parse *text* as JSON; return the result only when it is a dict."""
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        return None
    return decoded if isinstance(decoded, dict) else None
|
|
|
|
|
|
def _extract_text(response: Dict[str, Any]) -> str:
|
|
payload = response.get("data") if response.get("success") else response.get("response")
|
|
if isinstance(payload, dict):
|
|
output = payload.get("output")
|
|
if isinstance(output, list):
|
|
combined = ""
|
|
for item in output:
|
|
content = item.get("content") if isinstance(item, dict) else None
|
|
if isinstance(content, list):
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "output_text" and block.get("text"):
|
|
combined += str(block["text"])
|
|
if combined:
|
|
return combined
|
|
choices = payload.get("choices")
|
|
if isinstance(choices, list) and choices:
|
|
message = choices[0].get("message")
|
|
if isinstance(message, dict) and message.get("content"):
|
|
return str(message["content"])
|
|
if isinstance(payload, str):
|
|
return payload
|
|
return ""
|
|
|
|
|
|
def _config() -> Dict[str, Any]:
    """Build (once) and return the module-wide configuration dict."""
    global _CONFIG_CACHE  # noqa: PLW0603
    if _CONFIG_CACHE is None:
        _ensure_env_loaded()

        project_id = os.getenv("PROJECT_ID") or None
        responses_path = os.getenv("AI_RESPONSES_PATH")
        if not responses_path and project_id:
            # Default endpoint derived from the project id.
            responses_path = f"/projects/{project_id}/ai-request"

        _CONFIG_CACHE = {
            "base_url": os.getenv("AI_PROXY_BASE_URL", "https://flatlogic.com"),
            "responses_path": responses_path,
            "project_id": project_id,
            "project_uuid": os.getenv("PROJECT_UUID"),
            "project_header": os.getenv("AI_PROJECT_HEADER", "project-uuid"),
            "default_model": os.getenv("AI_DEFAULT_MODEL", "gpt-5"),
            "timeout": int(os.getenv("AI_TIMEOUT", "30")),
            "verify_tls": os.getenv("AI_VERIFY_TLS", "true").lower() not in {"0", "false", "no"},
        }
    return _CONFIG_CACHE
|
|
|
|
|
|
def _build_url(path: str, base_url: str) -> str:
|
|
trimmed = path.strip()
|
|
if trimmed.startswith("http://") or trimmed.startswith("https://"):
|
|
return trimmed
|
|
if trimmed.startswith("/"):
|
|
return f"{base_url}{trimmed}"
|
|
return f"{base_url}/{trimmed}"
|
|
|
|
|
|
def _ensure_env_loaded() -> None:
    """Populate os.environ from executor/.env if variables are missing."""
    # Nothing to do when both identifiers are already in the environment.
    if os.getenv("PROJECT_UUID") and os.getenv("PROJECT_ID"):
        return

    env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".env"))
    if not os.path.exists(env_path):
        return

    try:
        with open(env_path, "r", encoding="utf-8") as handle:
            raw_lines = handle.readlines()
    except OSError:
        # Best-effort loader: an unreadable .env is silently ignored.
        return

    for raw in raw_lines:
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        key = key.strip()
        # Existing environment variables always win over the .env file.
        if key and not os.getenv(key):
            os.environ[key] = value.strip().strip('\'"')
|