[ci] chore: add some NPU's UT/ST#6831
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the training runner by extracting a BaseTaskRunner class from TaskRunner to support local and NPU unit/system testing. The review feedback points out that the base class still contains hardcoded Ray dependencies and that the core training workflow is implemented in the subclass, which limits reusability. It suggests abstracting worker wrapping and moving the core workflow to the base class to make it truly environment-agnostic.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| class BaseTaskRunner: | ||
| def __init__(self): | ||
| self.role_worker_mapping = {} | ||
| self.mapping = {} |
There was a problem hiding this comment.
Design Feedback: Decoupling BaseTaskRunner from Ray for UT/ST Support
The extraction of BaseTaskRunner is a good step toward supporting local/NPU unit and system testing. However, the current implementation has two major design limitations that hinder its reusability and testability:
-
Hardcoded Ray Dependencies in the Base Class:
Methods likeadd_actor_rollout_workerandadd_critic_workerinsideBaseTaskRunnerstill directly callray.remote(...)and use Ray-specific classes (e.g.,RayWorkerGroup). This means any non-Ray or local subclass ofBaseTaskRunnerwill still be forced to spin up Ray actors, defeating the purpose of an environment-agnostic base class. -
Monolithic and Non-Reusable
runMethod:
BaseTaskRunner.runis defined as a no-op (pass), while the entire PPO training workflow is implemented insideTaskRunner.run. If you want to create a local/NPU runner subclass to run the actual training loop, you would have to duplicate the entirerunmethod.
Suggested Refactoring
To make BaseTaskRunner truly reusable and testable:
1. Abstract the Worker Wrapping
Introduce a helper method in BaseTaskRunner to wrap worker classes, and override it in TaskRunner to apply ray.remote:
# In BaseTaskRunner
def _wrap_worker(self, worker_cls):
return worker_cls
# In TaskRunner
def _wrap_worker(self, worker_cls):
return ray.remote(worker_cls)Then, update the worker registration in BaseTaskRunner to use this helper:
# In BaseTaskRunner.add_actor_rollout_worker
self.role_worker_mapping[role] = self._wrap_worker(actor_rollout_cls)2. Move the Core Workflow to the Base Class
Move the implementation of run from TaskRunner to BaseTaskRunner, and abstract the Ray-specific components (like RayPPOTrainer and ResourcePoolManager) into factory methods that can be overridden by subclasses.
| @pytest.mark.parametrize("strategy", ["fsdp", "fsdp2"]) | ||
| def test_critic_engine(strategy): | ||
| device_count = torch.cuda.device_count() | ||
| if device_name == "cuda": |
There was a problem hiding this comment.
可以调用verl/verl/utils/device.py里面的get_device_name,来避免if分支的判断
| hf_output = hf_model(input_ids.cuda(), attention_mask=attention_mask.cuda()) | ||
| hf_values = hf_output.logits[:, -response_length - 1 : -1, :].float().squeeze(-1).cpu() | ||
| elif device_name == "npu": | ||
| with torch.device("npu"), torch.autocast(device_type="npu", dtype=torch.bfloat16): |
There was a problem hiding this comment.
这里也是,可以把device_type用变量来传入
| model = "~/models/HuggingFaceTB/SmolLM2-135M-Instruct" | ||
| else: | ||
| model = "~/models/Qwen/Qwen2.5-0.5B" | ||
| model = "/mnt/weight/Qwen2.5-0.5B" |
| device_count = torch.cuda.device_count() | ||
| if device_name == "cuda": | ||
| device_count = torch.cuda.device_count() | ||
| elif device_name == "npu": |
There was a problem hiding this comment.
可以调用verl/verl/utils/device.py里面的get_device_name,来避免if分支的判断
| from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup | ||
| from verl.trainer.config import CheckpointConfig | ||
| from verl.utils import tensordict_utils as tu | ||
| from verl.utils.device import * |
There was a problem hiding this comment.
这里为啥需要import *, 最好用到了什么函数导入什么函数,按需导入
What does this PR do?
Checklist Before Starting
[{modules}] {type}: {description}(This will be checked by the CI){modules}includefsdp,megatron,veomni,sglang,vllm,rollout,trainer,ci,training_utils,recipe,hardware,deployment,ray,worker,single_controller,misc,perf,model,algo,env,tool,ckpt,doc,data,cfg,reward,fully_async,one_step_off,like[megatron, fsdp, doc]{type}is infeat,fix,refactor,chore,test[BREAKING]to the beginning of the title.[BREAKING][fsdp, megatron] feat: dynamic batchingTest
API and Usage Example
# Add code snippet or script demonstrating how to use thisDesign & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwaysci-requestchannel in theverlSlack workspace. (If not accessible, please try the Feishu group (飞书群).)recipesubmodule, please also update the reference to the submodule commit viagit submodule update --remoteorcd recipe && git pull origin main.