diff --git a/sdk/demos/09_jungle_grid_gpu_execution/IMPLEMENTATION_DECISION.md b/sdk/demos/09_jungle_grid_gpu_execution/IMPLEMENTATION_DECISION.md new file mode 100644 index 000000000..64eba8a83 --- /dev/null +++ b/sdk/demos/09_jungle_grid_gpu_execution/IMPLEMENTATION_DECISION.md @@ -0,0 +1,98 @@ +# Jungle Grid Integration Decision + +## Selected Extension Point + +This contribution remains a runnable demo network with a deterministic Python +`WorkerAgent`. It uses OpenAgents projects for assignment and lifecycle, +project messages for human approval and meaningful status changes, and project +artifacts for durable execution state and sanitized results. + +Jungle Grid is an external workload execution service, not an OpenAgents +transport, launcher, credential type, or network mod. Keeping it as a demo makes +the approval boundary and asynchronous project behavior explicit and testable. +The agent calls REST directly because an MCP tool call would otherwise hide the +project-state transition around billable submission. + +## Jungle Grid Contract + +The implementation was aligned against `Jungle-Grid/mcp-server` and the current +orchestrator API implementation, not only the README: + +- `POST /v1/mcp/jobs/estimate` +- `POST /v1/mcp/jobs` +- `GET /v1/mcp/jobs/{job_id}` +- `GET /v1/jobs/{job_id}/events` +- `GET /v1/mcp/jobs/{job_id}/logs` +- `GET /v1/jobs/{job_id}/runtime` +- `POST /v1/mcp/jobs/{job_id}/cancel` +- `GET /v1/mcp/jobs/{job_id}/artifacts` +- `POST /v1/mcp/jobs/{job_id}/artifacts/{artifact_id}/download` + +The official API-base override is `JUNGLEGRID_API_BASE`. +`JUNGLE_GRID_API_URL` and the older demo variable `JUNGLE_GRID_API` remain +compatibility fallbacks. Trailing slashes are removed. + +The public workload types are `inference`, `training`, `fine_tuning`, and +`batch`; `fine_tuning` is sent to REST as `fine-tuning`. The preferred command +shape is an array. Legacy string `command` plus string-array `args` is combined +in order before estimation and submission. + +## Uploaded Files + +The demo accepts previously uploaded Jungle Grid `input_id` values through +`input_files` and `script_files`. This is the minimum safe file workflow: + +- IDs are validated locally and then verified by Jungle Grid during estimate or + submission. +- No goal field can name an executor host path. +- Upload URLs, completion tokens, and storage credentials never enter project + state. + +Uploading OpenAgents artifacts would require a separate authorization and +byte-transfer design. It is intentionally outside this demo rather than +allowing a project goal to read arbitrary local files. + +## Durable Idempotency + +`jungle_grid_execution_state` records the estimate ID, submission state, +recorded job ID, cancellation state, status fingerprint, event IDs, and log +cursor. The agent writes `submitting` before the non-idempotent submission call +and writes the returned job ID immediately afterward. + +After restart: + +- a recorded job resumes monitoring; +- a terminal project is not resubmitted; +- a `submitting` state without a recorded job is not retried automatically, + because the current submission contract does not expose a verified + idempotency key; +- duplicate approvals and cancellations are serialized by a per-project lock. + +This favors avoiding a duplicate billable job over guessing after an ambiguous +network failure. + +## Security Decisions + +- Estimation cannot submit compute. +- Submission requires exact `APPROVE ` from a `human:` identity. +- Cancellation requires exact `CANCEL ` from a `human:` identity. +- API and workload secrets are resolved from environment variables only. +- Callback auth uses `callback.auth_token_from_env`; literal callback secrets + are not accepted. +- Metadata with secret-like keys, Bearer tokens, API-key patterns, and signed + URLs are rejected or redacted. +- Artifact download URLs are not requested during finalization. The client + method exists to match the API, but project state stores metadata only. +- Automated tests mock all external calls. + +The committed `executors.password_hash` is a demo-only group credential. Its +purpose is to establish actual runtime topology membership so project +notifications reach the executor. It must be replaced for a shared deployment. + +## Deliberately Unsupported Goal Fields + +The current public MCP submission contract does not expose arbitrary +host-file paths, CPU or memory sizing, provider pinning, or user-controlled +retry policy. The demo does not invent those fields. It supports the verified +GPU, region, priority, timeout, callback, routing, upload-reference, template, +metadata, and expected-artifact fields accepted by the current API. diff --git a/sdk/demos/09_jungle_grid_gpu_execution/README.md b/sdk/demos/09_jungle_grid_gpu_execution/README.md new file mode 100644 index 000000000..f5df32ad9 --- /dev/null +++ b/sdk/demos/09_jungle_grid_gpu_execution/README.md @@ -0,0 +1,267 @@ +# Jungle Grid GPU Execution Demo + +This demo delegates asynchronous GPU workloads from an OpenAgents project to +[Jungle Grid](https://junglegrid.dev). A deterministic Python `WorkerAgent` +estimates first, waits for exact human approval, submits once, then polls +lifecycle events, status, logs, runtime details, and managed artifact metadata. + +```text +Project goal +→ estimate +→ human approval +→ optional input/script references +→ submit +→ lifecycle events and status +→ workload logs +→ runtime details +→ managed artifacts +``` + +The demo calls REST directly so the human approval boundary and durable +OpenAgents project state remain explicit and testable. It does not require an +LLM or an MCP runtime dependency. + +## Security And Billing Warning + +Jungle Grid jobs may consume credits or incur charges. Project creation only +estimates. Billable submission requires this exact command from a verified +human identity: + +```text +APPROVE +``` + +Cancellation also requires an exact human command: + +```text +CANCEL +``` + +Keep credentials in executor environment variables. Do not put secrets in +goals, messages, metadata, logs, or committed files. The demo rejects literal +API-key/Bearer patterns, resolves workload secrets only after approval, redacts +shared output, never reads arbitrary host paths, and never stores temporary +signed artifact URLs. + +## Prerequisites + +- OpenAgents development dependencies. +- A scoped Jungle Grid API key with estimate, submit, read, logs, artifact, and + cancellation access. +- A GPU-capable public container image or configured private-image credential. +- Previously uploaded Jungle Grid input IDs for file-backed jobs. + +Install the repository package and development tools: + +```bash +pip install -e ".[sdk,dev]" +``` + +## Environment Configuration + +```bash +export JUNGLE_GRID_API_KEY="jg_..." +export JUNGLEGRID_API_BASE="https://api.junglegrid.dev" +export JUNGLE_GRID_POLL_INTERVAL_SECONDS="10" +export JUNGLE_GRID_MAX_POLL_FAILURES="3" +``` + +`JUNGLEGRID_API_BASE` is the current official API-base override. +`JUNGLE_GRID_API_URL` and `JUNGLE_GRID_API` are compatibility fallbacks. The +executor removes trailing slashes. Workload variables referenced by +`environment_from_env` must be exported in the executor process. + +## Start The Network + +```bash +cd sdk/demos/09_jungle_grid_gpu_execution +openagents network start network.yaml +``` + +The network enables the project mod and restricts the template to the +`executors` group. The committed group password hash is a demo-only credential; +replace it before a shared deployment. + +## Start The Executor + +```bash +cd sdk/demos/09_jungle_grid_gpu_execution +python agents/jungle_grid_executor.py +``` + +The executor supplies the configured group password hash during +`async_start`. OpenAgents therefore records it in +`network.topology.agent_group_membership`; static metadata alone does not +establish group membership. Run one executor for this demo. + +## Create A Project + +Open Studio at `http://localhost:8700/studio`, choose +`Jungle Grid GPU Execution`, and provide a JSON goal. + +### Simple Command Job + +The preferred command representation is an array: + +```json +{ + "name": "openagents-training-demo", + "workload_type": "training", + "image": "pytorch/pytorch:2.4.0-cuda12.1-cudnn9-runtime", + "command": ["python", "-c", "import torch; print(torch.cuda.is_available())"], + "model_size_gb": 1, + "gpu_required": true, + "routing_mode": "cost" +} +``` + +The original format remains compatible and is converted without reordering: + +```json +{ + "name": "legacy-command-demo", + "workload_type": "batch", + "image": "nvidia/cuda:12.2.0-base-ubuntu22.04", + "command": "python", + "args": ["-c", "print('hello')"] +} +``` + +Accepted workload types are `inference`, `training`, `fine_tuning`, and +`batch`. + +### File-Backed Job + +Upload files through Jungle Grid first, then use only the returned IDs: + +```json +{ + "name": "openagents-transcription", + "workload_type": "inference", + "image": "ghcr.io/example/whisper-runtime:cuda", + "command": [ + "python", + "/workspace/scripts/transcribe.py", + "/workspace/inputs/audio.wav", + "/workspace/artifacts/transcript.txt" + ], + "script_files": [{"input_id": "inp_script123"}], + "input_files": [{"input_id": "inp_audio123"}], + "expected_artifacts": ["/workspace/artifacts/transcript.txt"] +} +``` + +Inputs mount under `/workspace/inputs`, scripts under `/workspace/scripts`, and +managed outputs belong under `/workspace/artifacts`. `local_path` and similar +host-file fields are not supported. + +### Environment And Callback Secrets + +```bash +export MODEL_TOKEN="..." +export CALLBACK_TOKEN="..." +``` + +```json +{ + "name": "secure-inference", + "workload_type": "inference", + "image": "ghcr.io/example/model-runtime:cuda", + "environment_from_env": {"MODEL_TOKEN": "MODEL_TOKEN"}, + "callback": { + "url": "https://example.com/hooks/jungle", + "metadata": {"source": "openagents"}, + "auth_token_from_env": "CALLBACK_TOKEN" + } +} +``` + +Environment and callback token values are absent from estimates and are +resolved only after approval. + +## Estimate And Approval + +The executor calls `POST /v1/mcp/jobs/estimate`, stores a sanitized structured +response in `jungle_grid_estimate`, and posts a short summary. It respects +`screening.can_submit`, availability, warnings, fixes, blocked checks, routing, +cost/rate ranges, duration, queue/start windows, and capacity fields returned by +the API. + +`screening.can_submit: true` does not prove immediate capacity. +`capacity_status.immediate_capacity_confirmed` is the relevant signal. Approval +is blocked when screening or availability explicitly rejects submission. + +## Monitoring + +After approval the executor: + +- polls `GET /v1/mcp/jobs/{job_id}` for status, execution phase, status message, + phase timing, delayed-start, scheduling, retry, failure, and completion data; +- polls `GET /v1/jobs/{job_id}/events` separately for platform lifecycle events; +- polls paginated `GET /v1/mcp/jobs/{job_id}/logs`; +- reads `GET /v1/jobs/{job_id}/runtime` at finalization; +- lists managed artifacts after terminal status. + +Lifecycle names are not restricted to a local enum. Event IDs and log cursors +prevent duplicates. Messages are posted only for meaningful state changes. +Empty workload logs during scheduling, provisioning, input preparation, or +container startup do not fail the project. This is polling, not true streaming. + +Shared event and log history is bounded to 200 entries each. API keys, Bearer +tokens, resolved environment values, authorization fields, and signed URLs are +redacted. + +## Artifacts + +Regular files written under `/workspace/artifacts` are eligible for managed +collection. `jungle_grid_result` contains sanitized job data, bounded lifecycle +events, bounded logs, runtime details when available, and artifact IDs, names, +paths, sizes, and content types returned by Jungle Grid. + +The API can mint temporary artifact download URLs, but this demo intentionally +does not request or store them. Downloading bytes into an OpenAgents artifact +would require a separate size, authorization, and content-handling policy. + +## Cancellation And Failure + +Cancellation is accepted only for the job ID already recorded for that project. +Unauthorized, mismatched, duplicate, and terminal-state cancellation requests +do not call Jungle Grid. + +Safe GET requests use bounded retries with exponential backoff. Submission is +never automatically retried because the current contract does not expose a +verified idempotency mechanism. If the executor restarts after recording a job, +it resumes monitoring. If it restarts with an uncertain `submitting` state and +no job ID, it refuses to resubmit blindly. + +Completed jobs complete the OpenAgents project. Failed, rejected, and cancelled +jobs stop it. Runtime details may be unavailable before assignment/startup and +do not by themselves fail finalization. + +## Current Jungle Grid MCP Tools + +The current registry exposes: + +- `estimate_job` +- `submit_job` +- `upload_job_input` +- `list_job_inputs` +- `list_jobs` +- `get_job` +- `get_job_events` +- `get_job_logs` +- `cancel_job` +- `list_artifacts` +- `get_artifact` + +## Tests + +All external requests are mocked. Tests never require a Jungle Grid account, +contact the live API, or submit paid work: + +```bash +pytest tests/agents/test_jungle_grid_executor.py -q +ruff check sdk/demos/09_jungle_grid_gpu_execution tests/agents/test_jungle_grid_executor.py +ruff format --check sdk/demos/09_jungle_grid_gpu_execution tests/agents/test_jungle_grid_executor.py +mypy --follow-untyped-imports sdk/demos/09_jungle_grid_gpu_execution/agents/jungle_grid_executor.py +``` diff --git a/sdk/demos/09_jungle_grid_gpu_execution/agents/jungle_grid_executor.py b/sdk/demos/09_jungle_grid_gpu_execution/agents/jungle_grid_executor.py new file mode 100644 index 000000000..a12fc9778 --- /dev/null +++ b/sdk/demos/09_jungle_grid_gpu_execution/agents/jungle_grid_executor.py @@ -0,0 +1,1016 @@ +#!/usr/bin/env python3 +"""Human-approved Jungle Grid execution through an OpenAgents project.""" + +from __future__ import annotations + +import asyncio +import copy +import json +import logging +import os +import re +import uuid +from dataclasses import asdict, dataclass, field +from typing import Any, Awaitable, Callable, Iterable, Mapping, Optional +from urllib.parse import quote, urlencode + +import aiohttp + +from openagents.agents.worker_agent import WorkerAgent, on_event +from openagents.models.event_context import EventContext +from openagents.mods.workspace.project import DefaultProjectAgentAdapter + +logger = logging.getLogger(__name__) + +DEFAULT_API_BASE = "https://api.junglegrid.dev" +EXECUTORS_GROUP_PASSWORD_HASH = "8fba13dab71d6fdd8a9b9db1f06e81315dfbfd69167b6097f724604db3c91cdf" +STATE_ARTIFACT = "jungle_grid_execution_state" +TERMINAL_STATUSES = {"completed", "failed", "rejected", "cancelled", "canceled"} +VALID_WORKLOAD_TYPES = {"inference", "training", "fine_tuning", "batch"} +VALID_OPTIMIZE_FOR = {"balanced", "cost", "speed"} +VALID_GPU_CLASSES = {"consumer", "datacenter"} +VALID_REGION_MODES = {"prefer", "strict"} +VALID_PRIORITIES = {"low", "balanced", "high", "low_latency", "low_cost", "high_reliability"} +VALID_PRECISIONS = {"fp32", "fp16", "bf16", "int8"} +CONSTRAINT_FIELDS = { + "max_price_per_hour", + "gpu_type", + "gpu_class", + "preferred_gpu_family", + "avoid_gpu_families", + "region_preference", + "region_mode", + "latency_priority", + "cost_priority", +} +MAX_SHARED_LOGS = 200 +MAX_SHARED_EVENTS = 200 + +SUBMIT_FIELDS = { + "name", + "workload_type", + "image", + "command", + "args", + "environment_from_env", + "input_files", + "script_files", + "expected_artifacts", + "template", + "metadata", + "callback", + "model_size_gb", + "batch_size", + "precision", + "disk_gb", + "gpu_required", + "gpu_count", + "gpu_type", + "gpu_class", + "min_vram_gb", + "max_price_per_hour", + "preferred_gpu_family", + "avoid_gpu_families", + "region_preference", + "region_mode", + "priority", + "latency_priority", + "cost_priority", + "timeout_seconds", + "routing_mode", + "optimize_for", + "constraints", +} +ESTIMATE_FIELDS = SUBMIT_FIELDS - {"environment_from_env"} +SECRET_KEY_PATTERN = re.compile( + r"(?i)(api[_-]?key|authorization|password|secret|token|auth_token|upload_url|download_url|complete_url)" +) +SECRET_TEXT_PATTERN = re.compile( + r"(?i)(bearer\s+)[^\s,;]+|(? str: + """Return a project-safe string.""" + text = str(value) + for secret in secrets: + if secret: + text = text.replace(secret, "[REDACTED]") + return SECRET_TEXT_PATTERN.sub(lambda match: f"{match.group(1) or ''}[REDACTED]", text) + + +def contains_sensitive_key(value: object) -> bool: + if isinstance(value, Mapping): + return any( + SECRET_KEY_PATTERN.search(str(key)) or contains_sensitive_key(nested) for key, nested in value.items() + ) + if isinstance(value, list): + return any(contains_sensitive_key(nested) for nested in value) + return False + + +def sanitize_project_data(value: object, secrets: Iterable[str] = ()) -> object: + """Recursively redact credentials, signed URLs, and resolved environment values.""" + secret_values = [secret for secret in secrets if secret] + if isinstance(value, str): + return redact_sensitive(value, secret_values) + if isinstance(value, Mapping): + result: dict[str, object] = {} + for key, nested in value.items(): + clean_key = str(key) + if SECRET_KEY_PATTERN.search(clean_key): + result[clean_key] = "[REDACTED]" + else: + result[clean_key] = sanitize_project_data(nested, secret_values) + return result + if isinstance(value, list): + return [sanitize_project_data(nested, secret_values) for nested in value] + return value + + +def unwrap_response(data: object) -> object: + if isinstance(data, Mapping) and data.get("ok") is True and "data" in data: + return data["data"] + return data + + +def error_detail(data: object, status: int) -> tuple[str, str]: + if isinstance(data, Mapping): + nested = data.get("error") + source = nested if isinstance(nested, Mapping) else data + return ( + redact_sensitive(source.get("code") or "API_ERROR"), + redact_sensitive(source.get("message") or f"HTTP {status}"), + ) + return "API_ERROR", f"HTTP {status}" + + +class JungleGridClient: + """Async client matching the current Jungle Grid MCP-backed REST contract.""" + + def __init__( + self, + api_base: Optional[str] = None, + timeout_seconds: float = 30.0, + read_retries: int = 2, + retry_delay_seconds: float = 0.5, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, + ): + configured_base = ( + api_base + or os.getenv("JUNGLEGRID_API_BASE") + or os.getenv("JUNGLE_GRID_API_URL") + or os.getenv("JUNGLE_GRID_API") + or DEFAULT_API_BASE + ) + self.api_key = os.getenv("JUNGLE_GRID_API_KEY", "").strip() + self.api_base = configured_base.strip().rstrip("/") + self.timeout_seconds = timeout_seconds + self.read_retries = max(0, read_retries) + self.retry_delay_seconds = max(0.0, retry_delay_seconds) + self.sleep = sleep + + def _require_api_key(self) -> str: + if not self.api_key: + raise JungleGridError("MISSING_API_KEY", "JUNGLE_GRID_API_KEY is required.") + return self.api_key + + async def _request( + self, + method: str, + path: str, + payload: Optional[dict[str, object]] = None, + ) -> dict[str, object]: + api_key = self._require_api_key() + attempts = self.read_retries + 1 if method == "GET" else 1 + for attempt in range(attempts): + try: + timeout = aiohttp.ClientTimeout(total=self.timeout_seconds) + headers = { + "Accept": "application/json", + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.request( + method, f"{self.api_base}{path}", headers=headers, json=payload + ) as response: + text = await response.text() + try: + data = json.loads(text) if text.strip() else {} + except json.JSONDecodeError as exc: + raise JungleGridError( + "INVALID_API_RESPONSE", + "Jungle Grid returned invalid JSON.", + response.status, + ) from exc + if not 200 <= response.status < 300: + code, message = error_detail(data, response.status) + raise JungleGridError(code, message, response.status) + result = unwrap_response(data) + if not isinstance(result, dict): + raise JungleGridError( + "INVALID_API_RESPONSE", + "Jungle Grid returned an unexpected response shape.", + ) + return result + except (asyncio.TimeoutError, aiohttp.ClientError) as exc: + if attempt + 1 < attempts: + await self.sleep(self.retry_delay_seconds * (2**attempt)) + continue + code = "NETWORK_TIMEOUT" if isinstance(exc, asyncio.TimeoutError) else "NETWORK_ERROR" + message = ( + "Jungle Grid request timed out." + if code == "NETWORK_TIMEOUT" + else "Jungle Grid network request failed." + ) + raise JungleGridError(code, message) from exc + except JungleGridError as exc: + retryable = method == "GET" and (exc.status is None or exc.status == 429 or exc.status >= 500) + if retryable and attempt + 1 < attempts: + await self.sleep(self.retry_delay_seconds * (2**attempt)) + continue + raise JungleGridError( + redact_sensitive(exc.code, [api_key]), + redact_sensitive(exc, [api_key]), + exc.status, + ) from exc + raise JungleGridError("NETWORK_ERROR", "Jungle Grid request failed.") + + async def estimate_job(self, workload: dict[str, object]) -> dict[str, object]: + return await self._request("POST", "/v1/mcp/jobs/estimate", workload) + + async def submit_job(self, workload: dict[str, object]) -> dict[str, object]: + return await self._request("POST", "/v1/mcp/jobs", workload) + + async def get_job(self, job_id: str) -> dict[str, object]: + return await self._request("GET", f"/v1/mcp/jobs/{quote(job_id, safe='')}") + + async def get_job_events(self, job_id: str) -> dict[str, object]: + return await self._request("GET", f"/v1/jobs/{quote(job_id, safe='')}/events") + + async def get_job_logs( + self, + job_id: str, + *, + limit: int = 100, + cursor: Optional[str] = None, + tail: Optional[int] = None, + ) -> dict[str, object]: + params: dict[str, object] = {"limit": limit} + if cursor: + params["cursor"] = cursor + if tail is not None: + params["tail"] = tail + return await self._request("GET", f"/v1/mcp/jobs/{quote(job_id, safe='')}/logs?{urlencode(params)}") + + async def get_job_runtime(self, job_id: str) -> dict[str, object]: + return await self._request("GET", f"/v1/jobs/{quote(job_id, safe='')}/runtime") + + async def cancel_job(self, job_id: str, reason: str) -> dict[str, object]: + return await self._request( + "POST", + f"/v1/mcp/jobs/{quote(job_id, safe='')}/cancel", + {"reason": reason}, + ) + + async def list_artifacts(self, job_id: str) -> dict[str, object]: + return await self._request("GET", f"/v1/mcp/jobs/{quote(job_id, safe='')}/artifacts") + + async def get_artifact(self, job_id: str, artifact_id: str) -> dict[str, object]: + return await self._request( + "POST", + f"/v1/mcp/jobs/{quote(job_id, safe='')}/artifacts/{quote(artifact_id, safe='')}/download", + ) + + +def _string(value: object, field_name: str) -> str: + if not isinstance(value, str) or not value.strip(): + raise ValueError(f"{field_name} must be a non-empty string.") + return value.strip() + + +def _string_array(value: object, field_name: str) -> list[str]: + if not isinstance(value, list) or not all(isinstance(item, str) and item for item in value): + raise ValueError(f"{field_name} must be an array of non-empty strings.") + return value + + +def _positive_number(value: object, field_name: str, *, allow_zero: bool = False) -> None: + if isinstance(value, bool) or not isinstance(value, (int, float)): + raise ValueError(f"{field_name} must be a number.") + if value < 0 if allow_zero else value <= 0: + qualifier = "zero or greater" if allow_zero else "positive" + raise ValueError(f"{field_name} must be {qualifier}.") + + +def _validate_input_references(value: object, field_name: str) -> list[dict[str, str]]: + if not isinstance(value, list): + raise ValueError(f"{field_name} must be an array of input_id references.") + result: list[dict[str, str]] = [] + for item in value: + if isinstance(item, str): + input_id = item.strip() + elif isinstance(item, Mapping) and set(item) == {"input_id"}: + input_id = _string(item.get("input_id"), f"{field_name}.input_id") + else: + raise ValueError(f"{field_name} items must contain only input_id.") + if not INPUT_ID_PATTERN.fullmatch(input_id): + raise ValueError(f"{field_name} contains an invalid input_id.") + result.append({"input_id": input_id}) + return result + + +def _validate_callback(value: object) -> dict[str, object]: + if not isinstance(value, Mapping): + raise ValueError("callback must be an object.") + unsupported = set(value) - {"url", "metadata", "auth_token_from_env"} + if unsupported: + raise ValueError(f"Unsupported callback fields: {', '.join(sorted(unsupported))}.") + result: dict[str, object] = {"url": _string(value.get("url"), "callback.url")} + metadata = value.get("metadata") + if metadata is not None: + if not isinstance(metadata, Mapping) or not all( + isinstance(key, str) and isinstance(item, str) for key, item in metadata.items() + ): + raise ValueError("callback.metadata must map strings to strings.") + if contains_sensitive_key(metadata): + raise ValueError("callback.metadata must not contain secret-like keys.") + result["metadata"] = dict(metadata) + auth_env = value.get("auth_token_from_env") + if auth_env is not None: + result["auth_token_from_env"] = _string(auth_env, "callback.auth_token_from_env") + return result + + +def _validate_constraints(value: object) -> dict[str, object]: + if not isinstance(value, Mapping): + raise ValueError("constraints must be an object.") + unsupported = sorted(set(value) - CONSTRAINT_FIELDS) + if unsupported: + raise ValueError(f"Unsupported constraint fields: {', '.join(unsupported)}.") + result = dict(value) + if "max_price_per_hour" in result: + _positive_number(result["max_price_per_hour"], "constraints.max_price_per_hour") + if "gpu_class" in result and result["gpu_class"] not in VALID_GPU_CLASSES: + raise ValueError(f"constraints.gpu_class must be one of: {', '.join(sorted(VALID_GPU_CLASSES))}.") + if "region_mode" in result and result["region_mode"] not in VALID_REGION_MODES: + raise ValueError(f"constraints.region_mode must be one of: {', '.join(sorted(VALID_REGION_MODES))}.") + for field_name in ("latency_priority", "cost_priority"): + if field_name in result and result[field_name] not in {"low", "balanced", "high"}: + raise ValueError(f"constraints.{field_name} must be one of: balanced, high, low.") + if "avoid_gpu_families" in result: + result["avoid_gpu_families"] = _string_array(result["avoid_gpu_families"], "constraints.avoid_gpu_families") + for field_name in ("gpu_type", "preferred_gpu_family", "region_preference"): + if field_name in result: + result[field_name] = _string(result[field_name], f"constraints.{field_name}") + return result + + +def parse_workload_goal(goal: str) -> dict[str, object]: + """Parse and validate a project goal without resolving any secrets.""" + text = goal.strip() + if text.startswith("```"): + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```$", "", text) + try: + raw = json.loads(text) + except json.JSONDecodeError as exc: + raise ValueError("Project goal must be a JSON object describing the Jungle Grid workload.") from exc + if not isinstance(raw, dict): + raise ValueError("Project goal must be a JSON object.") + if SECRET_TEXT_PATTERN.search(json.dumps(raw)): + raise ValueError("Workload must not contain API keys, Bearer tokens, or signed URLs.") + unsupported = sorted(set(raw) - SUBMIT_FIELDS) + if unsupported: + raise ValueError(f"Unsupported workload fields: {', '.join(unsupported)}.") + + workload = dict(raw) + for required in ("name", "workload_type", "image"): + workload[required] = _string(workload.get(required), required) + if workload["workload_type"] not in VALID_WORKLOAD_TYPES: + raise ValueError(f"workload_type must be one of: {', '.join(sorted(VALID_WORKLOAD_TYPES))}.") + + command = workload.get("command") + args = workload.get("args") + if isinstance(command, str): + workload["command"] = _string(command, "command") + if args is not None: + workload["args"] = _string_array(args, "args") + elif isinstance(command, list): + workload["command"] = _string_array(command, "command") + if args is not None: + raise ValueError("args cannot be combined with the command-array format.") + elif command is not None: + raise ValueError("command must be a string or an array of strings.") + elif args is not None: + raise ValueError("args requires command.") + + for field_name in ("input_files", "script_files"): + if field_name in workload: + workload[field_name] = _validate_input_references(workload[field_name], field_name) + if "expected_artifacts" in workload: + paths = _string_array(workload["expected_artifacts"], "expected_artifacts") + if not all(path.startswith("/workspace/artifacts/") for path in paths): + raise ValueError("expected_artifacts must be paths under /workspace/artifacts/.") + workload["expected_artifacts"] = paths + if any(key in workload for key in ("local_path", "path", "file_path")): + raise ValueError("Arbitrary local file access is not supported.") + + env_refs = workload.get("environment_from_env") + if env_refs is not None and ( + not isinstance(env_refs, Mapping) + or not all( + isinstance(key, str) and key.strip() and isinstance(value, str) and value.strip() + for key, value in env_refs.items() + ) + ): + raise ValueError("environment_from_env must map workload names to local environment names.") + if contains_sensitive_key(workload.get("metadata")): + raise ValueError("metadata must not contain secret-like keys.") + if "callback" in workload: + workload["callback"] = _validate_callback(workload["callback"]) + if "gpu_required" in workload and not isinstance(workload["gpu_required"], bool): + raise ValueError("gpu_required must be a boolean.") + + for field_name in ("model_size_gb", "batch_size", "disk_gb", "gpu_count", "min_vram_gb", "max_price_per_hour"): + if field_name in workload: + _positive_number( + workload[field_name], field_name, allow_zero=field_name in {"batch_size", "disk_gb", "gpu_count"} + ) + if "timeout_seconds" in workload: + _positive_number(workload["timeout_seconds"], "timeout_seconds") + for field_name, allowed in ( + ("gpu_class", VALID_GPU_CLASSES), + ("region_mode", VALID_REGION_MODES), + ("precision", VALID_PRECISIONS), + ("priority", VALID_PRIORITIES), + ("latency_priority", {"low", "balanced", "high"}), + ("cost_priority", {"low", "balanced", "high"}), + ): + if field_name in workload and workload[field_name] not in allowed: + raise ValueError(f"{field_name} must be one of: {', '.join(sorted(allowed))}.") + optimize = workload.get("routing_mode", workload.get("optimize_for")) + if "routing_mode" in workload and "optimize_for" in workload: + raise ValueError("Use routing_mode or optimize_for, not both.") + if optimize is not None and optimize not in VALID_OPTIMIZE_FOR: + raise ValueError(f"routing preference must be one of: {', '.join(sorted(VALID_OPTIMIZE_FOR))}.") + if "avoid_gpu_families" in workload: + workload["avoid_gpu_families"] = _string_array(workload["avoid_gpu_families"], "avoid_gpu_families") + if "constraints" in workload: + workload["constraints"] = _validate_constraints(workload["constraints"]) + return workload + + +def _api_workload_type(value: object) -> object: + return "fine-tuning" if value == "fine_tuning" else value + + +def normalize_api_payload(workload: Mapping[str, object]) -> dict[str, object]: + """Convert goal compatibility aliases to the current Jungle Grid shape.""" + payload = copy.deepcopy(dict(workload)) + payload["workload_type"] = _api_workload_type(payload["workload_type"]) + if "routing_mode" in payload: + payload["optimize_for"] = payload.pop("routing_mode") + if isinstance(payload.get("command"), str): + legacy_args = payload.pop("args", []) + payload["command"] = [ + payload["command"], + *(legacy_args if isinstance(legacy_args, list) else []), + ] + return payload + + +def build_estimate_payload(workload: Mapping[str, object]) -> dict[str, object]: + payload = normalize_api_payload({key: value for key, value in workload.items() if key in ESTIMATE_FIELDS}) + callback = payload.get("callback") + if isinstance(callback, dict): + callback.pop("auth_token_from_env", None) + return payload + + +def build_submit_payload(workload: Mapping[str, object]) -> tuple[dict[str, object], list[str]]: + """Resolve environment-backed secrets only after human approval.""" + payload = normalize_api_payload({key: value for key, value in workload.items() if key != "environment_from_env"}) + secrets: list[str] = [] + references = workload.get("environment_from_env") + if isinstance(references, Mapping): + missing = sorted(str(env_name) for env_name in references.values() if not os.getenv(str(env_name))) + if missing: + raise ValueError(f"Missing required local environment variables: {', '.join(missing)}.") + environment = {str(name): os.environ[str(env_name)] for name, env_name in references.items()} + payload["environment"] = environment + secrets.extend(environment.values()) + callback = payload.get("callback") + if isinstance(callback, dict): + auth_env = callback.pop("auth_token_from_env", None) + if auth_env: + token = os.getenv(str(auth_env)) + if not token: + raise ValueError(f"Missing required local environment variable: {auth_env}.") + callback["auth_token"] = token + secrets.append(token) + return payload, secrets + + +def public_workload(workload: Mapping[str, object]) -> dict[str, object]: + result = dict(workload) + metadata = result.get("metadata") + if isinstance(metadata, Mapping): + result["metadata"] = {str(key): "[REDACTED]" for key in metadata} + return result + + +def estimate_can_submit(estimate: Mapping[str, object]) -> bool: + screening = estimate.get("screening") + if isinstance(screening, Mapping) and screening.get("can_submit") is False: + return False + return estimate.get("available") is not False and estimate.get("can_submit") is not False + + +def estimate_summary(estimate: Mapping[str, object]) -> str: + """Build a compact summary without claiming immediate capacity.""" + parts: list[str] = [] + cost = estimate.get("estimated_cost_usd") + if cost is None: + minimum = estimate.get("estimated_cost_min_usd") + maximum = estimate.get("estimated_cost_max_usd") + if minimum is not None or maximum is not None: + cost = {"min": minimum, "max": maximum} + if cost is not None: + parts.append(f"estimated cost `{json.dumps(cost, sort_keys=True)}` USD") + duration_min = estimate.get("estimated_runtime_min_minutes") + duration_max = estimate.get("estimated_runtime_max_minutes") + if duration_min is not None or duration_max is not None: + parts.append(f"duration `{duration_min or '?'}-{duration_max or '?'}` minutes") + capacity = estimate.get("capacity_status") + if isinstance(capacity, Mapping): + if capacity.get("availability"): + parts.append(f"capacity `{capacity['availability']}`") + if capacity.get("immediate_capacity_confirmed") is False: + parts.append("immediate worker pickup not confirmed") + warnings = estimate.get("warnings") + if isinstance(warnings, list) and warnings: + parts.append(f"{len(warnings)} warning(s)") + return "; ".join(parts) if parts else "structured estimate stored in `jungle_grid_estimate`" + + +def status_fingerprint(job: Mapping[str, object]) -> str: + fields = ( + "status", + "execution_phase", + "status_message", + "phase_started_at", + "delayed_start", + "delay_reason", + "failure", + ) + return json.dumps({key: job.get(key) for key in fields}, sort_keys=True, default=str) + + +@dataclass +class ProjectExecution: + project_id: str + workload: dict[str, object] + estimate_id: str + estimate: dict[str, object] + job_id: Optional[str] = None + approved_by: Optional[str] = None + submission_state: str = "pending" + cancel_requested: bool = False + terminal: bool = False + last_status_fingerprint: Optional[str] = None + log_cursor: Optional[str] = None + seen_event_ids: list[str] = field(default_factory=list) + logs: list[object] = field(default_factory=list) + events: list[object] = field(default_factory=list) + secret_values: list[str] = field(default_factory=list, repr=False) + + def persisted(self) -> dict[str, object]: + data = asdict(self) + data.pop("secret_values", None) + return data + + @classmethod + def from_persisted(cls, value: Mapping[str, object]) -> ProjectExecution: + allowed = cls.__dataclass_fields__.keys() + return cls(**{key: value[key] for key in allowed if key in value}) # type: ignore[arg-type] + + +class JungleGridExecutorAgent(WorkerAgent): + """Deterministic executor for the Jungle Grid project demo.""" + + default_agent_id = "jungle-grid-executor" + + def __init__( + self, + jungle_grid_client: Optional[JungleGridClient] = None, + poll_interval_seconds: float = 10.0, + max_poll_failures: int = 3, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, + **kwargs: Any, + ): + super().__init__(**kwargs) + self.jungle_grid = jungle_grid_client or JungleGridClient() + self.poll_interval_seconds = max(0.0, poll_interval_seconds) + self.max_poll_failures = max(1, max_poll_failures) + self.sleep = sleep + self.project_adapter = DefaultProjectAgentAdapter() + self.executions: dict[str, ProjectExecution] = {} + self.monitor_tasks: dict[str, asyncio.Task[None]] = {} + self.project_locks: dict[str, asyncio.Lock] = {} + + async def on_startup(self) -> None: + self.project_adapter.bind_client(self.client) + if self.client.connector is None: + raise RuntimeError("OpenAgents connector is unavailable during startup.") + self.project_adapter.bind_connector(self.client.connector) + self.project_adapter.bind_agent(self.agent_id) + logger.info("Jungle Grid executor is ready") + + async def on_shutdown(self) -> None: + for task in self.monitor_tasks.values(): + task.cancel() + if self.monitor_tasks: + await asyncio.gather(*self.monitor_tasks.values(), return_exceptions=True) + + async def _post(self, project_id: str, text: str) -> None: + await self.project_adapter.send_project_message(project_id=project_id, content={"text": text}) + + async def _set_artifact(self, project_id: str, key: str, value: object) -> None: + safe = sanitize_project_data(value, [self.jungle_grid.api_key]) + await self.project_adapter.set_project_artifact( + project_id=project_id, key=key, value=json.dumps(safe, indent=2, sort_keys=True) + ) + + async def _save_state(self, execution: ProjectExecution) -> None: + await self._set_artifact(execution.project_id, STATE_ARTIFACT, execution.persisted()) + + async def _load_state(self, project_id: str) -> Optional[ProjectExecution]: + if project_id in self.executions: + return self.executions[project_id] + response = await self.project_adapter.get_project_artifact(project_id=project_id, key=STATE_ARTIFACT) + if not response.get("success"): + return None + value = response.get("data", {}).get("value") + if not isinstance(value, str) or not value.strip(): + return None + try: + raw = json.loads(value) + if not isinstance(raw, dict): + return None + execution = ProjectExecution.from_persisted(raw) + except (TypeError, ValueError, json.JSONDecodeError): + return None + self.executions[project_id] = execution + return execution + + def _secrets(self, execution: ProjectExecution) -> list[str]: + return [self.jungle_grid.api_key, *execution.secret_values] + + def _safe(self, value: object, execution: ProjectExecution) -> object: + return sanitize_project_data(value, self._secrets(execution)) + + @staticmethod + def _is_human(sender_id: str) -> bool: + return sender_id.startswith("human:") and len(sender_id) > len("human:") + + @on_event("project.notification.started") + async def handle_project_started(self, context: EventContext) -> None: + payload = context.incoming_event.payload + project_id = payload.get("project_id") + if not isinstance(project_id, str) or not project_id: + return + lock = self.project_locks.setdefault(project_id, asyncio.Lock()) + async with lock: + existing = await self._load_state(project_id) + if existing: + if existing.job_id and not existing.terminal: + self._ensure_monitor(existing) + return + try: + workload = parse_workload_goal(str(payload.get("goal", ""))) + estimate = await self.jungle_grid.estimate_job(build_estimate_payload(workload)) + execution = ProjectExecution( + project_id=project_id, + workload=workload, + estimate_id=uuid.uuid4().hex[:12], + estimate=estimate, + ) + self.executions[project_id] = execution + await self._save_state(execution) + shared = { + "estimate_id": execution.estimate_id, + "workload": public_workload(workload), + "estimate": estimate, + } + await self._set_artifact(project_id, "jungle_grid_estimate", shared) + if not estimate_can_submit(estimate): + await self._post(project_id, "Jungle Grid screening blocked submission. No job was submitted.") + await self.project_adapter.stop_project( + project_id=project_id, reason="Jungle Grid screening blocked submission" + ) + return + await self._post( + project_id, + "Jungle Grid estimate ready. No job has been submitted. " + f"Summary: {estimate_summary(estimate)}.\n\n" + f"A human must reply exactly `APPROVE {execution.estimate_id}` " + "before billable compute can start.", + ) + except (ValueError, JungleGridError) as exc: + await self._post( + project_id, + f"Jungle Grid estimate failed: {redact_sensitive(exc, [self.jungle_grid.api_key])}", + ) + await self.project_adapter.stop_project(project_id=project_id, reason="Jungle Grid estimate failed") + + @on_event("project.notification.message_received") + async def handle_project_message(self, context: EventContext) -> None: + payload = context.incoming_event.payload + project_id = payload.get("project_id") + sender_id = str(payload.get("sender_id", "")) + content = payload.get("content") + text = content.get("text") if isinstance(content, Mapping) else None + if not isinstance(project_id, str) or not isinstance(text, str): + return + normalized_prefix = text.strip() + if not normalized_prefix.startswith(("APPROVE", "CANCEL")): + return + lock = self.project_locks.setdefault(project_id, asyncio.Lock()) + async with lock: + execution = await self._load_state(project_id) + if normalized_prefix.startswith("APPROVE"): + await self._handle_approval(project_id, sender_id, text, execution) + else: + await self._handle_cancellation(project_id, sender_id, text, execution) + + async def _handle_approval( + self, + project_id: str, + sender_id: str, + command: str, + execution: Optional[ProjectExecution], + ) -> None: + if not execution: + await self._post(project_id, "There is no pending Jungle Grid estimate for this project.") + return + if not self._is_human(sender_id): + await self._post(project_id, "Approval rejected: billable submission requires a verified human identity.") + return + if command != f"APPROVE {execution.estimate_id}": + await self._post(project_id, "Approval rejected: estimate id does not match the pending estimate.") + return + if execution.terminal or execution.submission_state != "pending": + suffix = f" as job `{execution.job_id}`" if execution.job_id else "" + await self._post(project_id, f"Jungle Grid submission has already been recorded{suffix}.") + return + await self._submit(execution, sender_id) + + async def _submit(self, execution: ProjectExecution, approved_by: str) -> None: + execution.submission_state = "submitting" + execution.approved_by = approved_by + await self._save_state(execution) + try: + submit_payload, secrets = build_submit_payload(execution.workload) + execution.secret_values = secrets + result = await self.jungle_grid.submit_job(submit_payload) + job_id = str(result.get("job_id") or result.get("id") or "").strip() + if not job_id: + raise JungleGridError("INVALID_API_RESPONSE", "Jungle Grid submit response did not include a job id.") + execution.job_id = job_id + execution.submission_state = "submitted" + execution.last_status_fingerprint = status_fingerprint(result) + await self._save_state(execution) + await self._set_artifact( + execution.project_id, + "jungle_grid_submission", + { + "approved_by": approved_by, + "estimate_id": execution.estimate_id, + "submission": self._safe(result, execution), + }, + ) + await self._post( + execution.project_id, + f"Jungle Grid job submitted after approval by `{approved_by}`: `{job_id}`.", + ) + self._ensure_monitor(execution) + except (ValueError, JungleGridError) as exc: + execution.submission_state = "submission_failed" + await self._save_state(execution) + await self._post( + execution.project_id, + f"Jungle Grid submission failed: {redact_sensitive(exc, self._secrets(execution))}", + ) + await self.project_adapter.stop_project( + project_id=execution.project_id, reason="Jungle Grid submission failed" + ) + + async def _handle_cancellation( + self, + project_id: str, + sender_id: str, + command: str, + execution: Optional[ProjectExecution], + ) -> None: + if not execution or not execution.job_id: + await self._post(project_id, "There is no submitted Jungle Grid job to cancel for this project.") + return + if command != f"CANCEL {execution.job_id}": + await self._post(project_id, "Cancellation rejected: job id does not match this project.") + return + if not self._is_human(sender_id): + await self._post(project_id, "Cancellation rejected: cancellation requires a verified human identity.") + return + if execution.terminal: + await self._post( + project_id, "Cancellation was not sent because this project already recorded a terminal job." + ) + return + if execution.cancel_requested: + await self._post(project_id, "Cancellation has already been requested for this job.") + return + execution.cancel_requested = True + await self._save_state(execution) + try: + result = await self.jungle_grid.cancel_job(execution.job_id, f"Requested from OpenAgents by {sender_id}") + await self._post( + project_id, + f"Cancellation requested for Jungle Grid job `{execution.job_id}`: " + f"{json.dumps(self._safe(result, execution), sort_keys=True)}", + ) + if str(result.get("status", "")).lower() in TERMINAL_STATUSES: + execution.terminal = True + await self._save_state(execution) + await self.project_adapter.stop_project( + project_id=project_id, reason=f"Jungle Grid job {execution.job_id} was cancelled." + ) + except JungleGridError as exc: + execution.cancel_requested = False + await self._save_state(execution) + await self._post( + project_id, + f"Jungle Grid cancellation failed: {redact_sensitive(exc, self._secrets(execution))}", + ) + + def _ensure_monitor(self, execution: ProjectExecution) -> None: + current = self.monitor_tasks.get(execution.project_id) + if current and not current.done(): + return + self.monitor_tasks[execution.project_id] = asyncio.create_task(self._monitor(execution)) + + async def _monitor(self, execution: ProjectExecution) -> None: + assert execution.job_id + failures = 0 + try: + while not execution.terminal: + try: + job = await self.jungle_grid.get_job(execution.job_id) + await self._collect_events(execution) + await self._collect_logs(execution) + failures = 0 + except JungleGridError as exc: + failures += 1 + if failures >= self.max_poll_failures: + raise exc + await self.sleep(self.poll_interval_seconds) + continue + fingerprint = status_fingerprint(job) + if fingerprint != execution.last_status_fingerprint: + execution.last_status_fingerprint = fingerprint + status = str(job.get("status") or "unknown") + phase = job.get("execution_phase") + delayed = " (delayed start)" if job.get("delayed_start") is True else "" + phase_text = f", phase `{phase}`" if phase else "" + await self._post( + execution.project_id, + f"Jungle Grid job `{execution.job_id}` is `{status}`{phase_text}{delayed}.", + ) + await self._save_state(execution) + if str(job.get("status", "")).lower() in TERMINAL_STATUSES: + await self._finalize(execution, job) + return + await self.sleep(self.poll_interval_seconds) + except JungleGridError as exc: + await self._post( + execution.project_id, + f"Jungle Grid monitoring failed after bounded retries: " + f"{redact_sensitive(exc, self._secrets(execution))}", + ) + await self.project_adapter.stop_project( + project_id=execution.project_id, reason="Jungle Grid monitoring failed" + ) + finally: + self.monitor_tasks.pop(execution.project_id, None) + + async def _collect_events(self, execution: ProjectExecution) -> None: + assert execution.job_id + response = await self.jungle_grid.get_job_events(execution.job_id) + items = response.get("items") + if not isinstance(items, list): + return + seen = set(execution.seen_event_ids) + new_items: list[object] = [] + for item in items: + if not isinstance(item, Mapping): + continue + event_id = str(item.get("id") or item.get("sequence") or item.get("created_at") or "") + if not event_id or event_id in seen: + continue + seen.add(event_id) + execution.seen_event_ids.append(event_id) + new_items.append(self._safe(item, execution)) + if new_items: + execution.events = (execution.events + new_items)[-MAX_SHARED_EVENTS:] + latest = new_items[-1] + title = latest.get("title") if isinstance(latest, Mapping) else None + if title: + await self._post(execution.project_id, f"Jungle Grid lifecycle: {title}.") + + async def _collect_logs(self, execution: ProjectExecution) -> None: + assert execution.job_id + response = await self.jungle_grid.get_job_logs(execution.job_id, limit=100, cursor=execution.log_cursor) + items = response.get("items", response.get("logs")) + if isinstance(items, list) and items: + safe_items = self._safe(items, execution) + if isinstance(safe_items, list): + execution.logs = (execution.logs + safe_items)[-MAX_SHARED_LOGS:] + next_cursor = response.get("next_cursor") + if next_cursor is not None and str(next_cursor) != execution.log_cursor: + execution.log_cursor = str(next_cursor) + + async def _finalize(self, execution: ProjectExecution, job: dict[str, object]) -> None: + assert execution.job_id + runtime: object = {} + artifacts: object = {} + try: + runtime = await self.jungle_grid.get_job_runtime(execution.job_id) + except JungleGridError as exc: + if exc.status not in {404, 409}: + runtime = {"unavailable": redact_sensitive(exc, self._secrets(execution))} + else: + runtime = {"unavailable": "Runtime details are not available for this job."} + try: + artifacts = await self.jungle_grid.list_artifacts(execution.job_id) + except JungleGridError as exc: + artifacts = {"unavailable": redact_sensitive(exc, self._secrets(execution))} + result = { + "job": self._safe(job, execution), + "events": execution.events, + "logs": execution.logs, + "runtime": self._safe(runtime, execution), + "artifacts": self._safe(artifacts, execution), + } + await self._set_artifact(execution.project_id, "jungle_grid_result", result) + execution.terminal = True + await self._save_state(execution) + status = str(job.get("status") or "unknown").lower() + await self._post( + execution.project_id, + f"Jungle Grid job `{execution.job_id}` finished with status `{status}`. " + "Sanitized lifecycle events, polled logs, runtime details, and artifact metadata are in " + "`jungle_grid_result`. Temporary download URLs are intentionally not requested or stored.", + ) + if status == "completed": + await self.project_adapter.complete_project( + project_id=execution.project_id, + summary=f"Jungle Grid job {execution.job_id} completed successfully.", + ) + else: + await self.project_adapter.stop_project( + project_id=execution.project_id, + reason=f"Jungle Grid job {execution.job_id} finished with status {status}.", + ) + + +async def main() -> None: + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + agent = JungleGridExecutorAgent( + poll_interval_seconds=float(os.getenv("JUNGLE_GRID_POLL_INTERVAL_SECONDS", "10")), + max_poll_failures=int(os.getenv("JUNGLE_GRID_MAX_POLL_FAILURES", "3")), + ) + try: + await agent.async_start( + network_host="localhost", + network_port=8700, + password_hash=EXECUTORS_GROUP_PASSWORD_HASH, + ) + while True: + await asyncio.sleep(3600) + finally: + await agent.async_stop() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sdk/demos/09_jungle_grid_gpu_execution/network.yaml b/sdk/demos/09_jungle_grid_gpu_execution/network.yaml new file mode 100644 index 000000000..8bfb1f445 --- /dev/null +++ b/sdk/demos/09_jungle_grid_gpu_execution/network.yaml @@ -0,0 +1,91 @@ +network: + name: JungleGridGPUExecution + mode: centralized + node_id: jungle-grid-gpu-execution-1 + initialized: true + transports: + - type: http + config: + port: 8700 + serve_studio: true + serve_mcp: true + - type: grpc + config: + port: 8600 + manifest_transport: http + recommended_transport: grpc + encryption_enabled: false + default_agent_group: guest + requires_password: false + agent_groups: + executors: + description: Agents allowed to execute Jungle Grid project workflows + # Demo-only group credential used to establish runtime topology membership. + # Replace it before adapting this network for a shared or public deployment. + password_hash: 8fba13dab71d6fdd8a9b9db1f06e81315dfbfd69167b6097f724604db3c91cdf + metadata: + permissions: + - execute_external_compute + mods: + - name: openagents.mods.workspace.default + enabled: true + config: + custom_events_enabled: true + - name: openagents.mods.workspace.project + enabled: true + config: + max_concurrent_projects: 5 + project_templates: + jungle_grid_execution: + name: Jungle Grid GPU Execution + description: Estimate, approve, execute, and monitor an AI workload on Jungle Grid + expose_as_tool: true + tool_name: run_jungle_grid_workload + tool_description: Start a human-approved Jungle Grid workload project. The task must be a JSON object with name, workload_type, and image; use uploaded input_id references and environment_from_env for secret workload values. + tool_mode: async + agent_groups: + - executors + context: | + This project delegates a long-running AI or GPU workload to Jungle Grid. + The executor estimates cost first and will not submit a job until a human + replies with the exact approval command shown in the project. Do not put + credentials in the goal; use environment_from_env to reference variables + available only in the executor process. File jobs accept previously + uploaded Jungle Grid input_id references and never read arbitrary host paths. + created_by_version: 0.9.3 + +network_profile: + discoverable: true + name: Jungle Grid GPU Execution + description: A demo of human-approved asynchronous AI and GPU workload delegation through Jungle Grid. + tags: + - demo + - jungle-grid + - gpu + - execution + - project + categories: + - demo + - workflow + country: Worldwide + required_openagents_version: 0.9.3 + capacity: 10 + authentication: + type: none + host: 0.0.0.0 + port: 8700 + +log_level: INFO +data_dir: ./data/jungle-grid-gpu-execution +runtime_limit: null +shutdown_timeout: 30 + +external_access: + default_agent_group: guest + auth_token: null + auth_token_env: null + instruction: null + exposed_tools: + - start_run_jungle_grid_workload + - get_result_run_jungle_grid_workload + excluded_tools: [] diff --git a/tests/agents/test_jungle_grid_executor.py b/tests/agents/test_jungle_grid_executor.py new file mode 100644 index 000000000..b3466bc7f --- /dev/null +++ b/tests/agents/test_jungle_grid_executor.py @@ -0,0 +1,819 @@ +"""Mocked safety and contract tests for the Jungle Grid execution demo.""" + +import asyncio +import importlib.util +import json +import sys +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest +import yaml + +from openagents.core.network import AgentNetwork +from openagents.models.event import Event +from openagents.models.event_context import EventContext +from openagents.models.network_config import AgentGroupConfig, NetworkConfig +from openagents.models.transport import TransportType +from openagents.mods.workspace.project.mod import DefaultProjectNetworkMod + +MODULE_PATH = ( + Path(__file__).parent.parent.parent + / "sdk" + / "demos" + / "09_jungle_grid_gpu_execution" + / "agents" + / "jungle_grid_executor.py" +) +NETWORK_CONFIG_PATH = MODULE_PATH.parent.parent / "network.yaml" +SPEC = importlib.util.spec_from_file_location("jungle_grid_executor", MODULE_PATH) +MODULE = importlib.util.module_from_spec(SPEC) +assert SPEC and SPEC.loader +sys.modules[SPEC.name] = MODULE +SPEC.loader.exec_module(MODULE) + +JungleGridClient = MODULE.JungleGridClient +JungleGridError = MODULE.JungleGridError +JungleGridExecutorAgent = MODULE.JungleGridExecutorAgent +ProjectExecution = MODULE.ProjectExecution +EXECUTORS_GROUP_PASSWORD_HASH = MODULE.EXECUTORS_GROUP_PASSWORD_HASH +STATE_ARTIFACT = MODULE.STATE_ARTIFACT +build_estimate_payload = MODULE.build_estimate_payload +build_submit_payload = MODULE.build_submit_payload +estimate_can_submit = MODULE.estimate_can_submit +parse_workload_goal = MODULE.parse_workload_goal +public_workload = MODULE.public_workload +redact_sensitive = MODULE.redact_sensitive +sanitize_project_data = MODULE.sanitize_project_data + + +def context(event_name, payload): + return EventContext( + incoming_event=Event(event_name=event_name, source_id="system", payload=payload), + event_threads={}, + incoming_thread_id="thread-1", + ) + + +def workload(**updates): + value = { + "name": "training-demo", + "workload_type": "training", + "image": "pytorch/pytorch:2.4.0-cuda12.1-cudnn9-runtime", + "command": ["python", "-c", "print(42)"], + "model_size_gb": 1, + "routing_mode": "cost", + } + value.update(updates) + return value + + +class FakeJungleGridClient: + def __init__(self): + self.api_key = "jg_test_api_key" + self.estimate_job = AsyncMock( + return_value={ + "available": True, + "screening": {"can_submit": True}, + "capacity_status": {"immediate_capacity_confirmed": False}, + } + ) + self.submit_job = AsyncMock(return_value={"job_id": "job_123", "status": "queued"}) + self.get_job = AsyncMock(return_value={"job_id": "job_123", "status": "completed"}) + self.get_job_events = AsyncMock( + return_value={ + "items": [ + { + "id": "evt_1", + "type": "job.completed", + "title": "Job completed", + "message": "done", + "created_at": "2026-06-11T00:00:00Z", + } + ] + } + ) + self.get_job_logs = AsyncMock( + return_value={ + "items": [{"category": "workload_stdout", "message": "done"}], + "next_cursor": None, + } + ) + self.get_job_runtime = AsyncMock(return_value={"exit_code": 0, "stdout_tail": "done"}) + self.cancel_job = AsyncMock(return_value={"job_id": "job_123", "status": "cancelled"}) + self.list_artifacts = AsyncMock( + return_value={ + "artifacts": [ + { + "artifact_id": "artifact_1", + "filename": "output.json", + "content_type": "application/json", + "size_bytes": 12, + } + ] + } + ) + self.get_artifact = AsyncMock(return_value={"download_url": "https://storage.example/file?signature=secret"}) + + +def agent_with_mocks(fake=None): + agent = JungleGridExecutorAgent( + jungle_grid_client=fake or FakeJungleGridClient(), + poll_interval_seconds=0, + sleep=AsyncMock(), + ) + agent.project_adapter = AsyncMock() + agent.project_adapter.send_project_message = AsyncMock(return_value={"success": True}) + agent.project_adapter.set_project_artifact = AsyncMock(return_value={"success": True}) + agent.project_adapter.get_project_artifact = AsyncMock(return_value={"success": True, "data": {"value": None}}) + agent.project_adapter.complete_project = AsyncMock(return_value={"success": True}) + agent.project_adapter.stop_project = AsyncMock(return_value={"success": True}) + return agent + + +def message_texts(agent): + return [call.kwargs["content"]["text"] for call in agent.project_adapter.send_project_message.await_args_list] + + +@pytest.mark.asyncio +async def test_group_authentication_runtime_membership_and_project_delivery(): + network_yaml = yaml.safe_load(NETWORK_CONFIG_PATH.read_text()) + executor_group = network_yaml["network"]["agent_groups"]["executors"] + assert executor_group["password_hash"] == EXECUTORS_GROUP_PASSWORD_HASH + assert "agents" not in executor_group.get("metadata", {}) + + config = NetworkConfig( + name="JungleGridGroupTest", + default_agent_group="guest", + requires_password=False, + agent_groups={"executors": AgentGroupConfig(**executor_group)}, + ) + network = AgentNetwork.create_from_config(config) + registration = await network.register_agent( + agent_id="jungle-grid-executor", + transport_type=TransportType.HTTP, + metadata={"name": "Jungle Grid Executor"}, + certificate=None, + password_hash=EXECUTORS_GROUP_PASSWORD_HASH, + ) + assert registration.success + assert network.topology.agent_group_membership["jungle-grid-executor"] == "executors" + + project_mod = DefaultProjectNetworkMod() + project_mod.update_config( + { + "project_templates": { + "jungle_grid_execution": { + "name": "Jungle Grid GPU Execution", + "agent_groups": ["executors"], + } + } + } + ) + project_mod.initialize() + project_mod.bind_network(network) + assert project_mod._get_agents_in_group("executors") == ["jungle-grid-executor"] + + fake = FakeJungleGridClient() + executor = agent_with_mocks(fake) + + async def deliver(event): + if event.destination_id == "jungle-grid-executor": + await executor.handle_project_started( + EventContext(incoming_event=event, event_threads={}, incoming_thread_id="start") + ) + return SimpleNamespace(success=True) + + project_mod.send_event = AsyncMock(side_effect=deliver) + response = await project_mod.process_system_message( + Event( + event_name="project.start", + source_id="human:owner", + payload={ + "template_id": "jungle_grid_execution", + "goal": json.dumps(workload()), + "name": "Jungle Grid test", + }, + ) + ) + assert response.success + assert "jungle-grid-executor" in response.data["authorized_agents"] + fake.estimate_job.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_estimate_never_submits_and_requires_exact_human_approval(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + await agent.handle_project_started( + context("project.notification.started", {"project_id": "project-1", "goal": json.dumps(workload())}) + ) + fake.submit_job.assert_not_awaited() + assert any("No job has been submitted" in text and "APPROVE" in text for text in message_texts(agent)) + + +@pytest.mark.asyncio +async def test_screening_can_submit_false_blocks_approval(): + fake = FakeJungleGridClient() + fake.estimate_job.return_value = { + "available": True, + "screening": {"can_submit": False, "blocked_checks": ["resource"]}, + } + agent = agent_with_mocks(fake) + await agent.handle_project_started( + context("project.notification.started", {"project_id": "project-1", "goal": json.dumps(workload())}) + ) + fake.submit_job.assert_not_awaited() + agent.project_adapter.stop_project.assert_awaited_once() + assert not any("APPROVE" in text for text in message_texts(agent)) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("sender", "command"), + [ + ("agent:other", "APPROVE estimate-1"), + ("human:user", "APPROVE wrong"), + ("human:user", " APPROVE estimate-1"), + ("human:user", "APPROVE estimate-1\n"), + ], +) +async def test_unauthorized_or_malformed_approval_is_rejected(sender, command): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + agent.executions["project-1"] = ProjectExecution("project-1", workload(), "estimate-1", {"available": True}) + await agent.handle_project_message( + context( + "project.notification.message_received", + {"project_id": "project-1", "sender_id": sender, "content": {"text": command}}, + ) + ) + fake.submit_job.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_duplicate_and_concurrent_approval_submit_only_once(): + fake = FakeJungleGridClient() + started = asyncio.Event() + release = asyncio.Event() + + async def delayed_submit(_payload): + started.set() + await release.wait() + return {"job_id": "job_123", "status": "queued"} + + fake.submit_job.side_effect = delayed_submit + agent = agent_with_mocks(fake) + agent._ensure_monitor = lambda execution: None + agent.executions["project-1"] = ProjectExecution("project-1", workload(), "estimate-1", {"available": True}) + approval = context( + "project.notification.message_received", + { + "project_id": "project-1", + "sender_id": "human:user", + "content": {"text": "APPROVE estimate-1"}, + }, + ) + first = asyncio.create_task(agent.handle_project_message(approval)) + await started.wait() + second = asyncio.create_task(agent.handle_project_message(approval)) + release.set() + await asyncio.gather(first, second) + fake.submit_job.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_restart_recovers_submitted_state_without_resubmitting(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + persisted = ProjectExecution( + "project-1", + workload(), + "estimate-1", + {"available": True}, + job_id="job_existing", + submission_state="submitted", + ) + agent.project_adapter.get_project_artifact.return_value = { + "success": True, + "data": {"value": json.dumps(persisted.persisted())}, + } + agent._ensure_monitor = AsyncMock() + await agent.handle_project_started( + context("project.notification.started", {"project_id": "project-1", "goal": json.dumps(workload())}) + ) + fake.estimate_job.assert_not_awaited() + fake.submit_job.assert_not_awaited() + agent._ensure_monitor.assert_called_once() + + +@pytest.mark.asyncio +async def test_restart_does_not_retry_uncertain_submission(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + persisted = ProjectExecution( + "project-1", + workload(), + "estimate-1", + {"available": True}, + submission_state="submitting", + ) + agent.project_adapter.get_project_artifact.return_value = { + "success": True, + "data": {"value": json.dumps(persisted.persisted())}, + } + await agent.handle_project_message( + context( + "project.notification.message_received", + { + "project_id": "project-1", + "sender_id": "human:user", + "content": {"text": "APPROVE estimate-1"}, + }, + ) + ) + fake.submit_job.assert_not_awaited() + + +def test_current_command_array_is_preserved(): + requested = parse_workload_goal(json.dumps(workload())) + assert build_estimate_payload(requested)["command"] == ["python", "-c", "print(42)"] + assert build_submit_payload(requested)[0]["command"] == ["python", "-c", "print(42)"] + + +def test_legacy_command_and_args_are_combined_without_semantic_change(): + requested = parse_workload_goal(json.dumps(workload(command="python", args=["-c", "print(42)"]))) + assert build_submit_payload(requested)[0]["command"] == ["python", "-c", "print(42)"] + assert "args" not in build_submit_payload(requested)[0] + + +def test_command_array_rejects_separate_args(): + with pytest.raises(ValueError, match="cannot be combined"): + parse_workload_goal(json.dumps(workload(args=["extra"]))) + + +def test_fine_tuning_is_accepted_and_normalized(): + requested = parse_workload_goal(json.dumps(workload(workload_type="fine_tuning"))) + assert build_submit_payload(requested)[0]["workload_type"] == "fine-tuning" + + +def test_invalid_workload_type_is_rejected(): + with pytest.raises(ValueError, match="workload_type must be one of"): + parse_workload_goal(json.dumps(workload(workload_type="interactive"))) + + +def test_input_script_and_expected_artifacts_are_forwarded(): + requested = parse_workload_goal( + json.dumps( + workload( + input_files=[{"input_id": "inp_audio123"}], + script_files=["inp_script123"], + expected_artifacts=["/workspace/artifacts/transcript.txt"], + ) + ) + ) + payload = build_submit_payload(requested)[0] + assert payload["input_files"] == [{"input_id": "inp_audio123"}] + assert payload["script_files"] == [{"input_id": "inp_script123"}] + assert payload["expected_artifacts"] == ["/workspace/artifacts/transcript.txt"] + + +@pytest.mark.parametrize( + "bad", + [ + {"input_files": [{"local_path": "/etc/passwd"}]}, + {"script_files": [{"input_id": "../../secret"}]}, + {"expected_artifacts": ["/tmp/output.txt"]}, + ], +) +def test_arbitrary_local_paths_and_invalid_references_are_rejected(bad): + with pytest.raises(ValueError): + parse_workload_goal(json.dumps(workload(**bad))) + + +def test_environment_references_resolve_only_for_submission(monkeypatch): + monkeypatch.setenv("MODEL_TOKEN", "secret-value") + requested = parse_workload_goal(json.dumps(workload(environment_from_env={"MODEL_TOKEN": "MODEL_TOKEN"}))) + assert "environment" not in build_estimate_payload(requested) + payload, secrets = build_submit_payload(requested) + assert payload["environment"] == {"MODEL_TOKEN": "secret-value"} + assert secrets == ["secret-value"] + + +def test_missing_environment_reference_blocks_submission(monkeypatch): + monkeypatch.delenv("MISSING_TOKEN", raising=False) + requested = parse_workload_goal(json.dumps(workload(environment_from_env={"MODEL_TOKEN": "MISSING_TOKEN"}))) + with pytest.raises(ValueError, match="MISSING_TOKEN"): + build_submit_payload(requested) + + +def test_callback_auth_token_is_environment_only(monkeypatch): + monkeypatch.setenv("CALLBACK_TOKEN", "callback-secret") + requested = parse_workload_goal( + json.dumps( + workload( + callback={ + "url": "https://example.test/hooks/jungle", + "metadata": {"project": "demo"}, + "auth_token_from_env": "CALLBACK_TOKEN", + } + ) + ) + ) + estimate = build_estimate_payload(requested) + assert "auth_token" not in json.dumps(estimate) + payload, secrets = build_submit_payload(requested) + assert payload["callback"]["auth_token"] == "callback-secret" + assert secrets == ["callback-secret"] + + +def test_literal_secrets_and_secret_metadata_are_rejected(): + with pytest.raises(ValueError, match="must not contain"): + parse_workload_goal(json.dumps(workload(command=["curl", "-H", "Bearer secret"]))) + with pytest.raises(ValueError, match="secret-like"): + parse_workload_goal(json.dumps(workload(metadata={"api_token": "value"}))) + + +def test_supported_resource_routing_and_timeout_fields_are_forwarded(): + requested = parse_workload_goal( + json.dumps( + workload( + gpu_required=True, + gpu_count=1, + gpu_class="datacenter", + gpu_type="A100", + min_vram_gb=40, + region_preference="us-east", + region_mode="strict", + timeout_seconds=600, + precision="bf16", + disk_gb=50, + ) + ) + ) + payload = build_submit_payload(requested)[0] + assert payload["gpu_required"] is True + assert payload["gpu_type"] == "A100" + assert payload["timeout_seconds"] == 600 + + +def test_constraints_reject_unverified_fields(): + with pytest.raises(ValueError, match="Unsupported constraint fields"): + parse_workload_goal(json.dumps(workload(constraints={"provider": "runpod"}))) + + +@pytest.mark.asyncio +async def test_malformed_approval_posts_rejection(): + agent = agent_with_mocks() + agent.executions["project-1"] = ProjectExecution("project-1", workload(), "estimate-1", {"available": True}) + await agent.handle_project_message( + context( + "project.notification.message_received", + { + "project_id": "project-1", + "sender_id": "human:user", + "content": {"text": " APPROVE estimate-1"}, + }, + ) + ) + assert any("Approval rejected" in text for text in message_texts(agent)) + + +def test_estimate_can_submit_honors_screening_and_availability(): + assert estimate_can_submit({"available": True, "screening": {"can_submit": True}}) + assert not estimate_can_submit({"available": False}) + assert not estimate_can_submit({"screening": {"can_submit": False}}) + + +@pytest.mark.asyncio +async def test_status_changes_are_deduplicated(): + fake = FakeJungleGridClient() + running = { + "job_id": "job_123", + "status": "running", + "execution_phase": "executing", + "phase_started_at": "2026-06-11T00:00:00Z", + } + fake.get_job.side_effect = [running, running, {"job_id": "job_123", "status": "completed"}] + agent = agent_with_mocks(fake) + execution = ProjectExecution( + "project-1", workload(), "estimate-1", {}, job_id="job_123", submission_state="submitted" + ) + await agent._monitor(execution) + assert sum("`running`" in text for text in message_texts(agent)) == 1 + + +@pytest.mark.asyncio +async def test_lifecycle_endpoint_and_event_deduplication(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent._collect_events(execution) + await agent._collect_events(execution) + fake.get_job_events.assert_awaited_with("job_123") + assert len(execution.events) == 1 + assert sum("Job completed" in text for text in message_texts(agent)) == 1 + + +@pytest.mark.asyncio +async def test_empty_workload_logs_during_startup_do_not_fail(): + fake = FakeJungleGridClient() + fake.get_job_logs.return_value = {"items": [], "next_cursor": None} + agent = agent_with_mocks(fake) + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent._collect_logs(execution) + assert execution.logs == [] + agent.project_adapter.stop_project.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_log_pagination_and_bounded_output(): + fake = FakeJungleGridClient() + fake.get_job_logs.side_effect = [ + {"items": [{"message": "first"}], "next_cursor": "cursor-1"}, + {"items": [{"message": f"line-{index}"} for index in range(250)], "next_cursor": None}, + ] + agent = agent_with_mocks(fake) + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent._collect_logs(execution) + await agent._collect_logs(execution) + assert fake.get_job_logs.await_args_list[1].kwargs["cursor"] == "cursor-1" + assert len(execution.logs) == 200 + + +@pytest.mark.asyncio +async def test_runtime_unavailable_is_nonfatal_and_artifacts_have_no_signed_url(): + fake = FakeJungleGridClient() + fake.get_job_runtime.side_effect = JungleGridError("NOT_FOUND", "not ready", 404) + fake.list_artifacts.return_value = { + "artifacts": [ + { + "artifact_id": "artifact_1", + "filename": "output.json", + "download_url": "https://storage.example/file?signature=secret", + } + ] + } + agent = agent_with_mocks(fake) + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent._finalize(execution, {"job_id": "job_123", "status": "completed"}) + result_call = next( + call + for call in agent.project_adapter.set_project_artifact.await_args_list + if call.kwargs["key"] == "jungle_grid_result" + ) + value = result_call.kwargs["value"] + assert "Runtime details are not available" in value + assert "https://storage.example" not in value + assert "signature=secret" not in value + fake.get_artifact.assert_not_awaited() + agent.project_adapter.complete_project.assert_awaited_once() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("status", ["failed", "cancelled"]) +async def test_failed_or_cancelled_job_stops_project(status): + agent = agent_with_mocks() + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent._finalize(execution, {"job_id": "job_123", "status": status}) + agent.project_adapter.stop_project.assert_awaited_once() + agent.project_adapter.complete_project.assert_not_awaited() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("sender", "command"), + [ + ("agent:other", "CANCEL job_123"), + ("human:user", "CANCEL job_other"), + ("human:user", " CANCEL job_123"), + ("human:user", "CANCEL job_123\n"), + ], +) +async def test_unauthorized_mismatched_or_malformed_cancellation_is_rejected(sender, command): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + agent.executions["project-1"] = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent.handle_project_message( + context( + "project.notification.message_received", + {"project_id": "project-1", "sender_id": sender, "content": {"text": command}}, + ) + ) + fake.cancel_job.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_duplicate_and_terminal_cancellation_are_safe(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + execution = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123", cancel_requested=True) + agent.executions["project-1"] = execution + cancellation = context( + "project.notification.message_received", + { + "project_id": "project-1", + "sender_id": "human:user", + "content": {"text": "CANCEL job_123"}, + }, + ) + await agent.handle_project_message(cancellation) + execution.cancel_requested = False + execution.terminal = True + await agent.handle_project_message(cancellation) + fake.cancel_job.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_matching_human_cancellation_uses_recorded_job_only(): + fake = FakeJungleGridClient() + agent = agent_with_mocks(fake) + agent.executions["project-1"] = ProjectExecution("project-1", workload(), "estimate-1", {}, job_id="job_123") + await agent.handle_project_message( + context( + "project.notification.message_received", + { + "project_id": "project-1", + "sender_id": "human:user", + "content": {"text": "CANCEL job_123"}, + }, + ) + ) + fake.cancel_job.assert_awaited_once_with("job_123", "Requested from OpenAgents by human:user") + + +def test_redaction_removes_api_keys_environment_values_and_signed_urls(): + safe = sanitize_project_data( + { + "message": "Bearer jg_test_api_key secret-value", + "download_url": "https://storage.example/file?signature=abc", + "authorization": "Bearer abc", + }, + ["jg_test_api_key", "secret-value"], + ) + encoded = json.dumps(safe) + assert "jg_test_api_key" not in encoded + assert "secret-value" not in encoded + assert "storage.example" not in encoded + assert encoded.count("[REDACTED]") >= 3 + + +def test_public_workload_hides_metadata_values(): + shared = public_workload(workload(metadata={"customer": "private-value"})) + assert shared["metadata"] == {"customer": "[REDACTED]"} + + +@pytest.mark.asyncio +async def test_missing_api_key_fails_before_network(monkeypatch): + monkeypatch.delenv("JUNGLE_GRID_API_KEY", raising=False) + with pytest.raises(JungleGridError, match="JUNGLE_GRID_API_KEY is required"): + await JungleGridClient().estimate_job(workload()) + + +class FakeResponse: + def __init__(self, status, text): + self.status = status + self._text = text + + async def text(self): + return self._text + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + +class FakeSession: + def __init__(self, response=None, error=None, **kwargs): + self.response = response + self.error = error + + def request(self, *args, **kwargs): + if self.error: + raise self.error + return self.response + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + +@pytest.mark.asyncio +async def test_timeout_uses_bounded_retries_for_reads_only(monkeypatch): + monkeypatch.setenv("JUNGLE_GRID_API_KEY", "jg_test_api_key") + monkeypatch.setattr( + MODULE.aiohttp, + "ClientSession", + lambda **kwargs: FakeSession(error=asyncio.TimeoutError()), + ) + sleep = AsyncMock() + client = JungleGridClient(read_retries=2, retry_delay_seconds=0, sleep=sleep) + with pytest.raises(JungleGridError, match="timed out"): + await client.get_job("job_123") + assert sleep.await_count == 2 + sleep.reset_mock() + with pytest.raises(JungleGridError, match="timed out"): + await client.submit_job(workload()) + sleep.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_malformed_json_response_is_handled(monkeypatch): + monkeypatch.setenv("JUNGLE_GRID_API_KEY", "jg_test_api_key") + monkeypatch.setattr( + MODULE.aiohttp, + "ClientSession", + lambda **kwargs: FakeSession(FakeResponse(200, "not-json")), + ) + with pytest.raises(JungleGridError, match="invalid JSON"): + await JungleGridClient(read_retries=0).get_job("job_123") + + +@pytest.mark.asyncio +async def test_api_error_code_and_message_are_sanitized(monkeypatch): + monkeypatch.setenv("JUNGLE_GRID_API_KEY", "jg_test_api_key") + body = json.dumps( + { + "error": { + "code": "provider_jg_private_backend", + "message": "Bearer jg_test_api_key is forbidden", + } + } + ) + monkeypatch.setattr( + MODULE.aiohttp, + "ClientSession", + lambda **kwargs: FakeSession(FakeResponse(403, body)), + ) + with pytest.raises(JungleGridError) as exc_info: + await JungleGridClient().get_job("job_123") + assert "jg_private_backend" not in exc_info.value.code + assert "jg_test_api_key" not in str(exc_info.value) + + +def test_client_prefers_official_api_base_and_normalizes_slashes(monkeypatch): + monkeypatch.setenv("JUNGLEGRID_API_BASE", "https://official.example.test///") + monkeypatch.setenv("JUNGLE_GRID_API_URL", "https://legacy.example.test") + client = JungleGridClient() + assert client.api_base == "https://official.example.test" + + +def test_client_keeps_legacy_api_base_fallback(monkeypatch): + monkeypatch.delenv("JUNGLEGRID_API_BASE", raising=False) + monkeypatch.setenv("JUNGLE_GRID_API_URL", "https://legacy.example.test/") + assert JungleGridClient().api_base == "https://legacy.example.test" + + +@pytest.mark.asyncio +async def test_client_uses_current_routes_and_log_pagination(monkeypatch): + monkeypatch.setenv("JUNGLE_GRID_API_KEY", "jg_test_api_key") + client = JungleGridClient() + client._request = AsyncMock(return_value={}) + await client.estimate_job({}) + await client.submit_job({}) + await client.get_job("job 123") + await client.get_job_events("job 123") + await client.get_job_logs("job 123", limit=50, cursor="cursor-1") + await client.get_job_runtime("job 123") + await client.list_artifacts("job 123") + await client.get_artifact("job 123", "artifact 1") + await client.cancel_job("job 123", "reason") + paths = [call.args[1] for call in client._request.await_args_list] + assert paths == [ + "/v1/mcp/jobs/estimate", + "/v1/mcp/jobs", + "/v1/mcp/jobs/job%20123", + "/v1/jobs/job%20123/events", + "/v1/mcp/jobs/job%20123/logs?limit=50&cursor=cursor-1", + "/v1/jobs/job%20123/runtime", + "/v1/mcp/jobs/job%20123/artifacts", + "/v1/mcp/jobs/job%20123/artifacts/artifact%201/download", + "/v1/mcp/jobs/job%20123/cancel", + ] + + +def test_execution_state_never_persists_secret_values(): + execution = ProjectExecution( + "project-1", + workload(environment_from_env={"TOKEN": "LOCAL_TOKEN"}), + "estimate-1", + {"available": True}, + secret_values=["resolved-secret"], + ) + assert "resolved-secret" not in json.dumps(execution.persisted()) + assert execution.persisted()["workload"]["environment_from_env"] == {"TOKEN": "LOCAL_TOKEN"} + + +def test_state_artifact_name_is_stable(): + assert STATE_ARTIFACT == "jungle_grid_execution_state" + + +def test_redact_sensitive_handles_bearer_and_jungle_grid_keys(): + text = redact_sensitive("Bearer abc and jg_super_secret") + assert "abc" not in text + assert "jg_super_secret" not in text