421 lines
14 KiB
Python
421 lines
14 KiB
Python
"""
|
|
LocalAIApi — lightweight Python client for the Flatlogic AI proxy.
|
|
|
|
Usage (inside the Django workspace):
|
|
|
|
from ai.local_ai_api import LocalAIApi
|
|
|
|
response = LocalAIApi.create_response({
|
|
"input": [
|
|
{"role": "system", "content": "You are a helpful assistant."},
|
|
{"role": "user", "content": "Summarise this text in two sentences."},
|
|
],
|
|
"text": {"format": {"type": "json_object"}},
|
|
})
|
|
|
|
if response.get("success"):
|
|
data = LocalAIApi.decode_json_from_response(response)
|
|
# ...
|
|
|
|
# Typical successful payload (truncated):
|
|
# {
|
|
# "id": "resp_xxx",
|
|
# "status": "completed",
|
|
# "output": [
|
|
# {"type": "reasoning", "summary": []},
|
|
# {"type": "message", "content": [{"type": "output_text", "text": "Your final answer here."}]}
|
|
# ],
|
|
# "usage": { "input_tokens": 123, "output_tokens": 456 }
|
|
# }
|
|
|
|
The helper automatically injects the project UUID header and falls back to
|
|
reading executor/.env if environment variables are missing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import ssl
|
|
from typing import Any, Dict, Iterable, Optional
|
|
from urllib import error as urlerror
|
|
from urllib import request as urlrequest
|
|
|
|
__all__ = [
|
|
"LocalAIApi",
|
|
"create_response",
|
|
"request",
|
|
"fetch_status",
|
|
"await_response",
|
|
"extract_text",
|
|
"decode_json_from_response",
|
|
]
|
|
|
|
|
|
_CONFIG_CACHE: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class LocalAIApi:
|
|
"""Static helpers mirroring the PHP implementation."""
|
|
|
|
@staticmethod
|
|
def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
return create_response(params, options or {})
|
|
|
|
@staticmethod
|
|
def request(path: Optional[str] = None, payload: Optional[Dict[str, Any]] = None,
|
|
options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
return request(path, payload or {}, options or {})
|
|
|
|
@staticmethod
|
|
def extract_text(response: Dict[str, Any]) -> str:
|
|
return extract_text(response)
|
|
|
|
@staticmethod
|
|
def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
return decode_json_from_response(response)
|
|
|
|
|
|
def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""Signature compatible with the OpenAI Responses API."""
|
|
options = options or {}
|
|
payload = dict(params)
|
|
|
|
if not isinstance(payload.get("input"), list) or not payload["input"]:
|
|
return {
|
|
"success": False,
|
|
"error": "input_missing",
|
|
"message": 'Parameter "input" is required and must be a non-empty list.',
|
|
}
|
|
|
|
cfg = _config()
|
|
if not payload.get("model"):
|
|
payload["model"] = cfg["default_model"]
|
|
|
|
initial = request(options.get("path"), payload, options)
|
|
if not initial.get("success"):
|
|
return initial
|
|
|
|
data = initial.get("data")
|
|
if isinstance(data, dict) and "ai_request_id" in data:
|
|
ai_request_id = data["ai_request_id"]
|
|
poll_timeout = int(options.get("poll_timeout", 300))
|
|
poll_interval = int(options.get("poll_interval", 5))
|
|
return await_response(ai_request_id, {
|
|
"interval": poll_interval,
|
|
"timeout": poll_timeout,
|
|
"headers": options.get("headers"),
|
|
"timeout_per_call": options.get("timeout"),
|
|
})
|
|
|
|
return initial
|
|
|
|
|
|
def request(path: Optional[str], payload: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""Perform a raw request to the AI proxy."""
|
|
cfg = _config()
|
|
options = options or {}
|
|
|
|
resolved_path = path or options.get("path") or cfg["responses_path"]
|
|
if not resolved_path:
|
|
return {
|
|
"success": False,
|
|
"error": "project_id_missing",
|
|
"message": "PROJECT_ID is not defined; cannot resolve AI proxy endpoint.",
|
|
}
|
|
|
|
project_uuid = cfg["project_uuid"]
|
|
if not project_uuid:
|
|
return {
|
|
"success": False,
|
|
"error": "project_uuid_missing",
|
|
"message": "PROJECT_UUID is not defined; aborting AI request.",
|
|
}
|
|
|
|
if "project_uuid" not in payload and project_uuid:
|
|
payload["project_uuid"] = project_uuid
|
|
|
|
url = _build_url(resolved_path, cfg["base_url"])
|
|
opt_timeout = options.get("timeout")
|
|
timeout = int(cfg["timeout"] if opt_timeout is None else opt_timeout)
|
|
verify_tls = options.get("verify_tls", cfg["verify_tls"])
|
|
|
|
headers: Dict[str, str] = {
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
cfg["project_header"]: project_uuid,
|
|
}
|
|
extra_headers = options.get("headers")
|
|
if isinstance(extra_headers, Iterable):
|
|
for header in extra_headers:
|
|
if isinstance(header, str) and ":" in header:
|
|
name, value = header.split(":", 1)
|
|
headers[name.strip()] = value.strip()
|
|
|
|
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
return _http_request(url, "POST", body, headers, timeout, verify_tls)
|
|
|
|
|
|
def fetch_status(ai_request_id: Any, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""Fetch status for a queued AI request."""
|
|
cfg = _config()
|
|
options = options or {}
|
|
|
|
project_uuid = cfg["project_uuid"]
|
|
if not project_uuid:
|
|
return {
|
|
"success": False,
|
|
"error": "project_uuid_missing",
|
|
"message": "PROJECT_UUID is not defined; aborting status check.",
|
|
}
|
|
|
|
status_path = _resolve_status_path(ai_request_id, cfg)
|
|
url = _build_url(status_path, cfg["base_url"])
|
|
|
|
opt_timeout = options.get("timeout")
|
|
timeout = int(cfg["timeout"] if opt_timeout is None else opt_timeout)
|
|
verify_tls = options.get("verify_tls", cfg["verify_tls"])
|
|
|
|
headers: Dict[str, str] = {
|
|
"Accept": "application/json",
|
|
cfg["project_header"]: project_uuid,
|
|
}
|
|
extra_headers = options.get("headers")
|
|
if isinstance(extra_headers, Iterable):
|
|
for header in extra_headers:
|
|
if isinstance(header, str) and ":" in header:
|
|
name, value = header.split(":", 1)
|
|
headers[name.strip()] = value.strip()
|
|
|
|
return _http_request(url, "GET", None, headers, timeout, verify_tls)
|
|
|
|
|
|
def await_response(ai_request_id: Any, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""Poll status endpoint until the request is complete or timed out."""
|
|
options = options or {}
|
|
timeout = int(options.get("timeout", 300))
|
|
interval = int(options.get("interval", 5))
|
|
if interval <= 0:
|
|
interval = 5
|
|
per_call_timeout = options.get("timeout_per_call")
|
|
|
|
deadline = time.time() + max(timeout, interval)
|
|
|
|
while True:
|
|
status_resp = fetch_status(ai_request_id, {
|
|
"headers": options.get("headers"),
|
|
"timeout": per_call_timeout,
|
|
"verify_tls": options.get("verify_tls"),
|
|
})
|
|
if status_resp.get("success"):
|
|
data = status_resp.get("data") or {}
|
|
if isinstance(data, dict):
|
|
status_value = data.get("status")
|
|
if status_value == "success":
|
|
return {
|
|
"success": True,
|
|
"status": 200,
|
|
"data": data.get("response", data),
|
|
}
|
|
if status_value == "failed":
|
|
return {
|
|
"success": False,
|
|
"status": 500,
|
|
"error": str(data.get("error") or "AI request failed"),
|
|
"data": data,
|
|
}
|
|
else:
|
|
return status_resp
|
|
|
|
if time.time() >= deadline:
|
|
return {
|
|
"success": False,
|
|
"error": "timeout",
|
|
"message": "Timed out waiting for AI response.",
|
|
}
|
|
time.sleep(interval)
|
|
|
|
|
|
def extract_text(response: Dict[str, Any]) -> str:
|
|
"""Public helper to extract plain text from a Responses payload."""
|
|
return _extract_text(response)
|
|
|
|
|
|
def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
"""Attempt to decode JSON emitted by the model (handles markdown fences)."""
|
|
text = _extract_text(response)
|
|
if text == "":
|
|
return None
|
|
|
|
try:
|
|
decoded = json.loads(text)
|
|
if isinstance(decoded, dict):
|
|
return decoded
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
stripped = text.strip()
|
|
if stripped.startswith("```json"):
|
|
stripped = stripped[7:]
|
|
if stripped.endswith("```"):
|
|
stripped = stripped[:-3]
|
|
stripped = stripped.strip()
|
|
if stripped and stripped != text:
|
|
try:
|
|
decoded = json.loads(stripped)
|
|
if isinstance(decoded, dict):
|
|
return decoded
|
|
except json.JSONDecodeError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _extract_text(response: Dict[str, Any]) -> str:
|
|
payload = response.get("data") if response.get("success") else response.get("response")
|
|
if isinstance(payload, dict):
|
|
output = payload.get("output")
|
|
if isinstance(output, list):
|
|
combined = ""
|
|
for item in output:
|
|
content = item.get("content") if isinstance(item, dict) else None
|
|
if isinstance(content, list):
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "output_text" and block.get("text"):
|
|
combined += str(block["text"])
|
|
if combined:
|
|
return combined
|
|
choices = payload.get("choices")
|
|
if isinstance(choices, list) and choices:
|
|
message = choices[0].get("message")
|
|
if isinstance(message, dict) and message.get("content"):
|
|
return str(message["content"])
|
|
if isinstance(payload, str):
|
|
return payload
|
|
return ""
|
|
|
|
|
|
def _config() -> Dict[str, Any]:
|
|
global _CONFIG_CACHE # noqa: PLW0603
|
|
if _CONFIG_CACHE is not None:
|
|
return _CONFIG_CACHE
|
|
|
|
_ensure_env_loaded()
|
|
|
|
base_url = os.getenv("AI_PROXY_BASE_URL", "https://flatlogic.com")
|
|
project_id = os.getenv("PROJECT_ID") or None
|
|
responses_path = os.getenv("AI_RESPONSES_PATH")
|
|
if not responses_path and project_id:
|
|
responses_path = f"/projects/{project_id}/ai-request"
|
|
|
|
_CONFIG_CACHE = {
|
|
"base_url": base_url,
|
|
"responses_path": responses_path,
|
|
"project_id": project_id,
|
|
"project_uuid": os.getenv("PROJECT_UUID"),
|
|
"project_header": os.getenv("AI_PROJECT_HEADER", "project-uuid"),
|
|
"default_model": os.getenv("AI_DEFAULT_MODEL", "gpt-5-mini"),
|
|
"timeout": int(os.getenv("AI_TIMEOUT", "30")),
|
|
"verify_tls": os.getenv("AI_VERIFY_TLS", "true").lower() not in {"0", "false", "no"},
|
|
}
|
|
return _CONFIG_CACHE
|
|
|
|
|
|
def _build_url(path: str, base_url: str) -> str:
|
|
trimmed = path.strip()
|
|
if trimmed.startswith("http://") or trimmed.startswith("https://"):
|
|
return trimmed
|
|
if trimmed.startswith("/"):
|
|
return f"{base_url}{trimmed}"
|
|
return f"{base_url}/{trimmed}"
|
|
|
|
|
|
def _resolve_status_path(ai_request_id: Any, cfg: Dict[str, Any]) -> str:
|
|
base_path = (cfg.get("responses_path") or "").rstrip("/")
|
|
if not base_path:
|
|
return f"/ai-request/{ai_request_id}/status"
|
|
if not base_path.endswith("/ai-request"):
|
|
base_path = f"{base_path}/ai-request"
|
|
return f"{base_path}/{ai_request_id}/status"
|
|
|
|
|
|
def _http_request(url: str, method: str, body: Optional[bytes], headers: Dict[str, str],
|
|
timeout: int, verify_tls: bool) -> Dict[str, Any]:
|
|
"""
|
|
Shared HTTP helper for GET/POST requests.
|
|
"""
|
|
req = urlrequest.Request(url, data=body, method=method.upper())
|
|
for name, value in headers.items():
|
|
req.add_header(name, value)
|
|
|
|
context = None
|
|
if not verify_tls:
|
|
context = ssl.create_default_context()
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
|
|
try:
|
|
with urlrequest.urlopen(req, timeout=timeout, context=context) as resp:
|
|
status = resp.getcode()
|
|
response_body = resp.read().decode("utf-8", errors="replace")
|
|
except urlerror.HTTPError as exc:
|
|
status = exc.getcode()
|
|
response_body = exc.read().decode("utf-8", errors="replace")
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
return {
|
|
"success": False,
|
|
"error": "request_failed",
|
|
"message": str(exc),
|
|
}
|
|
|
|
decoded = None
|
|
if response_body:
|
|
try:
|
|
decoded = json.loads(response_body)
|
|
except json.JSONDecodeError:
|
|
decoded = None
|
|
|
|
if 200 <= status < 300:
|
|
return {
|
|
"success": True,
|
|
"status": status,
|
|
"data": decoded if decoded is not None else response_body,
|
|
}
|
|
|
|
error_message = "AI proxy request failed"
|
|
if isinstance(decoded, dict):
|
|
error_message = decoded.get("error") or decoded.get("message") or error_message
|
|
elif response_body:
|
|
error_message = response_body
|
|
|
|
return {
|
|
"success": False,
|
|
"status": status,
|
|
"error": error_message,
|
|
"response": decoded if decoded is not None else response_body,
|
|
}
|
|
|
|
|
|
def _ensure_env_loaded() -> None:
|
|
"""Populate os.environ from executor/.env if variables are missing."""
|
|
if os.getenv("PROJECT_UUID") and os.getenv("PROJECT_ID"):
|
|
return
|
|
|
|
env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".env"))
|
|
if not os.path.exists(env_path):
|
|
return
|
|
|
|
try:
|
|
with open(env_path, "r", encoding="utf-8") as handle:
|
|
for line in handle:
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith("#") or "=" not in stripped:
|
|
continue
|
|
key, value = stripped.split("=", 1)
|
|
key = key.strip()
|
|
value = value.strip().strip('\'"')
|
|
if key and not os.getenv(key):
|
|
os.environ[key] = value
|
|
except OSError:
|
|
pass
|