diff --git a/nemo_rl/algorithms/grpo.py b/nemo_rl/algorithms/grpo.py index ce90139e6c..9a0b5608e9 100644 --- a/nemo_rl/algorithms/grpo.py +++ b/nemo_rl/algorithms/grpo.py @@ -465,6 +465,7 @@ def _spinup_nemo_gym(base_urls, model_name): nemo_gym_py_exec = create_local_venv_on_each_node( nemo_gym_py_exec, "nemo_rl.environments.nemo_gym.NemoGym" ) + nemo_gym_venv = os.path.dirname(os.path.dirname(nemo_gym_py_exec)) nemo_gym_dict = env_configs["nemo_gym"] # NeMo-RL-side detection knobs are top-level NemoGymConfig fields # (where the detector reads them), not part of Gym's global config. @@ -497,8 +498,8 @@ def _spinup_nemo_gym(base_urls, model_name): "py_executable": nemo_gym_py_exec, "env_vars": { **os.environ, - "VIRTUAL_ENV": nemo_gym_py_exec, - "UV_PROJECT_ENVIRONMENT": nemo_gym_py_exec, + "VIRTUAL_ENV": nemo_gym_venv, + "UV_PROJECT_ENVIRONMENT": nemo_gym_venv, }, } actor = NemoGym.options(**nemo_gym_opts).remote(nemo_gym_cfg) diff --git a/nemo_rl/environments/nemo_gym.py b/nemo_rl/environments/nemo_gym.py index 611751af36..c85d755ab5 100644 --- a/nemo_rl/environments/nemo_gym.py +++ b/nemo_rl/environments/nemo_gym.py @@ -13,6 +13,7 @@ # limitations under the License. import os import subprocess +import sys from pathlib import Path from typing import Any, Dict, List, NotRequired, TypedDict @@ -38,6 +39,30 @@ DEFAULT_THINKING_TAGS = ["", ""] +def _ensure_nemo_gym_package_precedence() -> None: + """Prefer the third-party NeMo-Gym package over examples/nemo_gym.""" + repo_root = Path(__file__).resolve().parents[2] + gym_workspace = repo_root / "3rdparty" / "Gym-workspace" / "Gym" + gym_init = gym_workspace / "nemo_gym" / "__init__.py" + if not gym_init.exists(): + return + + gym_workspace_str = str(gym_workspace) + if sys.path[:1] != [gym_workspace_str]: + sys.path[:] = [p for p in sys.path if p != gym_workspace_str] + sys.path.insert(0, gym_workspace_str) + + shadowed_module = sys.modules.get("nemo_gym") + if ( + shadowed_module is not None + and getattr(shadowed_module, "__file__", None) is None + and not hasattr(shadowed_module, "PARENT_DIR") + ): + for module_name in list(sys.modules): + if module_name == "nemo_gym" or module_name.startswith("nemo_gym."): + del sys.modules[module_name] + + def get_nemo_gym_uv_cache_dir() -> str | None: """Return the uv cache directory inside a container, or None outside one. @@ -158,6 +183,8 @@ def _spinup(self) -> None: _gym_port_high = self.cfg.get("port_range_high", DEFAULT_GYM_PORT_RANGE_HIGH) self.head_server_port = _get_free_port_local(_gym_port_low, _gym_port_high) + _ensure_nemo_gym_package_precedence() + from nemo_gym.cli import GlobalConfigDictParserConfig, RunHelper from nemo_gym.rollout_collection import RolloutCollectionHelper from nemo_gym.server_utils import HEAD_SERVER_KEY_NAME, BaseServerConfig diff --git a/nemo_rl/models/generation/vllm/vllm_worker_async.py b/nemo_rl/models/generation/vllm/vllm_worker_async.py index c7e09455a4..1584879b77 100644 --- a/nemo_rl/models/generation/vllm/vllm_worker_async.py +++ b/nemo_rl/models/generation/vllm/vllm_worker_async.py @@ -722,15 +722,22 @@ async def create_chat_completion( generator = await openai_serving_chat.create_chat_completion( request, raw_request ) - except VLLMValidationError as e: + except (ValueError, VLLMValidationError) as e: # vLLM 0.20 raises VLLMValidationError for prompts exceeding # max_model_len during tokenization, instead of returning an - # ErrorResponse. Convert to HTTP 400 so the Gym proxy can - # detect context-length overflow and handle it gracefully. + # ErrorResponse. Our post-tokenization clamp can raise a local + # ValueError for the same condition after prefix replacement. + # Convert those cases to HTTP 400 so the Gym proxy can detect + # context-length overflow and handle it gracefully. + message = str(e) + if isinstance(e, ValueError) and not ( + "max_model_len" in message or "maximum context length" in message + ): + raise return JSONResponse( content={ "error": { - "message": str(e), + "message": message, "type": "invalid_request_error", "code": 400, } diff --git a/ray.sub b/ray.sub index 62caadcbf7..89c3d2ac19 100644 --- a/ray.sub +++ b/ray.sub @@ -525,7 +525,12 @@ echo "All workers connected!" # This driver process is responsible for launching a job on the Ray cluster CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID | grep -oP 'WorkDir=\K[^ ]+' | head -1) if [[ -n "$COMMAND" ]]; then + set +e srun --no-container-mount-home --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/ray-driver.log bash -c "$COMMAND" + driver_exit_code=$? + set -e + touch "$LOG_DIR/ENDED" + exit "$driver_exit_code" else echo "[INFO]: Ray Cluster is idled, run this on the slurm head node to get a shell to the head node:" cat <$SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh