From 1bc255ed352ad76462ede45eb950489b87a6a844 Mon Sep 17 00:00:00 2001 From: aoshen02 Date: Mon, 15 Jun 2026 03:06:17 +0000 Subject: [PATCH] fix(rollout): isolate per-trajectory exceptions in generate_and_rm_group MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `generate_and_rm_group` gathers per-trajectory tasks with a bare `asyncio.gather(*tasks)`. If any single trajectory raises an unhandled exception, gather propagates it to the awaiting caller immediately and stops collecting the siblings (gather does not cancel them itself; they are left running un-awaited), crashing the entire rollout via CancelledError — which also swallows the root exception (logs show only CancelledError, not what actually failed). This is benign for plain RLVR rollouts but agentic rollouts can raise after the custom generate() returns (e.g. trajectory token-merge / prefix-drift edge cases). Observed on a 500-instance SWE-bench eval: a single bad trajectory (~1 in 350-400) reproducibly took down all 500. Fix: - `return_exceptions=True` so one failure no longer takes down the whole batch. - `logger.error(..., exc_info=res)` to surface the real traceback. - Substitute an ABORTED placeholder with the same fan-out list shape, reusing the existing abort contract (tokens=[0,0], loss_mask=[0], status=ABORTED, reward=0.0). Mirror of vllm-project/vime#200. 
Co-authored-by: Claude Signed-off-by: aoshen --- slime/rollout/sglang_rollout.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/slime/rollout/sglang_rollout.py b/slime/rollout/sglang_rollout.py index bb87360639..c11ac2a9fc 100644 --- a/slime/rollout/sglang_rollout.py +++ b/slime/rollout/sglang_rollout.py @@ -326,17 +326,34 @@ async def generate_and_rm_group( if sample.session_id is None: sample.session_id = str(uuid.uuid4()) - tasks = [] + pairs = [] for idx, sample in enumerate(group): current_sampling_params = sampling_params.copy() if getattr(args, "sglang_enable_deterministic_inference", False): seed = state.group_sampling_seeds[idx] current_sampling_params["sampling_seed"] = seed - tasks.append( - asyncio.create_task(generate_and_rm(args, sample, current_sampling_params, evaluation=evaluation)) + pairs.append( + (sample, asyncio.create_task(generate_and_rm(args, sample, current_sampling_params, evaluation=evaluation))) ) - group = await asyncio.gather(*tasks) + results = await asyncio.gather(*[t for _, t in pairs], return_exceptions=True) + group = [] + for sample, res in zip([s for s, _ in pairs], results): + if isinstance(res, BaseException): + logger.error( + "[generate_and_rm_group] trajectory crashed, isolating idx=%s: %r", + getattr(sample, "index", "?"), res, exc_info=res, + ) + sample.tokens = [0, 0] + sample.response = "" + sample.response_length = 1 + sample.loss_mask = [0] + sample.rollout_log_probs = [0.0] + sample.reward = 0.0 + sample.status = Sample.Status.ABORTED + group.append([sample]) + else: + group.append(res) # for the rm that need the whole group, we will do the rm here if not state.aborted and args.group_rm: