From 9b2a8fb5059c2478e70cfec443fffede8fa6ae71 Mon Sep 17 00:00:00 2001 From: lordcheng10 Date: Wed, 22 Mar 2023 14:20:56 +0800 Subject: [PATCH 1/2] use longTermMsgRate and longTermThroughput replace msgRate and throughput --- .../broker/loadbalance/impl/UniformLoadShedder.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java index b92af5b7c69f3..ab5f18d235295 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java @@ -32,6 +32,7 @@ import org.apache.pulsar.policies.data.loadbalancer.BrokerData; import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData; import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; /** @@ -70,9 +71,11 @@ public Multimap findBundlesForUnloading(final LoadData loadData, MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE); MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE); brokersData.forEach((broker, data) -> { - double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut(); - double throughputRate = data.getLocalData().getMsgThroughputIn() - + data.getLocalData().getMsgThroughputOut(); + TimeAverageBrokerData timeAverageData = data.getTimeAverageData(); + double msgRate = timeAverageData.getLongTermMsgRateIn() + + timeAverageData.getLongTermMsgRateOut(); + double throughputRate = timeAverageData.getLongTermMsgThroughputIn() + + timeAverageData.getLongTermMsgThroughputOut(); if (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue()) { overloadedBroker.setValue(broker); maxMsgRate.setValue(msgRate); From 83217482540cf85b47918c2d6d7e1146c1886cd7 Mon Sep 17 00:00:00 2001 From: lordcheng10 Date: Wed, 22 Mar 2023 14:29:42 +0800 Subject: [PATCH 2/2] use shortTermMsgRate or shortTermThrought --- .../broker/loadbalance/impl/UniformLoadShedder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java index ab5f18d235295..9dcba966b0d34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java @@ -72,10 +72,10 @@ public Multimap findBundlesForUnloading(final LoadData loadData, MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE); brokersData.forEach((broker, data) -> { TimeAverageBrokerData timeAverageData = data.getTimeAverageData(); - double msgRate = timeAverageData.getLongTermMsgRateIn() - + timeAverageData.getLongTermMsgRateOut(); - double throughputRate = timeAverageData.getLongTermMsgThroughputIn() - + timeAverageData.getLongTermMsgThroughputOut(); + double msgRate = timeAverageData.getShortTermMsgRateIn() + + timeAverageData.getShortTermMsgRateOut(); + double throughputRate = timeAverageData.getShortTermMsgThroughputIn() + + timeAverageData.getShortTermMsgThroughputOut(); if (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue()) { overloadedBroker.setValue(broker); maxMsgRate.setValue(msgRate);