""" LocalAIApi — lightweight Python client for the Flatlogic AI proxy. Usage (inside the Django workspace): from ai.local_ai_api import LocalAIApi response = LocalAIApi.create_response({ "input": [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Summarise this text in two sentences."}, ], "text": {"format": {"type": "json_object"}}, }) if response.get("success"): data = LocalAIApi.decode_json_from_response(response) # ... The helper automatically injects the project UUID header and falls back to reading executor/.env if environment variables are missing. """ from __future__ import annotations import json import os import ssl from typing import Any, Dict, Iterable, Optional from urllib import error as urlerror from urllib import request as urlrequest __all__ = [ "LocalAIApi", "create_response", "request", "decode_json_from_response", ] _CONFIG_CACHE: Optional[Dict[str, Any]] = None class LocalAIApi: """Static helpers mirroring the PHP implementation.""" @staticmethod def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return create_response(params, options or {}) @staticmethod def request(path: Optional[str] = None, payload: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return request(path, payload or {}, options or {}) @staticmethod def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]: return decode_json_from_response(response) def create_response(params: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Signature compatible with the OpenAI Responses API.""" options = options or {} payload = dict(params) if not isinstance(payload.get("input"), list) or not payload["input"]: return { "success": False, "error": "input_missing", "message": 'Parameter "input" is required and must be a non-empty list.', } cfg = _config() if not payload.get("model"): payload["model"] = cfg["default_model"] return request(options.get("path"), payload, options) def request(path: Optional[str], payload: Dict[str, Any], options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Perform a raw request to the AI proxy.""" cfg = _config() options = options or {} resolved_path = path or options.get("path") or cfg["responses_path"] if not resolved_path: return { "success": False, "error": "project_id_missing", "message": "PROJECT_ID is not defined; cannot resolve AI proxy endpoint.", } project_uuid = cfg["project_uuid"] if not project_uuid: return { "success": False, "error": "project_uuid_missing", "message": "PROJECT_UUID is not defined; aborting AI request.", } if "project_uuid" not in payload and project_uuid: payload["project_uuid"] = project_uuid url = _build_url(resolved_path, cfg["base_url"]) timeout = int(options.get("timeout", cfg["timeout"])) verify_tls = options.get("verify_tls", cfg["verify_tls"]) headers: Dict[str, str] = { "Content-Type": "application/json", "Accept": "application/json", cfg["project_header"]: project_uuid, } extra_headers = options.get("headers") if isinstance(extra_headers, Iterable): for header in extra_headers: if isinstance(header, str) and ":" in header: name, value = header.split(":", 1) headers[name.strip()] = value.strip() body = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urlrequest.Request(url, data=body, method="POST") for name, value in headers.items(): req.add_header(name, value) context = None if not verify_tls: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE try: with urlrequest.urlopen(req, timeout=timeout, context=context) as resp: status = resp.getcode() response_body = resp.read().decode("utf-8", errors="replace") except urlerror.HTTPError as exc: status = exc.getcode() response_body = exc.read().decode("utf-8", errors="replace") except Exception as exc: # pylint: disable=broad-except return { "success": False, "error": "request_failed", "message": str(exc), } decoded = None if response_body: try: decoded = json.loads(response_body) except json.JSONDecodeError: decoded = None if 200 <= status < 300: return { "success": True, "status": status, "data": decoded if decoded is not None else response_body, } error_message = "AI proxy request failed" if isinstance(decoded, dict): error_message = decoded.get("error") or decoded.get("message") or error_message elif response_body: error_message = response_body return { "success": False, "status": status, "error": error_message, "response": decoded if decoded is not None else response_body, } def decode_json_from_response(response: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Attempt to decode JSON emitted by the model (handles markdown fences).""" text = _extract_text(response) if text == "": return None try: decoded = json.loads(text) if isinstance(decoded, dict): return decoded except json.JSONDecodeError: pass stripped = text.strip() if stripped.startswith("```json"): stripped = stripped[7:] if stripped.endswith("```"): stripped = stripped[:-3] stripped = stripped.strip() if stripped and stripped != text: try: decoded = json.loads(stripped) if isinstance(decoded, dict): return decoded except json.JSONDecodeError: return None return None def _extract_text(response: Dict[str, Any]) -> str: payload = response.get("data") if response.get("success") else response.get("response") if isinstance(payload, dict): output = payload.get("output") if isinstance(output, list): combined = "" for item in output: content = item.get("content") if isinstance(item, dict) else None if isinstance(content, list): for block in content: if isinstance(block, dict) and block.get("type") == "output_text" and block.get("text"): combined += str(block["text"]) if combined: return combined choices = payload.get("choices") if isinstance(choices, list) and choices: message = choices[0].get("message") if isinstance(message, dict) and message.get("content"): return str(message["content"]) if isinstance(payload, str): return payload return "" def _config() -> Dict[str, Any]: global _CONFIG_CACHE # noqa: PLW0603 if _CONFIG_CACHE is not None: return _CONFIG_CACHE _ensure_env_loaded() base_url = os.getenv("AI_PROXY_BASE_URL", "https://flatlogic.com") project_id = os.getenv("PROJECT_ID") or None responses_path = os.getenv("AI_RESPONSES_PATH") if not responses_path and project_id: responses_path = f"/projects/{project_id}/ai-request" _CONFIG_CACHE = { "base_url": base_url, "responses_path": responses_path, "project_id": project_id, "project_uuid": os.getenv("PROJECT_UUID"), "project_header": os.getenv("AI_PROJECT_HEADER", "project-uuid"), "default_model": os.getenv("AI_DEFAULT_MODEL", "gpt-5"), "timeout": int(os.getenv("AI_TIMEOUT", "30")), "verify_tls": os.getenv("AI_VERIFY_TLS", "true").lower() not in {"0", "false", "no"}, } return _CONFIG_CACHE def _build_url(path: str, base_url: str) -> str: trimmed = path.strip() if trimmed.startswith("http://") or trimmed.startswith("https://"): return trimmed if trimmed.startswith("/"): return f"{base_url}{trimmed}" return f"{base_url}/{trimmed}" def _ensure_env_loaded() -> None: """Populate os.environ from executor/.env if variables are missing.""" if os.getenv("PROJECT_UUID") and os.getenv("PROJECT_ID"): return env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".env")) if not os.path.exists(env_path): return try: with open(env_path, "r", encoding="utf-8") as handle: for line in handle: stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue key, value = stripped.split("=", 1) key = key.strip() value = value.strip().strip('\'"') if key and not os.getenv(key): os.environ[key] = value except OSError: pass