Npu rebase#261
Conversation
There was a problem hiding this comment.
Code Review
This pull request adds Ascend NPU support to Vime, introducing installation guides, patches for Megatron and MindSpeed, and updating backend utilities to support both NPU (HCCL) and GPU (NCCL) environments. The reviewer feedback points out critical compatibility issues where NPU-specific checks were accidentally removed or replaced with CUDA-specific calls, leading to runtime failures on NPU platforms. Additionally, the review identifies an undefined variable (init_timeout_s) and a signature mismatch in _make_request, as well as Ray scheduling issues where standard GPUs are incorrectly requested via a custom 'GPU' resource instead of the standard num_gpus parameter. Several opportunities for code cleanup and device-agnostic improvements are also suggested.
| for attempt in range(1, 4): | ||
| try: | ||
| return self._make_request("init_weight_transfer_engine", payload) | ||
| return self._make_request("init_weight_transfer_engine", payload, timeout=init_timeout_s) |
There was a problem hiding this comment.
The variable init_timeout_s is undefined, which will raise a NameError at runtime. Additionally, _make_request does not currently accept a timeout parameter. Please use a defined constant or literal value (e.g., 300.0) and ensure _make_request is updated to support it.
| return self._make_request("init_weight_transfer_engine", payload, timeout=init_timeout_s) | |
| return self._make_request("init_weight_transfer_engine", payload, timeout=300.0) |
| torch.cuda.set_device(local_rank) | ||
| os.environ.setdefault("WORLD_SIZE", str(world_size)) | ||
| os.environ.setdefault("RANK", str(global_rank)) | ||
| os.environ.setdefault("LOCAL_RANK", str(local_rank)) | ||
| os.environ.setdefault("MASTER_ADDR", "localhost") | ||
| os.environ.setdefault("MASTER_PORT", "12355") | ||
| backend = "nccl" | ||
| if is_npu(): | ||
| dist.init_process_group( | ||
| backend="hccl", | ||
| world_size=world_size, | ||
| rank=global_rank, | ||
| ) | ||
| else: | ||
| dist.init_process_group( | ||
| backend="nccl", | ||
| world_size=world_size, | ||
| rank=global_rank, | ||
| device_id=torch.device(f"cuda:{local_rank}"), | ||
| ) | ||
| backend = "hccl" | ||
| dist.init_process_group( | ||
| backend=backend, | ||
| world_size=world_size, | ||
| rank=global_rank, | ||
| device_id=torch.device(f"cuda:{local_rank}"), | ||
| ) |
There was a problem hiding this comment.
On Ascend NPU platforms, calling torch.cuda.set_device and passing device_id=torch.device(f"cuda:{local_rank}") to dist.init_process_group will fail with a RuntimeError because CUDA is not available. Please use NPU-specific device initialization when is_npu() is true.
| torch.cuda.set_device(local_rank) | |
| os.environ.setdefault("WORLD_SIZE", str(world_size)) | |
| os.environ.setdefault("RANK", str(global_rank)) | |
| os.environ.setdefault("LOCAL_RANK", str(local_rank)) | |
| os.environ.setdefault("MASTER_ADDR", "localhost") | |
| os.environ.setdefault("MASTER_PORT", "12355") | |
| backend = "nccl" | |
| if is_npu(): | |
| dist.init_process_group( | |
| backend="hccl", | |
| world_size=world_size, | |
| rank=global_rank, | |
| ) | |
| else: | |
| dist.init_process_group( | |
| backend="nccl", | |
| world_size=world_size, | |
| rank=global_rank, | |
| device_id=torch.device(f"cuda:{local_rank}"), | |
| ) | |
| backend = "hccl" | |
| dist.init_process_group( | |
| backend=backend, | |
| world_size=world_size, | |
| rank=global_rank, | |
| device_id=torch.device(f"cuda:{local_rank}"), | |
| ) | |
| if is_npu(): | |
| torch.npu.set_device(local_rank) | |
| device = torch.device(f"npu:{local_rank}") | |
| else: | |
| torch.cuda.set_device(local_rank) | |
| device = torch.device(f"cuda:{local_rank}") | |
| os.environ.setdefault("WORLD_SIZE", str(world_size)) | |
| os.environ.setdefault("RANK", str(global_rank)) | |
| os.environ.setdefault("LOCAL_RANK", str(local_rank)) | |
| os.environ.setdefault("MASTER_ADDR", "localhost") | |
| os.environ.setdefault("MASTER_PORT", "12355") | |
| backend = "nccl" | |
| if is_npu(): | |
| backend = "hccl" | |
| dist.init_process_group( | |
| backend=backend, | |
| world_size=world_size, | |
| rank=global_rank, | |
| device_id=device, | |
| ) |
| torch.cuda.synchronize() | ||
| gc.collect() | ||
| if is_npu(): | ||
| torch.npu.empty_cache() | ||
| else: | ||
| torch.cuda.empty_cache() | ||
| torch.cuda.empty_cache() |
There was a problem hiding this comment.
Removing the is_npu() check here breaks NPU support because calling torch.cuda.synchronize() and torch.cuda.empty_cache() on NPU platforms will either fail or not correctly synchronize and clear the NPU device memory. Please restore the conditional NPU synchronization and cache clearing.
| torch.cuda.synchronize() | |
| gc.collect() | |
| if is_npu(): | |
| torch.npu.empty_cache() | |
| else: | |
| torch.cuda.empty_cache() | |
| torch.cuda.empty_cache() | |
| if is_npu(): | |
| torch.npu.synchronize() | |
| else: | |
| torch.cuda.synchronize() | |
| gc.collect() | |
| if is_npu(): | |
| torch.npu.empty_cache() | |
| else: | |
| torch.cuda.empty_cache() |
| else: | ||
| _wait_worker_process_alive(self.process) | ||
|
|
||
| def _make_request(self, endpoint: str, payload: dict | None = None) -> dict | None: |
There was a problem hiding this comment.
The _make_request method is called with a timeout keyword argument in init_weight_transfer_engine, but its signature does not accept timeout. Please update the signature to accept timeout and pass it to requests.post.
| def _make_request(self, endpoint: str, payload: dict | None = None) -> dict | None: | |
| def _make_request(self, endpoint: str, payload: dict | None = None, timeout: float | None = None) -> dict | None: |
| env_vars = { | ||
| "HCCL_CUMEM_ENABLE": os.environ.get("HCCL_CUMEM_ENABLE", "0"), | ||
| # NPU: let Ray manage ASCEND_RT_VISIBLE_DEVICES per worker | ||
| # **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}, | ||
| # Default NCCL_CUMEM_ENABLE to "0" to prevent intermittent NCCL | ||
| # init errors observed when the vLLM side disables CUMEM. | ||
| "NCCL_CUMEM_ENABLE": os.environ.get("NCCL_CUMEM_ENABLE", "0"), | ||
| "NVTE_FP8_BLOCK_SCALING_FP32_SCALES": os.environ.get("NVTE_FP8_BLOCK_SCALING_FP32_SCALES", "1"), | ||
| **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}, | ||
| **self.args.train_env_vars, | ||
| } |
There was a problem hiding this comment.
On NPU platforms, HCCL is used instead of NCCL, and HCCL_CUMEM_ENABLE must be set in env_vars to prevent intermittent initialization errors. Completely replacing HCCL_CUMEM_ENABLE with NCCL_CUMEM_ENABLE breaks NPU support. Please set both environment variables to support both NPU and GPU platforms.
| env_vars = { | |
| "HCCL_CUMEM_ENABLE": os.environ.get("HCCL_CUMEM_ENABLE", "0"), | |
| # NPU: let Ray manage ASCEND_RT_VISIBLE_DEVICES per worker | |
| # **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}, | |
| # Default NCCL_CUMEM_ENABLE to "0" to prevent intermittent NCCL | |
| # init errors observed when the vLLM side disables CUMEM. | |
| "NCCL_CUMEM_ENABLE": os.environ.get("NCCL_CUMEM_ENABLE", "0"), | |
| "NVTE_FP8_BLOCK_SCALING_FP32_SCALES": os.environ.get("NVTE_FP8_BLOCK_SCALING_FP32_SCALES", "1"), | |
| **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}, | |
| **self.args.train_env_vars, | |
| } | |
| env_vars = { | |
| # Default NCCL_CUMEM_ENABLE/HCCL_CUMEM_ENABLE to "0" to prevent intermittent NCCL/HCCL | |
| # init errors observed when the vLLM side disables CUMEM. | |
| "NCCL_CUMEM_ENABLE": os.environ.get("NCCL_CUMEM_ENABLE", "0"), | |
| "HCCL_CUMEM_ENABLE": os.environ.get("HCCL_CUMEM_ENABLE", "0"), | |
| "NVTE_FP8_BLOCK_SCALING_FP32_SCALES": os.environ.get("NVTE_FP8_BLOCK_SCALING_FP32_SCALES", "1"), | |
| **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}, | |
| **self.args.train_env_vars, | |
| } |
| env_vars = {name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST} | ||
| env_vars["ASCEND_RT_VISIBLE_DEVICES"] = ",".join(str(base_gpu_id + j) for j in range(num_gpu_per_engine)) | ||
| rollout_engine = RolloutRayActor.options( | ||
| num_cpus=num_cpus, | ||
| num_gpus=0, | ||
| resources={"NPU": 0}, | ||
| scheduling_strategy=scheduling_strategy, | ||
| runtime_env={ | ||
| "env_vars": env_vars, | ||
| }, | ||
| resources={device_name:num_gpus} | ||
| ).remote( |
There was a problem hiding this comment.
Standard GPUs should be requested using the num_gpus parameter rather than a custom resource named "GPU". Specifying resources={"GPU": ...} can cause scheduling failures on standard GPU clusters. Please conditionally set resources for NPU and num_gpus for standard GPUs.
| env_vars = {name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST} | |
| env_vars["ASCEND_RT_VISIBLE_DEVICES"] = ",".join(str(base_gpu_id + j) for j in range(num_gpu_per_engine)) | |
| rollout_engine = RolloutRayActor.options( | |
| num_cpus=num_cpus, | |
| num_gpus=0, | |
| resources={"NPU": 0}, | |
| scheduling_strategy=scheduling_strategy, | |
| runtime_env={ | |
| "env_vars": env_vars, | |
| }, | |
| resources={device_name:num_gpus} | |
| ).remote( | |
| env_vars = {name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST} | |
| ray_options = { | |
| "num_cpus": num_cpus, | |
| "scheduling_strategy": scheduling_strategy, | |
| "runtime_env": { | |
| "env_vars": env_vars, | |
| }, | |
| } | |
| if is_npu(): | |
| ray_options["resources"] = {"NPU": num_gpus} | |
| else: | |
| ray_options["num_gpus"] = num_gpus | |
| rollout_engine = RolloutRayActor.options(**ray_options).remote( |
| init_tracking(args, primary=False) | ||
| self.rollout_engine_lock = Lock.options(num_cpus=1, num_gpus=0, resources={"NPU": 0}).remote() | ||
| device_name = "NPU" if is_npu() else "GPU" | ||
| self.rollout_engine_lock = Lock.options(num_cpus=1, num_gpus=0, resources={device_name:0}).remote() |
There was a problem hiding this comment.
Standard GPUs should be requested using the num_gpus parameter rather than a custom resource named "GPU". Specifying resources={"GPU": ...} can cause scheduling failures on standard GPU clusters. Please conditionally set resources for NPU and num_gpus for standard GPUs.
init_tracking(args, primary=False)
ray_options = {
"num_cpus": 1,
}
if is_npu():
ray_options["resources"] = {"NPU": 0}
else:
ray_options["num_gpus"] = 0
self.rollout_engine_lock = Lock.options(**ray_options).remote()| rollout_data["max_seq_lens"][i] if self.args.qkv_format == "bshd" else None, | ||
| ), | ||
| device=torch.npu.current_device(), | ||
| device=torch.cuda.current_device(), |
There was a problem hiding this comment.
| params = [ | ||
| torch.empty_like(param.data, device=torch.npu.current_device()) | ||
| torch.empty_like(param.data, device=torch.cuda.current_device()) | ||
| for _ in range(mpu.get_expert_model_parallel_world_size()) | ||
| ] |
There was a problem hiding this comment.
By default, torch.empty_like creates a tensor with the same device and dtype as the input tensor. You can omit the device argument entirely to avoid calling torch.cuda.current_device() and make the code cleaner and device-agnostic.
| params = [ | |
| torch.empty_like(param.data, device=torch.npu.current_device()) | |
| torch.empty_like(param.data, device=torch.cuda.current_device()) | |
| for _ in range(mpu.get_expert_model_parallel_world_size()) | |
| ] | |
| params = [ | |
| torch.empty_like(param.data) | |
| for _ in range(mpu.get_expert_model_parallel_world_size()) | |
| ] |
| if is_npu(): | ||
| cvd = os.environ.get("ASCEND_RT_VISIBLE_DEVICES") | ||
| else: | ||
| cvd = os.environ.get("CUDA_VISIBLE_DEVICES") | ||
| if not cvd: | ||
| return physical_gpu_id | ||
| visible = [int(x) for x in arvd.split(",") if x.strip() != ""] | ||
| visible = [int(x) for x in cvd.split(",") if x.strip() != ""] | ||
| if physical_gpu_id in visible: | ||
| return visible.index(physical_gpu_id) | ||
| if 0 <= physical_gpu_id < len(visible): | ||
| return physical_gpu_id | ||
| raise RuntimeError( | ||
| f"NPU id {physical_gpu_id} is not valid under ASCEND_RT_VISIBLE_DEVICES={arvd}. " | ||
| f"GPU id {physical_gpu_id} is not valid under CUDA_VISIBLE_DEVICES={cvd}. " | ||
| f"Expected one of {visible} (physical) or 0..{len(visible)-1} (local)." | ||
| ) |
There was a problem hiding this comment.
The RuntimeError message hardcodes CUDA_VISIBLE_DEVICES even when is_npu() is true. This can be confusing to users debugging NPU issues. Please dynamically reference the correct environment variable name in the error message.
env_name = "ASCEND_RT_VISIBLE_DEVICES" if is_npu() else "CUDA_VISIBLE_DEVICES"
cvd = os.environ.get(env_name)
if not cvd:
return physical_gpu_id
visible = [int(x) for x in cvd.split(",") if x.strip() != ""]
if physical_gpu_id in visible:
return visible.index(physical_gpu_id)
if 0 <= physical_gpu_id < len(visible):
return physical_gpu_id
raise RuntimeError(
f"GPU id {physical_gpu_id} is not valid under {env_name}={cvd}. "
f"Expected one of {visible} (physical) or 0..{len(visible)-1} (local)."
)Files updated with is_npu() dual NPU/CUDA support: - vime/backends/megatron_utils/__init__.py: add is_npu/mindspeed import - vime/backends/megatron_utils/actor.py: device selection with is_npu() - vime/backends/megatron_utils/update_weight/common.py: partition_dim with is_npu() - vime/backends/megatron_utils/update_weight/update_weight_from_distributed.py: HCCL/NCCL backend with is_npu() - vime/backends/vllm_utils/vllm_engine.py: _to_local_gpu_id and build_vllm_subprocess_env with is_npu() - vime/ray/actor_group.py: device_name with is_npu() - vime/ray/placement_group.py: device_name and get_ip_and_gpu_id with is_npu() - vime/ray/rollout.py: device_name with is_npu() - vime/ray/train_actor.py: get_local_gpu_id and init with is_npu() - vime/utils/memory_utils.py: clear_memory and available_memory with is_npu() - vime/utils/reloadable_process_group.py: backend with is_npu() Non-is_npu changes from npu branch are NOT applied, keeping npu_rebase code.
Signed-off-by: wangxiaoxin-sherie <wangxiaoxin7@huawei.com>
- All code, docker, scripts, docs, tests are now identical to npu branch - Deleted files not in npu: megatron-all-changes.patch, megatron-bridge.patch, megatron-npu.patch, mindspeed.patch, torch-memory-saver-npu.patch, torch_npu.patch, vllm-ascend.patch, npu_attention_patch.py - Added files from npu: npu_patch/*.patch, NPU.md, qwen3 NPU scripts
No description provided.