Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,11 @@ FIREWORKS_API_KEY="your_fireworks_api_key_here"
# E2B API Key (if working with E2B code execution features)
# E2B_API_KEY="your_e2b_api_key_here"

# Runloop API Key (if hosting remote rollout servers in Runloop Devboxes)
# RUNLOOP_API_KEY="your_runloop_api_key_here"

# Optional: Runloop blueprint used by examples/runloop_remote_rollout/test_eval.py
# RUNLOOP_BLUEPRINT_ID="your_runloop_blueprint_id_here"

# Other environment variables your custom reward functions might need
# MY_CUSTOM_SERVICE_API_KEY="some_other_key"
71 changes: 71 additions & 0 deletions docs/integrations/runloop_remote_rollout.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Runloop Remote Rollouts

`RunloopRolloutProcessor` runs your remote rollout server inside a Runloop Devbox and then delegates rollout execution to Eval Protocol's existing `RemoteRolloutProcessor`.

This is useful when your rollout server needs an isolated, reproducible environment but you still want Eval Protocol to use the standard `/init` request and Fireworks tracing metadata flow.

## Install

```bash
pip install "eval-protocol[runloop]"
```

Set the API keys used by the local evaluator and remote server:

```bash
export RUNLOOP_API_KEY=...
export FIREWORKS_API_KEY=...
```

## Usage

```python
from eval_protocol.pytest import RunloopRolloutProcessor, evaluation_test


@evaluation_test(
rollout_processor=RunloopRolloutProcessor(
blueprint_id="bpt_your_blueprint_id",
server_command=(
"python -m uvicorn examples.runloop_remote_rollout.server:app "
"--host 0.0.0.0 --port 8000"
),
port=8000,
),
)
async def test_my_eval(row):
return row
```

The server command must bind to `0.0.0.0` on the configured port so the Runloop tunnel can reach it. The server must expose `POST /init` and should use `FireworksTracingHttpHandler` plus `RolloutIdFilter` to publish rollout completion status.

## Creating A Blueprint

`blueprint_id` is required when you want `RunloopRolloutProcessor` to create a fresh Devbox for each eval invocation. The blueprint should contain the rollout server code and its Python dependencies.

The included example can create a blueprint for a new Runloop account:

```bash
export RUNLOOP_API_KEY=...
eval "$(python examples/runloop_remote_rollout/create_blueprint.py)"
```

That helper uploads the current repository as a temporary Runloop build context and builds a Python image with `eval-protocol[runloop]` installed. Use the printed `RUNLOOP_BLUEPRINT_ID` with `examples/runloop_remote_rollout/test_eval.py`.

## Existing Devboxes

You can attach to an existing Devbox instead of creating one from a blueprint:

```python
RunloopRolloutProcessor(
devbox_id="devbox_existing_id",
server_command="python -m uvicorn server:app --host 0.0.0.0 --port 8000",
port=8000,
)
```

Eval Protocol only shuts down Devboxes created by `RunloopRolloutProcessor` when `shutdown_on_cleanup=True`. Existing Devboxes are left running.

## Trace Flow

`RunloopRolloutProcessor` does not change default rollout behavior. After setup it calls `RemoteRolloutProcessor(remote_base_url=...)`; `RemoteRolloutProcessor` sends `/init`, polls Fireworks tracing status by rollout ID, and backfills the final row from trace data.
3 changes: 3 additions & 0 deletions eval_protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"evaluation_test": (".pytest", "evaluation_test"),
"SingleTurnRolloutProcessor": (".pytest", "SingleTurnRolloutProcessor"),
"RemoteRolloutProcessor": (".pytest", "RemoteRolloutProcessor"),
"RunloopRolloutProcessor": (".pytest", "RunloopRolloutProcessor"),
"GithubActionRolloutProcessor": (".pytest", "GithubActionRolloutProcessor"),
# From .pytest.parameterize
"DefaultParameterIdGenerator": (".pytest.parameterize", "DefaultParameterIdGenerator"),
Expand Down Expand Up @@ -174,6 +175,7 @@ def __init__(self, *args, **kwargs):
"DataLoaderConfig",
"Status",
"RemoteRolloutProcessor",
"RunloopRolloutProcessor",
"GithubActionRolloutProcessor",
"InputMetadata",
"EvaluationRow",
Expand Down Expand Up @@ -278,6 +280,7 @@ def _get_version():
evaluation_test,
SingleTurnRolloutProcessor,
RemoteRolloutProcessor,
RunloopRolloutProcessor,
GithubActionRolloutProcessor,
)
from .pytest.parameterize import DefaultParameterIdGenerator
Expand Down
3 changes: 3 additions & 0 deletions eval_protocol/pytest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"NoOpRolloutProcessor": (".default_no_op_rollout_processor", "NoOpRolloutProcessor"),
"SingleTurnRolloutProcessor": (".default_single_turn_rollout_process", "SingleTurnRolloutProcessor"),
"RemoteRolloutProcessor": (".remote_rollout_processor", "RemoteRolloutProcessor"),
"RunloopRolloutProcessor": (".runloop_rollout_processor", "RunloopRolloutProcessor"),
"GithubActionRolloutProcessor": (".github_action_rollout_processor", "GithubActionRolloutProcessor"),
"RolloutProcessor": (".rollout_processor", "RolloutProcessor"),
# Dataset adapter
Expand Down Expand Up @@ -103,6 +104,7 @@ def __dir__():
"RolloutProcessor",
"SingleTurnRolloutProcessor",
"RemoteRolloutProcessor",
"RunloopRolloutProcessor",
"GithubActionRolloutProcessor",
"NoOpRolloutProcessor",
# Dataset
Expand Down Expand Up @@ -133,6 +135,7 @@ def __dir__():
from .default_no_op_rollout_processor import NoOpRolloutProcessor as NoOpRolloutProcessor
from .default_single_turn_rollout_process import SingleTurnRolloutProcessor as SingleTurnRolloutProcessor
from .remote_rollout_processor import RemoteRolloutProcessor as RemoteRolloutProcessor
from .runloop_rollout_processor import RunloopRolloutProcessor as RunloopRolloutProcessor
from .github_action_rollout_processor import GithubActionRolloutProcessor as GithubActionRolloutProcessor
from .evaluation_test import evaluation_test as evaluation_test
from .exception_config import (
Expand Down
238 changes: 238 additions & 0 deletions eval_protocol/pytest/runloop_rollout_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
"""Runloop-backed remote rollout processor."""

from __future__ import annotations

import asyncio
import os
import time
import urllib.error
import urllib.request
from typing import Any

from eval_protocol.models import EvaluationRow
from eval_protocol.pytest.remote_rollout_processor import RemoteRolloutProcessor
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig


def _load_runloop_sdk() -> Any:
try:
from runloop_api_client import RunloopSDK
except ImportError as exc:
raise ImportError(
"RunloopRolloutProcessor requires the optional Runloop dependency. "
"Install it with `pip install 'eval-protocol[runloop]'`."
) from exc
return RunloopSDK


class RunloopRolloutProcessor(RolloutProcessor):
"""Host a remote rollout server in a Runloop Devbox.

This processor only orchestrates Runloop lifecycle. Row processing is delegated
to :class:`RemoteRolloutProcessor`, so completion and trace collection continue
to use Eval Protocol's existing remote rollout contract.
"""

def __init__(
self,
*,
blueprint_id: str | None = None,
devbox_id: str | None = None,
server_command: str,
port: int = 8000,
model_base_url: str = "https://tracing.fireworks.ai",
poll_interval: float = 1.0,
timeout_seconds: float = 120.0,
startup_timeout_seconds: float = 60.0,
include_payloads: bool = False,
shutdown_on_cleanup: bool = True,
runloop_api_key: str | None = None,
) -> None:
if not blueprint_id and not devbox_id:
raise ValueError("Either blueprint_id or devbox_id is required for RunloopRolloutProcessor")
if not server_command:
raise ValueError("server_command is required for RunloopRolloutProcessor")

self._blueprint_id = blueprint_id
self._devbox_id = devbox_id
self._server_command = server_command
self._port = port
self._model_base_url = model_base_url
self._poll_interval = poll_interval
self._timeout_seconds = timeout_seconds
self._startup_timeout_seconds = startup_timeout_seconds
self._include_payloads = include_payloads
self._shutdown_on_cleanup = shutdown_on_cleanup
self._runloop_api_key = runloop_api_key

self._client: Any | None = None
self._devbox: Any | None = None
self._server_execution: Any | None = None
self._remote_base_url: str | None = None
self._remote_processor: RemoteRolloutProcessor | None = None
self._owns_devbox = False
self._shutdown_complete = False

@property
def remote_base_url(self) -> str | None:
"""The derived public URL for the Runloop-hosted rollout server."""
return self._remote_base_url

@property
def devbox_id(self) -> str | None:
"""The Devbox ID used by this processor once setup has completed."""
if self._devbox is not None and hasattr(self._devbox, "id"):
return self._devbox.id
return self._devbox_id

def setup(self) -> None:
"""Create or attach to a Devbox, expose the server port, and start the server."""
if self._remote_processor is not None:
return

api_key = self._runloop_api_key or os.getenv("RUNLOOP_API_KEY")
if not api_key:
raise ValueError(
"RUNLOOP_API_KEY is required for RunloopRolloutProcessor. "
"Set the environment variable or pass runloop_api_key explicitly."
)

RunloopSDK = _load_runloop_sdk()
client: Any = RunloopSDK(bearer_token=api_key)
self._client = client

try:
if self._devbox_id:
devbox = client.devbox.from_id(self._devbox_id)
self._owns_devbox = False
else:
assert self._blueprint_id is not None
devbox = client.devbox.create_from_blueprint_id(self._blueprint_id)
self._owns_devbox = True

self._devbox = devbox
self._await_running()
tunnel = self._create_tunnel()
self._remote_base_url = self._derive_remote_base_url(tunnel)
self._server_execution = devbox.cmd.exec_async(self._server_command)
self._wait_for_server_startup()
self._remote_processor = RemoteRolloutProcessor(
remote_base_url=self._remote_base_url,
model_base_url=self._model_base_url,
poll_interval=self._poll_interval,
timeout_seconds=self._timeout_seconds,
include_payloads=self._include_payloads,
)
except Exception:
self._cleanup_partial_setup()
raise

def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> list[asyncio.Task[EvaluationRow]]:
if self._remote_processor is None:
self.setup()
assert self._remote_processor is not None
return self._remote_processor(rows, config)

async def acleanup(self) -> None:
"""Async cleanup for the delegated processor and any owned Devbox."""
if self._remote_processor is not None:
await self._remote_processor.acleanup()
if self._should_shutdown_devbox():
await asyncio.to_thread(self._shutdown_devbox)

def cleanup(self) -> None:
"""Best-effort synchronous cleanup."""
if self._remote_processor is not None:
self._remote_processor.cleanup()
if self._should_shutdown_devbox():
self._shutdown_devbox()

def _await_running(self) -> None:
await_running = getattr(self._devbox, "await_running", None)
if await_running is None:
return
await_running()

def _create_tunnel(self) -> Any:
assert self._devbox is not None
net = self._devbox.net
create_tunnel = getattr(net, "create_tunnel", None)
if create_tunnel is not None:
return create_tunnel(port=self._port)

enable_tunnel = getattr(net, "enable_tunnel", None)
if enable_tunnel is None:
raise RuntimeError("Runloop Devbox networking API does not expose create_tunnel or enable_tunnel")
return enable_tunnel(auth_mode="open")

def _derive_remote_base_url(self, tunnel: Any) -> str:
get_tunnel_url = getattr(self._devbox, "get_tunnel_url", None)
if get_tunnel_url is not None:
url = get_tunnel_url(self._port)
if url:
return str(url).rstrip("/")

for attr in ("url", "base_url", "public_url"):
value = getattr(tunnel, attr, None)
if value:
return str(value).rstrip("/")

tunnel_key = getattr(tunnel, "tunnel_key", None)
if tunnel_key:
return f"https://{self._port}-{tunnel_key}.tunnel.runloop.ai"

raise RuntimeError("Could not determine Runloop tunnel URL for the rollout server")

def _wait_for_server_startup(self) -> None:
if self._startup_timeout_seconds <= 0:
return
assert self._remote_base_url is not None

deadline = time.monotonic() + self._startup_timeout_seconds
last_error: Exception | None = None
while time.monotonic() < deadline:
try:
request = urllib.request.Request(self._remote_base_url, method="GET")
with urllib.request.urlopen(request, timeout=min(5.0, self._startup_timeout_seconds)) as response:
response.read(1)
return
except urllib.error.HTTPError as exc:
if exc.code < 500:
return
last_error = exc
time.sleep(min(1.0, max(0.0, deadline - time.monotonic())))
except Exception as exc:
last_error = exc
time.sleep(min(1.0, max(0.0, deadline - time.monotonic())))

message = f"Runloop rollout server did not become reachable within {self._startup_timeout_seconds} seconds"
if last_error is not None:
message = f"{message}: {last_error}"
raise TimeoutError(message)

def _should_shutdown_devbox(self) -> bool:
return (
self._devbox is not None
and self._owns_devbox
and self._shutdown_on_cleanup
and not self._shutdown_complete
)

def _shutdown_devbox(self) -> None:
if self._devbox is None or self._shutdown_complete:
return
self._devbox.shutdown()
self._shutdown_complete = True

def _cleanup_partial_setup(self) -> None:
if self._remote_processor is not None:
self._remote_processor.cleanup()
self._remote_processor = None
if self._should_shutdown_devbox():
self._shutdown_devbox()
self._devbox = None
self._server_execution = None
self._remote_base_url = None
self._owns_devbox = False
self._shutdown_complete = False
28 changes: 28 additions & 0 deletions examples/runloop_remote_rollout/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Runloop Remote Rollout Example

This example hosts an Eval Protocol remote rollout server in a Runloop Devbox.

## Requirements

```bash
pip install "eval-protocol[runloop]"
export RUNLOOP_API_KEY=...
export FIREWORKS_API_KEY=...
```

Create a Runloop blueprint that contains this repository and its Python dependencies, then set `RUNLOOP_BLUEPRINT_ID`:

```bash
eval "$(python examples/runloop_remote_rollout/create_blueprint.py)"
pytest examples/runloop_remote_rollout/test_eval.py
```

The blueprint ID matters because `RunloopRolloutProcessor` uses it to create a Devbox that already has this repository and `eval-protocol[runloop]` installed. If you already have a suitable running Devbox, you can pass `devbox_id` to `RunloopRolloutProcessor` instead and skip `RUNLOOP_BLUEPRINT_ID`.

The processor starts:

```bash
python -m uvicorn examples.runloop_remote_rollout.server:app --host 0.0.0.0 --port 8000
```

The server receives `POST /init`, performs a chat completion through the Fireworks tracing base URL provided by Eval Protocol, and logs rollout completion using Fireworks tracing metadata.
Loading