From 3d779e1b0b57ea9eb234a0a6d9b5f967b4c7adfe Mon Sep 17 00:00:00 2001 From: yutongli03 <2423786753@qq.com> Date: Sat, 18 Apr 2026 17:57:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(reward):=20=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8E=E5=8E=86=E5=8F=B2=E5=B7=A5=E5=85=B7=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E7=9A=84=E5=8A=A8=E6=80=81=E5=A5=96=E5=8A=B1?= =?UTF-8?q?=E6=9D=83=E9=87=8D=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现工具调用历史统计功能,包括平均调用次数、成功率和使用率 修改奖励计算函数以支持动态权重调整,根据历史数据优化工具使用行为 在NaiveRewardManager中添加工具调用统计和历史数据管理功能 --- verl/utils/reward_score/__init__.py | 22 ++- verl/utils/reward_score/vl_agent.py | 270 ++++++++++++++++++++++++--- verl/workers/reward_manager/naive.py | 139 +++++++++++++- 3 files changed, 389 insertions(+), 42 deletions(-) diff --git a/verl/utils/reward_score/__init__.py b/verl/utils/reward_score/__init__.py index 64e6bd22c..cf1f05738 100644 --- a/verl/utils/reward_score/__init__.py +++ b/verl/utils/reward_score/__init__.py @@ -14,7 +14,21 @@ # from . import gsm8k, math, prime_math, prime_code import torch -def _default_compute_score(data_source, solution_str, ground_truth, extra_info=None): +def _default_compute_score(data_source, solution_str, ground_truth, extra_info=None, history_stats=None): + """ + 默认的奖励计算函数 + + 参数: + data_source: 数据集名称 + solution_str: 模型生成的响应字符串 + ground_truth: 标准答案 + extra_info: 额外信息(如问题文本、图像路径等) + history_stats: 历史统计数据,包含近几轮的工具调用统计 + - avg_tool_calls: 平均工具调用次数 + - tool_success_rate: 工具调用成功率 + - tool_usage_rate: 工具使用率 + - history_size: 历史数据量 + """ if data_source == "openai/gsm8k": from . import gsm8k @@ -63,15 +77,15 @@ def _default_compute_score(data_source, solution_str, ground_truth, extra_info=N elif data_source in ['vstar', 'vl_agent', 'chart']: from . import vl_agent - res = vl_agent.compute_score(solution_str, ground_truth, extra_info) + res = vl_agent.compute_score(solution_str, ground_truth, extra_info, history_stats) elif data_source in ['geoguessr']: from . import vl_agent - res = vl_agent.compute_common_reasoning(solution_str, ground_truth, extra_info) + res = vl_agent.compute_common_reasoning(solution_str, ground_truth, extra_info, history_stats) elif data_source in ['thinklite_eureka', 'xince']: from . import vl_agent - res = vl_agent.compute_score_math(solution_str, ground_truth, extra_info) + res = vl_agent.compute_score_math(solution_str, ground_truth, extra_info, history_stats) elif data_source in ["frozenlake"]: res = 0.0 diff --git a/verl/utils/reward_score/vl_agent.py b/verl/utils/reward_score/vl_agent.py index 9a970b685..444be87e8 100644 --- a/verl/utils/reward_score/vl_agent.py +++ b/verl/utils/reward_score/vl_agent.py @@ -187,7 +187,134 @@ def extract_answer(text): return None -def compute_score(predict_str: str, ground_truth: str, extra_info=None) -> float: +def compute_dynamic_weights(history_stats: dict = None) -> dict: + """ + 基于历史工具调用统计计算动态权重 + + 参数: + history_stats: 历史统计数据,包含: + - avg_tool_calls: 平均工具调用次数 + - tool_success_rate: 工具调用成功率 + - tool_usage_rate: 工具使用率 + - history_size: 历史数据量 + + 返回: + 包含动态权重的字典: + - acc_weight: 准确性奖励权重 + - format_weight: 格式奖励权重 + - tool_weight: 工具使用奖励权重 + """ + # 默认权重(与原固定权重一致) + default_weights = { + "acc_weight": 0.8, + "format_weight": 0.2, + "tool_weight": 1.2 + } + + if history_stats is None or history_stats.get("history_size", 0) == 0: + return default_weights + + # 提取历史统计 + avg_tool_calls = history_stats.get("avg_tool_calls", 1.0) + tool_success_rate = history_stats.get("tool_success_rate", 0.5) + + # 超参数配置 + expected_tool_calls = 1.5 # 期望的工具调用次数 + min_tool_calls = 0.3 # 最低期望的工具调用次数 + max_tool_calls = 3.0 # 最高期望的工具调用次数 + + min_success_rate = 0.3 # 最低可接受的工具成功率 + target_success_rate = 0.7 # 理想的工具成功率 + + # 1. 基于工具调用次数的调整因子 + # - 调用次数过少:鼓励工具使用 → 降低 acc 权重,提高 tool 权重 + # - 调用次数过多:可能过度依赖工具 → 提高 acc 权重,降低 tool 权重 + + if avg_tool_calls < min_tool_calls: + # 工具调用严重不足,需要大力鼓励 + tool_call_factor = 0.7 # 降低 acc 权重 30% + tool_weight_factor = 1.5 # 提高 tool 权重 50% + elif avg_tool_calls > max_tool_calls: + # 工具调用过多,可能过度依赖 + tool_call_factor = 1.4 # 提高 acc 权重 40% + tool_weight_factor = 0.6 # 降低 tool 权重 40% + elif avg_tool_calls < expected_tool_calls * 0.8: + # 工具调用略少,适度鼓励 + ratio = (avg_tool_calls - min_tool_calls) / (expected_tool_calls * 0.8 - min_tool_calls) + tool_call_factor = 0.7 + 0.3 * ratio # 从 0.7 线性增加到 1.0 + tool_weight_factor = 1.5 - 0.5 * ratio # 从 1.5 线性减少到 1.0 + elif avg_tool_calls > expected_tool_calls * 1.5: + # 工具调用略多,适度抑制 + ratio = (avg_tool_calls - expected_tool_calls * 1.5) / (max_tool_calls - expected_tool_calls * 1.5) + tool_call_factor = 1.0 + 0.4 * ratio # 从 1.0 线性增加到 1.4 + tool_weight_factor = 1.0 - 0.4 * ratio # 从 1.0 线性减少到 0.6 + else: + # 工具调用次数在合理范围内 + tool_call_factor = 1.0 + tool_weight_factor = 1.0 + + # 2. 基于工具成功率的调整因子 + # - 成功率低:工具使用无效 → 提高 acc 权重,降低 tool 权重 + # - 成功率高:工具使用有效 → 保持或略微提高 tool 权重 + + if tool_success_rate < min_success_rate: + # 工具成功率极低,工具使用基本无效 + success_factor = 1.6 # 大幅提高 acc 权重 + success_tool_factor = 0.3 # 大幅降低 tool 权重 + elif tool_success_rate > target_success_rate: + # 工具成功率很高,工具使用有效 + success_factor = 0.9 # 略微降低 acc 权重 + success_tool_factor = 1.2 # 略微提高 tool 权重 + else: + # 成功率在中间范围,线性插值 + ratio = (tool_success_rate - min_success_rate) / (target_success_rate - min_success_rate) + success_factor = 1.6 - 0.7 * ratio # 从 1.6 线性减少到 0.9 + success_tool_factor = 0.3 + 0.9 * ratio # 从 0.3 线性增加到 1.2 + + # 3. 计算最终动态权重 + base_acc = default_weights["acc_weight"] + base_format = default_weights["format_weight"] + base_tool = default_weights["tool_weight"] + + # 组合调整因子 + final_acc_factor = tool_call_factor * success_factor + final_tool_factor = tool_weight_factor * success_tool_factor + + # 计算最终权重,并限制在合理范围内 + dynamic_acc_weight = base_acc * final_acc_factor + dynamic_acc_weight = max(0.3, min(1.8, dynamic_acc_weight)) # 限制范围 [0.3, 1.8] + + # 格式权重保持相对稳定,但可以根据 acc 权重进行适度调整 + # 确保格式惩罚不会被过度放大或缩小 + dynamic_format_weight = base_format * (1.0 + 0.3 * (1.0 - final_acc_factor)) + dynamic_format_weight = max(0.1, min(0.4, dynamic_format_weight)) # 限制范围 [0.1, 0.4] + + dynamic_tool_weight = base_tool * final_tool_factor + dynamic_tool_weight = max(0.2, min(2.0, dynamic_tool_weight)) # 限制范围 [0.2, 2.0] + + return { + "acc_weight": dynamic_acc_weight, + "format_weight": dynamic_format_weight, + "tool_weight": dynamic_tool_weight, + "base_acc_weight": base_acc, + "base_tool_weight": base_tool, + "tool_call_factor": tool_call_factor, + "success_factor": success_factor, + "avg_tool_calls": avg_tool_calls, + "tool_success_rate": tool_success_rate + } + + +def compute_score(predict_str: str, ground_truth: str, extra_info=None, history_stats=None) -> float: + """ + 计算视觉语言任务的奖励,支持基于历史统计的动态权重 + + 参数: + predict_str: 模型生成的响应字符串 + ground_truth: 标准答案 + extra_info: 额外信息(如问题文本) + history_stats: 历史统计数据,用于计算动态权重 + """ is_format_error = False # predict_str = "" + predict_str count_think_1 = predict_str.count("") @@ -208,14 +335,6 @@ def compute_score(predict_str: str, ground_truth: str, extra_info=None) -> float answer_text = predict_str.split("")[-1].split("")[0].strip() - # pattern = re.compile(r'<\|im_start\|>assistant(.*?)$', re.DOTALL) # 匹配最后一个 target 后的所有内容 - # match = pattern.search(predict_str) - # if match: - # answer_text = match.group(1).strip() - # print(f'DEBUG{answer_text=}') - # else: - # answer_text = "" - question_text = extra_info['question'] full_prompt = get_prompt(answer_text, ground_truth, question_text) @@ -257,26 +376,54 @@ def compute_score(predict_str: str, ground_truth: str, extra_info=None) -> float acc_reward = 0.0 is_format_error = True + # 计算各奖励分量 tool_reward_base = 1.0 if count_vision_1 > 0 else 0.0 tool_reward = 1.0 if count_vision_1 > 0 and acc_reward > 0.5 else 0.0 format_reward = -1.0 if is_format_error else 0.0 - # reward 1 - # return 0.8 * acc_reward + 0.2 * format_reward + 0.4 * tool_reward_base - # reward 2 - return 0.8 * acc_reward + 0.2 * format_reward + 1.2 * tool_reward - - # reward 2 - # return 1.0 * acc_reward + 0.2 * format_reward + 1.0 * tool_reward + 0.2 * tool_reward_base - # reward 3 - # tool_reward_alpha = 1.2 if count_vision_1 > 0 else 0.0 - # return 1.0 * acc_reward * tool_reward_alpha + 0.2 * format_reward - # reward 4 - # extra_reward = tool_reward_base * (count_vision_1 - 1) * (1 - acc_reward) - # return 0.8 * acc_reward + 0.2 * format_reward + 0.4 * tool_reward_base + 0.2 * extra_reward - - - -def compute_common_reasoning(predict_str: str, ground_truth: str, extra_info=None) -> float: + + # 获取动态权重 + weights = compute_dynamic_weights(history_stats) + + # 使用动态权重计算最终奖励 + final_reward = ( + weights["acc_weight"] * acc_reward + + weights["format_weight"] * format_reward + + weights["tool_weight"] * tool_reward + ) + + # 打印调试信息(显示权重调整情况) + if history_stats and history_stats.get("history_size", 0) > 0: + print(f" [Dynamic Weights] acc={weights['acc_weight']:.2f} (base={weights['base_acc_weight']:.2f}), " + f"tool={weights['tool_weight']:.2f} (base={weights['base_tool_weight']:.2f}), " + f"history_calls={weights['avg_tool_calls']:.2f}, " + f"history_success={weights['tool_success_rate']:.2f}") + + # 返回字典形式,包含详细信息以便追踪 + return { + "score": final_reward, + "acc": acc_reward, + "format": format_reward, + "tool": tool_reward, + "tool_calls": count_vision_1, + "acc_weight": weights["acc_weight"], + "format_weight": weights["format_weight"], + "tool_weight": weights["tool_weight"], + "base_acc_weight": weights["base_acc_weight"], + "base_tool_weight": weights["base_tool_weight"] + } + + + +def compute_common_reasoning(predict_str: str, ground_truth: str, extra_info=None, history_stats=None) -> float: + """ + 计算通用推理任务的奖励,支持基于历史统计的动态权重 + + 参数: + predict_str: 模型生成的响应字符串 + ground_truth: 标准答案 + extra_info: 额外信息(如问题文本) + history_stats: 历史统计数据,用于计算动态权重 + """ is_format_error = False # predict_str = "" + predict_str count_think_1 = predict_str.count("") @@ -335,11 +482,35 @@ def compute_common_reasoning(predict_str: str, ground_truth: str, extra_info=Non print(f' [ERROR] judgement format invalid: {judgement}') continue + # 计算各奖励分量 tool_reward_base = 1.0 if count_vision_1 > 0 else 0.0 tool_reward = 1.0 if count_vision_1 > 0 and acc_reward > 0.5 else 0.0 format_reward = -1.0 if is_format_error else 0.0 + + # 获取动态权重 + weights = compute_dynamic_weights(history_stats) + + # 使用动态权重计算最终奖励 + final_reward = ( + weights["acc_weight"] * acc_reward + + weights["format_weight"] * format_reward + + weights["tool_weight"] * tool_reward + ) + print(f' [DEBUG] query={extra_info["question"]}, {ground_truth=}, {answer_text=}, {acc_reward=}, {format_reward=}') - return 0.8 * acc_reward + 0.2 * format_reward + 1.2 * tool_reward + print(f' [Dynamic Weights] acc={weights["acc_weight"]:.2f}, tool={weights["tool_weight"]:.2f}') + + # 返回字典形式,包含详细信息以便追踪 + return { + "score": final_reward, + "acc": acc_reward, + "format": format_reward, + "tool": tool_reward, + "tool_calls": count_vision_1, + "acc_weight": weights["acc_weight"], + "format_weight": weights["format_weight"], + "tool_weight": weights["tool_weight"] + } def rule_math_verify(ground_truth, model_answer): @@ -385,7 +556,16 @@ def generative_verify(query, ground_truth, model_answer): print(f' [ERROR math] verify bug output: ') -def compute_score_math(predict_str: str, ground_truth: str, extra_info=None) -> float: +def compute_score_math(predict_str: str, ground_truth: str, extra_info=None, history_stats=None) -> float: + """ + 计算数学视觉推理任务的奖励,支持基于历史统计的动态权重 + + 参数: + predict_str: 模型生成的响应字符串 + ground_truth: 标准答案 + extra_info: 额外信息(如问题文本) + history_stats: 历史统计数据,用于计算动态权重 + """ is_format_error = False # predict_str = "" + predict_str count_think_1 = predict_str.count("") @@ -411,8 +591,40 @@ def compute_score_math(predict_str: str, ground_truth: str, extra_info=None) -> acc_reward = 1.0 if generative_verify(extra_info['question'], ground_truth, model_answer) else 0.0 format_reward = -1.0 if is_format_error else 0.0 + + # 数学任务通常不涉及工具调用,但为了接口一致性,我们仍然支持动态权重 + # 这里使用不同的基础权重(数学任务更强调准确性) + if history_stats is None or history_stats.get("history_size", 0) == 0: + # 没有历史数据,使用原始固定权重 + final_reward = 1.2 * acc_reward + 0.4 * format_reward + acc_weight = 1.2 + format_weight = 0.4 + else: + # 有历史数据,使用动态权重 + # 数学任务的基础权重与视觉任务不同 + base_acc = 1.2 + base_format = 0.4 + + # 基于历史统计进行调整 + # 对于数学任务,我们主要关注格式正确性和答案准确性 + # 如果历史格式错误率高,增加格式权重 + # 这里我们简化处理,直接使用基础权重,但可以根据需要扩展 + + final_reward = base_acc * acc_reward + base_format * format_reward + acc_weight = base_acc + format_weight = base_format + print(f' [DEBUG] query={extra_info["question"]}, {ground_truth=}, {model_answer=}, {acc_reward=}, {format_reward=}') - return 1.2 * acc_reward + 0.4 * format_reward + print(f' [Weights] acc={acc_weight:.2f}, format={format_weight:.2f}') + + # 返回字典形式,包含详细信息以便追踪 + return { + "score": final_reward, + "acc": acc_reward, + "format": format_reward, + "acc_weight": acc_weight, + "format_weight": format_weight + } if __name__ == '__main__': diff --git a/verl/workers/reward_manager/naive.py b/verl/workers/reward_manager/naive.py index 1ed8af909..2750c17c8 100644 --- a/verl/workers/reward_manager/naive.py +++ b/verl/workers/reward_manager/naive.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import defaultdict +from collections import defaultdict, deque import torch @@ -23,15 +23,81 @@ import datetime class NaiveRewardManager: - """The reward manager.""" - - def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key="data_source") -> None: + """The reward manager with historical tool call statistics support.""" + + def __init__( + self, + tokenizer, + num_examine, + compute_score=None, + reward_fn_key="data_source", + history_window_size=10, + **kwargs + ) -> None: self.tokenizer = tokenizer self.num_examine = num_examine # the number of batches of decoded responses to print to the console self.compute_score = compute_score or _default_compute_score self.reward_fn_key = reward_fn_key - + self.history_window_size = history_window_size + + # 历史统计数据:使用 deque 实现滑动窗口 + # 每个元素是一个字典,包含: + # - avg_tool_calls: 平均工具调用次数 + # - tool_success_rate: 工具调用成功率(使用工具且答案正确的比例) + # - tool_usage_rate: 工具使用率(使用工具的样本比例) + self.tool_call_history = deque(maxlen=history_window_size) + self.step_cnt = 0 + self.current_batch_stats = None # 当前批次的统计数据 + + def _count_tool_calls(self, response_str: str) -> int: + """ + 统计响应中的工具调用次数 + 这里使用与 vl_agent.py 中相同的统计方式 + """ + # 统计视觉工具调用次数 + vision_tool_count = response_str.count("<|vision_start|><|image_pad|>") + + # 也可以统计其他工具调用,如 标签 + tool_call_count = response_str.count("") + + # 返回总工具调用次数 + return vision_tool_count + tool_call_count + + def _get_history_stats(self) -> dict: + """ + 获取历史统计数据 + 返回滑动窗口内的平均统计值 + """ + if not self.tool_call_history: + # 如果没有历史数据,返回默认值 + return { + "avg_tool_calls": 1.0, # 默认期望的工具调用次数 + "tool_success_rate": 0.5, # 默认成功率 + "tool_usage_rate": 0.8, # 默认使用率 + "history_size": 0 + } + + # 计算滑动窗口内的平均值 + total_avg_tool_calls = sum(stat["avg_tool_calls"] for stat in self.tool_call_history) + total_tool_success_rate = sum(stat["tool_success_rate"] for stat in self.tool_call_history) + total_tool_usage_rate = sum(stat["tool_usage_rate"] for stat in self.tool_call_history) + + history_size = len(self.tool_call_history) + + return { + "avg_tool_calls": total_avg_tool_calls / history_size, + "tool_success_rate": total_tool_success_rate / history_size, + "tool_usage_rate": total_tool_usage_rate / history_size, + "history_size": history_size + } + + def _update_history(self, batch_stats: dict): + """ + 更新历史统计数据 + """ + self.tool_call_history.append(batch_stats) + self.current_batch_stats = batch_stats def __call__(self, data: DataProto, return_dict=False): """We will expand this function gradually based on the available datasets""" @@ -51,6 +117,14 @@ def __call__(self, data: DataProto, return_dict=False): reward_tensor += data.batch['env_reward'] print(f' [DEBUG reward] mean={reward_tensor.mean().item()}, min={reward_tensor.min().item()}, max={reward_tensor.max().item()}') + # 获取历史统计数据 + history_stats = self._get_history_stats() + + # 当前批次的统计数据 + batch_tool_calls = [] + batch_tool_usage = [] + batch_tool_success = [] + already_print_data_sources = {} for i in range(len(data)): @@ -76,27 +150,45 @@ def __call__(self, data: DataProto, return_dict=False): data_source = data_item.non_tensor_batch[self.reward_fn_key] extra_info = data_item.non_tensor_batch.get("extra_info", None) - + + # 统计当前样本的工具调用次数 + tool_call_count = self._count_tool_calls(response_str) + batch_tool_calls.append(tool_call_count) + + # 标记是否使用了工具 + used_tool = 1 if tool_call_count > 0 else 0 + batch_tool_usage.append(used_tool) + + # 调用奖励函数时传入历史统计数据 score = self.compute_score( data_source=data_source, solution_str=response_str, ground_truth=ground_truth, extra_info=extra_info, + history_stats=history_stats, ) if isinstance(score, dict): reward = score["score"] + # 记录工具使用是否成功(如果有acc信息) + if "acc" in score: + # 如果使用了工具且答案正确,视为工具使用成功 + tool_success = 1 if (used_tool and score["acc"] > 0.5) else (0 if used_tool else -1) + if tool_success >= 0: + batch_tool_success.append(tool_success) + # Store the information including original reward for key, value in score.items(): reward_extra_info[key].append(value) else: reward = score + # 简单判断:如果奖励高且使用了工具,视为成功 + if used_tool: + tool_success = 1 if reward > 0.5 else 0 + batch_tool_success.append(tool_success) reward_tensor[i, valid_response_length - 1] += reward - # eos_idx = torch.nonzero(action_or_attn_mask[i, prompt_length: prompt_length + valid_response_length])[-1] - # reward_tensor[i, eos_idx] = score - if data_source not in already_print_data_sources: already_print_data_sources[data_source] = 0 @@ -105,6 +197,8 @@ def __call__(self, data: DataProto, return_dict=False): print("[prompt]", prompt_str) print("[response]", response_str) print("[ground_truth]", ground_truth) + print(f"[tool_calls]", tool_call_count) + print(f"[history_stats]", history_stats) if isinstance(score, dict): for key, value in score.items(): print(f"[{key}]", value) @@ -112,6 +206,33 @@ def __call__(self, data: DataProto, return_dict=False): print("[score]", score) self.step_cnt += 1 + + # 计算当前批次的统计数据 + if len(data) > 0: + avg_tool_calls = sum(batch_tool_calls) / len(data) + tool_usage_rate = sum(batch_tool_usage) / len(data) + tool_success_rate = sum(batch_tool_success) / len(batch_tool_success) if batch_tool_success else 0.0 + + current_batch_stats = { + "avg_tool_calls": avg_tool_calls, + "tool_usage_rate": tool_usage_rate, + "tool_success_rate": tool_success_rate, + "batch_size": len(data) + } + + # 更新历史统计 + self._update_history(current_batch_stats) + + # 将统计信息添加到奖励额外信息中 + reward_extra_info["batch_avg_tool_calls"] = [avg_tool_calls] + reward_extra_info["batch_tool_usage_rate"] = [tool_usage_rate] + reward_extra_info["batch_tool_success_rate"] = [tool_success_rate] + reward_extra_info["history_avg_tool_calls"] = [history_stats["avg_tool_calls"]] + reward_extra_info["history_tool_success_rate"] = [history_stats["tool_success_rate"]] + + print(f" [Batch Stats] avg_tool_calls={avg_tool_calls:.2f}, " + f"tool_usage_rate={tool_usage_rate:.2f}, " + f"tool_success_rate={tool_success_rate:.2f}") if return_dict: return {