From 97df37339483d3533229dcd4df2b21c4b6af7b26 Mon Sep 17 00:00:00 2001 From: yuchenwang3 Date: Fri, 26 Jun 2026 22:01:45 +0000 Subject: [PATCH 1/2] fix: make ClientResponseError picklable for Ray error propagation raise_for_status re-raises aiohttp's ClientResponseError unchanged. It carries request_info/history/headers, which are multidict.CIMultiDictProxy objects that don't pickle. When rollout collection runs under Ray the exception is pickled to cross actors and the run dies with "can't pickle CIMultiDictProxy" - so any resource-server 5xx takes down the whole job instead of failing one rollout. Strip the unpicklable fields; keep status/message/response_content. Signed-off-by: yuchenwang3 --- nemo_gym/server_utils.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py index 607e477e41..29f0697203 100644 --- a/nemo_gym/server_utils.py +++ b/nemo_gym/server_utils.py @@ -232,6 +232,15 @@ async def raise_for_status(response: ClientResponse) -> None: # pragma: no cove except ClientResponseError as e: # Set the response content here so we have access to it down the line. e.response_content = content + # request_info/history/headers are multidict.CIMultiDictProxy objects + # that don't pickle, which breaks Ray's cross-actor error propagation + # (rollout collection dies with "can't pickle CIMultiDictProxy" on any + # resource-server 5xx). Drop them so the error stays picklable; keep + # status/message/response_content. + e.request_info = None + e.history = () + e.headers = None + e.args = (e.status, e.message) raise e From 22ed1af413ffa2c51c7473b08669788236e98d3b Mon Sep 17 00:00:00 2001 From: yuchenwang3 Date: Fri, 26 Jun 2026 22:01:45 +0000 Subject: [PATCH 2/2] feat: retry transient rollout /run failures _post_subroutine doesn't retry /run, so a transient 5xx (e.g. a momentarily overloaded code-exec server) aborts the batch. Retry a few times on 5xx and connection errors; leave 4xx alone. Signed-off-by: yuchenwang3 --- nemo_gym/rollout_collection.py | 39 ++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/nemo_gym/rollout_collection.py b/nemo_gym/rollout_collection.py index 617e50a54d..e36e8f6d8a 100644 --- a/nemo_gym/rollout_collection.py +++ b/nemo_gym/rollout_collection.py @@ -662,19 +662,32 @@ def run_examples( async def _post_subroutine(row: Dict) -> Tuple[Dict, Dict]: async with semaphore: - res = await server_client.post(server_name=row["agent_ref"]["name"], url_path="/run", json=row) - try: - await raise_for_status(res) - except Exception: - if is_global_aiohttp_client_request_debug_enabled(): - print( - "[rollout_collection] /run failed " - f"status={getattr(res, 'status', None)} " - f"row={json.dumps(_rollout_request_debug_summary(row), sort_keys=True)}", - flush=True, - ) - raise - return row, await get_response_json(res) + # Retry a transient 5xx/connection failure a few times so one flaky + # /run (e.g. a momentarily overloaded code-exec server) only costs + # that rollout an attempt instead of aborting the whole batch. 4xx + # is deterministic, so re-raise it immediately. + attempts = 4 + last_exc = None + for attempt in range(attempts): + res = await server_client.post(server_name=row["agent_ref"]["name"], url_path="/run", json=row) + try: + await raise_for_status(res) + except Exception as e: + last_exc = e + status = getattr(e, "status", None) or getattr(res, "status", None) + if is_global_aiohttp_client_request_debug_enabled(): + print( + "[rollout_collection] /run failed " + f"status={status} attempt={attempt + 1}/{attempts} " + f"row={json.dumps(_rollout_request_debug_summary(row), sort_keys=True)}", + flush=True, + ) + if isinstance(status, int) and 400 <= status < 500: + raise + await asyncio.sleep(1.0) + continue + return row, await get_response_json(res) + raise last_exc return tqdm.as_completed( map(_post_subroutine, examples),