From c37a2755fd815ed42d4898cfb1252e37f49b989a Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Thu, 14 Mar 2024 12:26:22 -0700 Subject: [PATCH 01/15] Revert "upgrade xstream to 1.4.20 to pick up fixes for 2 CVEs (#2763)" This reverts commit 53b588982c281009085076b480a3ce58cccbe618. This dependency is banned at li. --- helix-admin-webapp/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-admin-webapp/pom.xml b/helix-admin-webapp/pom.xml index 4529b0807a..31502c7367 100644 --- a/helix-admin-webapp/pom.xml +++ b/helix-admin-webapp/pom.xml @@ -90,7 +90,7 @@ com.thoughtworks.xstream xstream - 1.4.20 + 1.4.19 com.fasterxml.jackson.core From 3c94e69622057ad110894089696eaebbb1a857d8 Mon Sep 17 00:00:00 2001 From: Himanshu Kandwal Date: Wed, 8 May 2024 19:21:44 -0700 Subject: [PATCH 02/15] [Linkedin/Helix] -- Provide JDK 1.8 (backward) compatibility for meta-client --- helix-core/pom.xml | 1 - meta-client/pom.xml | 48 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/helix-core/pom.xml b/helix-core/pom.xml index ba3d7b7e51..c8c5a504d2 100644 --- a/helix-core/pom.xml +++ b/helix-core/pom.xml @@ -219,7 +219,6 @@ package jar - test-jar ${project.build.outputDirectory}_jdk8 diff --git a/meta-client/pom.xml b/meta-client/pom.xml index 29092ef1e8..a4762eb371 100644 --- a/meta-client/pom.xml +++ b/meta-client/pom.xml @@ -89,6 +89,54 @@ under the License. + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + + JDK 8 + compile + + compile + + + ${project.build.outputDirectory}_jdk8 + 8 + true + + + + JDK 11 + compile + + compile + + + 11 + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + default-package-jdk11 + package + + jar + + + ${project.build.outputDirectory}_jdk8 + jdk8 + + + + org.apache.maven.plugins maven-assembly-plugin From 72d305bd7947a72e740e1464d8e754c4468b80c9 Mon Sep 17 00:00:00 2001 From: Himanshu Kandwal Date: Thu, 9 May 2024 15:31:22 -0700 Subject: [PATCH 03/15] [Linkedin/Helix] -- Provide JDK 1.8 (backward) compatibility for helix modules --- helix-common/pom.xml | 48 +++++++++++++++++++++++++ metadata-store-directory-common/pom.xml | 48 +++++++++++++++++++++++++ metrics-common/pom.xml | 48 +++++++++++++++++++++++++ zookeeper-api/pom.xml | 48 +++++++++++++++++++++++++ 4 files changed, 192 insertions(+) diff --git a/helix-common/pom.xml b/helix-common/pom.xml index 40f64043b6..82e35c86dd 100644 --- a/helix-common/pom.xml +++ b/helix-common/pom.xml @@ -89,6 +89,54 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + + JDK 8 + compile + + compile + + + ${project.build.outputDirectory}_jdk8 + 8 + true + + + + JDK 11 + compile + + compile + + + 11 + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + default-package-jdk11 + package + + jar + + + ${project.build.outputDirectory}_jdk8 + jdk8 + + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/metadata-store-directory-common/pom.xml b/metadata-store-directory-common/pom.xml index f173397cc4..98e2e5cb03 100644 --- a/metadata-store-directory-common/pom.xml +++ b/metadata-store-directory-common/pom.xml @@ -113,6 +113,54 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + + JDK 8 + compile + + compile + + + ${project.build.outputDirectory}_jdk8 + 8 + true + + + + JDK 11 + compile + + compile + + + 11 + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + default-package-jdk11 + package + + jar + + + ${project.build.outputDirectory}_jdk8 + jdk8 + + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/metrics-common/pom.xml b/metrics-common/pom.xml index 2dbe016cb2..433c575aac 100644 --- a/metrics-common/pom.xml +++ b/metrics-common/pom.xml @@ -84,6 +84,54 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + + JDK 8 + compile + + compile + + + ${project.build.outputDirectory}_jdk8 + 8 + true + + + + JDK 11 + compile + + compile + + + 11 + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + default-package-jdk11 + package + + jar + + + ${project.build.outputDirectory}_jdk8 + jdk8 + + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/zookeeper-api/pom.xml b/zookeeper-api/pom.xml index d44160fdb7..bfb993feec 100644 --- a/zookeeper-api/pom.xml +++ b/zookeeper-api/pom.xml @@ -133,6 +133,54 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + + JDK 8 + compile + + compile + + + ${project.build.outputDirectory}_jdk8 + 8 + true + + + + JDK 11 + compile + + compile + + + 11 + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + default-package-jdk11 + package + + jar + + + ${project.build.outputDirectory}_jdk8 + jdk8 + + + + org.apache.maven.plugins maven-assembly-plugin From 1344bbf336b62ec90953f1be26abfb959687bce8 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Fri, 21 Feb 2025 11:08:52 -0800 Subject: [PATCH 04/15] pin xstreams in helix-rest/pom.xml to 1.4.19 as 1.4.21 is banned at LI --- helix-rest/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml index 24ddf0d621..e13a8c2c67 100644 --- a/helix-rest/pom.xml +++ b/helix-rest/pom.xml @@ -121,7 +121,7 @@ com.thoughtworks.xstream xstream - 1.4.21 + 1.4.19 com.fasterxml.jackson.core From 98b332a53709eca52b12facb9e5c275ec59958ac Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Mon, 21 Jul 2025 17:51:51 +0530 Subject: [PATCH 05/15] Created the Sensor for DistClusterController failure --- .../org/apache/helix/PropertyPathBuilder.java | 1 - .../controller/GenericHelixController.java | 4 +- .../apache/helix/examples/ExampleProcess.java | 12 ++ .../apache/helix/examples/ZkDataExplorer.java | 0 .../DistClusterControllerStateModel.java | 131 ++++++++++++++++++ .../TestDistControllerStateModel.java | 19 +++ 6 files changed, 164 insertions(+), 3 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java index f0002bdbde..0b14d6a475 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java @@ -59,7 +59,6 @@ public class PropertyPathBuilder { typeToClassMapping.put(PropertyType.IDEALSTATES, IdealState.class); typeToClassMapping.put(PropertyType.CONFIGS, InstanceConfig.class); typeToClassMapping.put(PropertyType.EXTERNALVIEW, ExternalView.class); - typeToClassMapping.put(PropertyType.CUSTOMIZEDVIEW, CustomizedView.class); typeToClassMapping.put(PropertyType.STATEMODELDEFS, StateModelDefinition.class); typeToClassMapping.put(PropertyType.MESSAGES, Message.class); typeToClassMapping.put(PropertyType.CURRENTSTATES, CurrentState.class); diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index e63981d29a..6ac39b8db7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -1290,8 +1290,8 @@ private void pushToEventQueues(ClusterEventType eventType, NotificationContext c String uid = UUID.randomUUID().toString().substring(0, 8); ClusterEvent event = new ClusterEvent(_clusterName, eventType, String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name())); - event.addAttribute(AttributeName.EVENT_SESSION.name(), - changeContext.getManager().getSessionIdIfLead()); +// event.addAttribute(AttributeName.EVENT_SESSION.name(), +// changeContext.getManager().getSessionIdIfLead()); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java index fd730131cd..832cd4beca 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java +++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java @@ -35,6 +35,8 @@ import org.apache.helix.InstanceType; import org.apache.helix.manager.zk.HelixManagerShutdownHook; import org.apache.helix.model.Message.MessageType; +import org.apache.helix.participant.DistClusterControllerStateModel; +import org.apache.helix.participant.DistClusterControllerStateModelFactory; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -84,6 +86,16 @@ public void start() throws Exception { } else if ("LeaderStandby".equalsIgnoreCase(stateModelType)) { stateModelFactory = new LeaderStandbyStateModelFactory(this.instanceName, delay); } + else if("DistCluster".equalsIgnoreCase(stateModelType)) { + StateModelFactory distClusterStateModelFactory = + new DistClusterControllerStateModelFactory(zkConnectString); + StateMachineEngine stateMach = manager.getStateMachineEngine(); + stateMach.registerStateModelFactory(stateModelType, distClusterStateModelFactory); + manager.connect(); + manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), stateMach); + } else { + throw new IllegalArgumentException("Unknown state model type: " + stateModelType); + } // genericStateMachineHandler = new StateMachineEngine(); // genericStateMachineHandler.registerStateModelFactory(stateModelType, // stateModelFactory); diff --git a/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java b/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java new file mode 100644 index 0000000000..e69de29bb2 diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 67918c9fbb..01249e254f 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -21,6 +21,8 @@ import java.util.Optional; import java.util.Set; +import java.util.ArrayList; +import java.util.List; import com.google.common.collect.Sets; import org.apache.helix.HelixManager; @@ -29,10 +31,15 @@ import org.apache.helix.NotificationContext; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.model.Message; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.participant.statemachine.StateModelInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.JMException; + @StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"}) public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel { @@ -43,6 +50,9 @@ public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyS // dedicated lock object to avoid cross-instance contention from Optional.empty() singleton private final Object _controllerLock = new Object(); + // Metrics monitor for leadership failures + private volatile DistControllerMetricsMonitor _metricsMonitor; + public DistClusterControllerStateModel(String zkAddr) { this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK)); } @@ -73,6 +83,12 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte newController.connect(); newController.startTimerTasks(); _controllerOpt = Optional.of(newController); + if (!newController.isLeader()) { + logger.warn("Controller Leader session is not the same as the current session for " + + clusterName + ". This should not happen. Controller: " + controllerName); + // Publish metrics through Senseo when not a leader + //publishLeadershipFailureMetric(clusterName, controllerName); + } logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } else { logger.error("controller already exists:" + _controllerOpt.get().getInstanceName() + " for " @@ -120,8 +136,123 @@ public void reset() { logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for " + _controllerOpt.get().getClusterName()); _controllerOpt.get().disconnect(); + if(_controllerOpt.get().isLeader()) { + logger.warn("Controller is still leader after disconnecting: " + _controllerOpt.get().getInstanceName() + + " for " + _controllerOpt.get().getClusterName()); + + // Publish metrics when controller is still leader during reset +// publishStillLeaderDuringResetMetric(_controllerOpt.get().getClusterName(), +// _controllerOpt.get().getInstanceName()); + } _controllerOpt = Optional.empty(); } + + // Clean up metrics monitor + if (_metricsMonitor != null) { + try { + _metricsMonitor.unregister(); + logger.info("Unregistered DistController metrics monitor"); + } catch (Exception e) { + logger.warn("Failed to unregister DistController metrics monitor", e); + } + _metricsMonitor = null; + } + } + } + + /** + * Publishes metrics when controller fails to acquire leadership + * @param clusterName the cluster name + * @param controllerName the controller instance name + */ + private void publishLeadershipFailureMetric(String clusterName, String controllerName) { + try { + // Initialize metrics monitor if not already done + if (_metricsMonitor == null) { + _metricsMonitor = new DistControllerMetricsMonitor(clusterName, controllerName); + _metricsMonitor.register(); + } + + // Increment the leadership failure counter + if (_metricsMonitor != null) { + _metricsMonitor.incrementLeadershipFailureCount(); + } + } catch (Exception e) { + logger.error("Failed to publish leadership failure metric for controller {} in cluster {}", + controllerName, clusterName, e); + } + } + + /** + * Publishes metrics when controller is still leader during reset + * @param clusterName the cluster name + * @param controllerName the controller instance name + */ + private void publishStillLeaderDuringResetMetric(String clusterName, String controllerName) { + try { + // Initialize metrics monitor if not already done + if (_metricsMonitor == null) { + _metricsMonitor = new DistControllerMetricsMonitor(clusterName, controllerName); + _metricsMonitor.register(); + } + + // Increment the still leader during reset counter + if (_metricsMonitor != null) { + _metricsMonitor.incrementStillLeaderDuringResetCount(); + } + } catch (Exception e) { + logger.error("Failed to publish still-leader-during-reset metric for controller {} in cluster {}", + controllerName, clusterName, e); + } + } + + /** + * Metrics monitor for DistClusterController state model + * This class provides Senseo integration through DynamicMBeanProvider + */ + private static class DistControllerMetricsMonitor extends DynamicMBeanProvider { + private static final String MBEAN_DESCRIPTION = "DistClusterController Metrics Monitor"; + private final String _clusterName; + private final String _controllerName; + private final String _sensorName; + + private final SimpleDynamicMetric _leadershipFailureCounter; + private final SimpleDynamicMetric _stillLeaderDuringResetCounter; + + public DistControllerMetricsMonitor(String clusterName, String controllerName) { + _clusterName = clusterName; + _controllerName = controllerName; + // Create sensor name following Helix conventions for Senseo integration + _sensorName = String.format("ClusterController.%s.%s", clusterName, controllerName); + + // Initialize metrics + _leadershipFailureCounter = new SimpleDynamicMetric<>("LeadershipFailureCounter", 0L); + _stillLeaderDuringResetCounter = new SimpleDynamicMetric<>("StillLeaderDuringResetCounter", 0L); + } + + @Override + public String getSensorName() { + return _sensorName; + } + + public void incrementLeadershipFailureCount() { + _leadershipFailureCounter.updateValue(_leadershipFailureCounter.getValue() + 1); + } + + public void incrementStillLeaderDuringResetCount() { + _stillLeaderDuringResetCounter.updateValue(_stillLeaderDuringResetCounter.getValue() + 1); + } + + @Override + public DynamicMBeanProvider register() throws JMException { + List> attributeList = new ArrayList<>(); + attributeList.add(_leadershipFailureCounter); + attributeList.add(_stillLeaderDuringResetCounter); + + // Register the metrics with JMX for Senseo to consume + doRegister(attributeList, MBEAN_DESCRIPTION, + String.format("DistClusterController.%s.%s", _clusterName, _controllerName)); + return this; } } } \ No newline at end of file diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java index 7e8974671c..d70f2e2796 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java @@ -62,6 +62,25 @@ public void afterMethod() throws Exception { } } + @Test + public void testOnBecomeLeaderFromStandby_whenMultipleInstancesTrigger() throws Exception { + Message message = new Message(MessageType.STATE_TRANSITION, "0"); + message.setPartitionName(clusterName); + message.setTgtName("controller_0"); + stateModel.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + + message = new Message(MessageType.STATE_TRANSITION, "1"); + message.setPartitionName(clusterName); + message.setTgtName("controller_1"); + DistClusterControllerStateModel stateModel2 = new DistClusterControllerStateModel(ZK_ADDR); + stateModel2.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + + // controller_0 is the leader of clusterName. + Assert.assertTrue(stateModel._controllerOpt.get().isLeader()); + // controller_1 was not able to become leader because controller_0 was already the leader. + Assert.assertFalse(stateModel2._controllerOpt.get().isLeader()); + } + @Test() public void testOnBecomeStandbyFromOffline() { stateModel.onBecomeStandbyFromOffline(new Message(new ZNRecord("test")), null); From aad3152c5e89f31a1e5f00e102de17cfb6ffe6ac Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Tue, 22 Jul 2025 15:10:19 +0530 Subject: [PATCH 06/15] wip --- .../DistClusterControllerStateModel.java | 9 ++--- .../TestDistControllerStateModel.java | 38 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 01249e254f..e81492988f 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -86,8 +86,8 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte if (!newController.isLeader()) { logger.warn("Controller Leader session is not the same as the current session for " + clusterName + ". This should not happen. Controller: " + controllerName); - // Publish metrics through Senseo when not a leader - //publishLeadershipFailureMetric(clusterName, controllerName); + // Publish metrics through Sensor when not a leader + publishLeadershipFailureMetric(clusterName, controllerName); } logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } else { @@ -141,8 +141,8 @@ public void reset() { + " for " + _controllerOpt.get().getClusterName()); // Publish metrics when controller is still leader during reset -// publishStillLeaderDuringResetMetric(_controllerOpt.get().getClusterName(), -// _controllerOpt.get().getInstanceName()); + publishStillLeaderDuringResetMetric(_controllerOpt.get().getClusterName(), + _controllerOpt.get().getInstanceName()); } _controllerOpt = Optional.empty(); } @@ -151,7 +151,6 @@ public void reset() { if (_metricsMonitor != null) { try { _metricsMonitor.unregister(); - logger.info("Unregistered DistController metrics monitor"); } catch (Exception e) { logger.warn("Failed to unregister DistController metrics monitor", e); } diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java index d70f2e2796..d0d3beda4a 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java @@ -62,25 +62,6 @@ public void afterMethod() throws Exception { } } - @Test - public void testOnBecomeLeaderFromStandby_whenMultipleInstancesTrigger() throws Exception { - Message message = new Message(MessageType.STATE_TRANSITION, "0"); - message.setPartitionName(clusterName); - message.setTgtName("controller_0"); - stateModel.onBecomeLeaderFromStandby(message, new NotificationContext(null)); - - message = new Message(MessageType.STATE_TRANSITION, "1"); - message.setPartitionName(clusterName); - message.setTgtName("controller_1"); - DistClusterControllerStateModel stateModel2 = new DistClusterControllerStateModel(ZK_ADDR); - stateModel2.onBecomeLeaderFromStandby(message, new NotificationContext(null)); - - // controller_0 is the leader of clusterName. - Assert.assertTrue(stateModel._controllerOpt.get().isLeader()); - // controller_1 was not able to become leader because controller_0 was already the leader. - Assert.assertFalse(stateModel2._controllerOpt.get().isLeader()); - } - @Test() public void testOnBecomeStandbyFromOffline() { stateModel.onBecomeStandbyFromOffline(new Message(new ZNRecord("test")), null); @@ -284,4 +265,23 @@ public void testExplicitLockIndependence() throws Exception { Assert.assertTrue(instance1Interrupted.get(), "Instance1 should have been interrupted while holding its lock"); } + + @Test() + public void testOnBecomeLeaderFromStandby_whenMultipleInstancesTrigger() throws Exception { + Message message = new Message(MessageType.STATE_TRANSITION, "0"); + message.setPartitionName(clusterName); + message.setTgtName("controller_0"); + stateModel.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + + message = new Message(MessageType.STATE_TRANSITION, "1"); + message.setPartitionName(clusterName); + message.setTgtName("controller_1"); + DistClusterControllerStateModel stateModel2 = new DistClusterControllerStateModel(ZK_ADDR); + stateModel2.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + + // controller_0 is the leader of clusterName. + Assert.assertTrue(stateModel._controllerOpt.get().isLeader()); + // controller_1 was not able to become leader because controller_0 was already the leader. + Assert.assertFalse(stateModel2._controllerOpt.get().isLeader()); + } } \ No newline at end of file From 0c74eb311992eb8beb275f18e025c3df4e243184 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 10:34:31 +0530 Subject: [PATCH 07/15] Moved the DistCluster state model sensor to clusterstatusmonitor --- .../mbeans/ClusterStatusMonitor.java | 28 ++++ .../mbeans/ClusterStatusMonitorMBean.java | 10 ++ .../DistClusterControllerStateModel.java | 142 ++++-------------- 3 files changed, 65 insertions(+), 115 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index c8e0d6ef32..6cd4f38a24 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -87,6 +87,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private AtomicLong _rebalanceFailureCount = new AtomicLong(0L); private AtomicLong _continuousResourceRebalanceFailureCount = new AtomicLong(0L); private AtomicLong _continuousTaskRebalanceFailureCount = new AtomicLong(0L); + private AtomicLong _leadershipFailureCount = new AtomicLong(0L); + private AtomicLong _stillLeaderDuringResetCount = new AtomicLong(0L); private final ConcurrentHashMap _resourceMonitorMap = new ConcurrentHashMap<>(); @@ -660,6 +662,8 @@ public void reset() { _rebalanceFailureCount.set(0L); _continuousResourceRebalanceFailureCount.set(0L); _continuousTaskRebalanceFailureCount.set(0L); + _leadershipFailureCount.set(0L); + _stillLeaderDuringResetCount.set(0L); } catch (Exception e) { LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e); } @@ -1137,4 +1141,28 @@ public long getNumOfResourcesRebalanceThrottledGauge() { } return total; } + + @Override + public long getLeadershipFailureCounter() { + return _leadershipFailureCount.get(); + } + + @Override + public long getStillLeaderDuringResetCounter() { + return _stillLeaderDuringResetCount.get(); + } + + /** + * Report a leadership failure for distributed controller + */ + public void reportLeadershipFailure() { + _leadershipFailureCount.incrementAndGet(); + } + + /** + * Report when controller is still leader during reset + */ + public void reportStillLeaderDuringReset() { + _stillLeaderDuringResetCount.incrementAndGet(); + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java index 433818e457..5d76cc1c13 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java @@ -147,4 +147,14 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider { * state partition is larger than configured threshold (default is 1). */ long getNumOfResourcesRebalanceThrottledGauge(); + + /** + * @return The number of leadership failures for distributed controllers + */ + long getLeadershipFailureCounter(); + + /** + * @return The number of times controller was still leader during reset + */ + long getStillLeaderDuringResetCounter(); } diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index e81492988f..cf9149c8a0 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -21,8 +21,6 @@ import java.util.Optional; import java.util.Set; -import java.util.ArrayList; -import java.util.List; import com.google.common.collect.Sets; import org.apache.helix.HelixManager; @@ -31,28 +29,26 @@ import org.apache.helix.NotificationContext; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.model.Message; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.participant.statemachine.StateModelInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.JMException; - @StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"}) public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel { private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class); + + // Constants for metric types + private static final String METRIC_LEADERSHIP_FAILURE = "leadership_failure"; + private static final String METRIC_STILL_LEADER_DURING_RESET = "still_leader_during_reset"; + protected volatile Optional _controllerOpt = Optional.empty(); private final Set _enabledPipelineTypes; // dedicated lock object to avoid cross-instance contention from Optional.empty() singleton private final Object _controllerLock = new Object(); - // Metrics monitor for leadership failures - private volatile DistControllerMetricsMonitor _metricsMonitor; - public DistClusterControllerStateModel(String zkAddr) { this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK)); } @@ -84,10 +80,10 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte newController.startTimerTasks(); _controllerOpt = Optional.of(newController); if (!newController.isLeader()) { - logger.warn("Controller Leader session is not the same as the current session for " - + clusterName + ". This should not happen. Controller: " + controllerName); - // Publish metrics through Sensor when not a leader - publishLeadershipFailureMetric(clusterName, controllerName); + logger.warn("Controller Leader session is not the same as the current session for {}. " + + "This should not happen. Controller: {}", clusterName ,controllerName); + // Publish metrics through ClusterStatusMonitor when not a leader + publishControllerMetric(clusterName, METRIC_LEADERSHIP_FAILURE); } logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } else { @@ -137,121 +133,37 @@ public void reset() { + _controllerOpt.get().getClusterName()); _controllerOpt.get().disconnect(); if(_controllerOpt.get().isLeader()) { - logger.warn("Controller is still leader after disconnecting: " + _controllerOpt.get().getInstanceName() - + " for " + _controllerOpt.get().getClusterName()); + logger.warn("Controller is still leader after disconnecting: {} for {}", + _controllerOpt.get().getInstanceName(), _controllerOpt.get().getClusterName()); // Publish metrics when controller is still leader during reset - publishStillLeaderDuringResetMetric(_controllerOpt.get().getClusterName(), - _controllerOpt.get().getInstanceName()); + publishControllerMetric(_controllerOpt.get().getClusterName(), METRIC_STILL_LEADER_DURING_RESET); } _controllerOpt = Optional.empty(); } - - // Clean up metrics monitor - if (_metricsMonitor != null) { - try { - _metricsMonitor.unregister(); - } catch (Exception e) { - logger.warn("Failed to unregister DistController metrics monitor", e); - } - _metricsMonitor = null; - } } } /** - * Publishes metrics when controller fails to acquire leadership + * Publishes controller metrics through ClusterStatusMonitor * @param clusterName the cluster name - * @param controllerName the controller instance name + * @param metricType the type of metric to publish */ - private void publishLeadershipFailureMetric(String clusterName, String controllerName) { + private void publishControllerMetric(String clusterName, String metricType) { try { - // Initialize metrics monitor if not already done - if (_metricsMonitor == null) { - _metricsMonitor = new DistControllerMetricsMonitor(clusterName, controllerName); - _metricsMonitor.register(); - } - - // Increment the leadership failure counter - if (_metricsMonitor != null) { - _metricsMonitor.incrementLeadershipFailureCount(); + ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName); + switch (metricType) { + case METRIC_LEADERSHIP_FAILURE: + monitor.reportLeadershipFailure(); + break; + case METRIC_STILL_LEADER_DURING_RESET: + monitor.reportStillLeaderDuringReset(); + break; + default: + logger.warn("Unknown metric type: {} for cluster {}", metricType, clusterName); } } catch (Exception e) { - logger.error("Failed to publish leadership failure metric for controller {} in cluster {}", - controllerName, clusterName, e); - } - } - - /** - * Publishes metrics when controller is still leader during reset - * @param clusterName the cluster name - * @param controllerName the controller instance name - */ - private void publishStillLeaderDuringResetMetric(String clusterName, String controllerName) { - try { - // Initialize metrics monitor if not already done - if (_metricsMonitor == null) { - _metricsMonitor = new DistControllerMetricsMonitor(clusterName, controllerName); - _metricsMonitor.register(); - } - - // Increment the still leader during reset counter - if (_metricsMonitor != null) { - _metricsMonitor.incrementStillLeaderDuringResetCount(); - } - } catch (Exception e) { - logger.error("Failed to publish still-leader-during-reset metric for controller {} in cluster {}", - controllerName, clusterName, e); - } - } - - /** - * Metrics monitor for DistClusterController state model - * This class provides Senseo integration through DynamicMBeanProvider - */ - private static class DistControllerMetricsMonitor extends DynamicMBeanProvider { - private static final String MBEAN_DESCRIPTION = "DistClusterController Metrics Monitor"; - private final String _clusterName; - private final String _controllerName; - private final String _sensorName; - - private final SimpleDynamicMetric _leadershipFailureCounter; - private final SimpleDynamicMetric _stillLeaderDuringResetCounter; - - public DistControllerMetricsMonitor(String clusterName, String controllerName) { - _clusterName = clusterName; - _controllerName = controllerName; - // Create sensor name following Helix conventions for Senseo integration - _sensorName = String.format("ClusterController.%s.%s", clusterName, controllerName); - - // Initialize metrics - _leadershipFailureCounter = new SimpleDynamicMetric<>("LeadershipFailureCounter", 0L); - _stillLeaderDuringResetCounter = new SimpleDynamicMetric<>("StillLeaderDuringResetCounter", 0L); - } - - @Override - public String getSensorName() { - return _sensorName; - } - - public void incrementLeadershipFailureCount() { - _leadershipFailureCounter.updateValue(_leadershipFailureCounter.getValue() + 1); - } - - public void incrementStillLeaderDuringResetCount() { - _stillLeaderDuringResetCounter.updateValue(_stillLeaderDuringResetCounter.getValue() + 1); - } - - @Override - public DynamicMBeanProvider register() throws JMException { - List> attributeList = new ArrayList<>(); - attributeList.add(_leadershipFailureCounter); - attributeList.add(_stillLeaderDuringResetCounter); - - // Register the metrics with JMX for Senseo to consume - doRegister(attributeList, MBEAN_DESCRIPTION, - String.format("DistClusterController.%s.%s", _clusterName, _controllerName)); - return this; + logger.error("Failed to publish {} metric for cluster {}", metricType, clusterName, e); } } } \ No newline at end of file From 08f60f675282a6d3032bf37965b137d5d537fe09 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 10:38:34 +0530 Subject: [PATCH 08/15] removed the unused changes --- helix-admin-webapp/pom.xml | 2 +- .../java/org/apache/helix/PropertyPathBuilder.java | 1 + .../org/apache/helix/examples/ExampleProcess.java | 12 ------------ .../org/apache/helix/examples/ZkDataExplorer.java | 0 helix-rest/pom.xml | 2 +- 5 files changed, 3 insertions(+), 14 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java diff --git a/helix-admin-webapp/pom.xml b/helix-admin-webapp/pom.xml index 90ac734ab0..9f7da0395d 100644 --- a/helix-admin-webapp/pom.xml +++ b/helix-admin-webapp/pom.xml @@ -90,7 +90,7 @@ com.thoughtworks.xstream xstream - 1.4.19 + 1.4.21 com.fasterxml.jackson.core diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java index 0b14d6a475..f0002bdbde 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java @@ -59,6 +59,7 @@ public class PropertyPathBuilder { typeToClassMapping.put(PropertyType.IDEALSTATES, IdealState.class); typeToClassMapping.put(PropertyType.CONFIGS, InstanceConfig.class); typeToClassMapping.put(PropertyType.EXTERNALVIEW, ExternalView.class); + typeToClassMapping.put(PropertyType.CUSTOMIZEDVIEW, CustomizedView.class); typeToClassMapping.put(PropertyType.STATEMODELDEFS, StateModelDefinition.class); typeToClassMapping.put(PropertyType.MESSAGES, Message.class); typeToClassMapping.put(PropertyType.CURRENTSTATES, CurrentState.class); diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java index 832cd4beca..fd730131cd 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java +++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java @@ -35,8 +35,6 @@ import org.apache.helix.InstanceType; import org.apache.helix.manager.zk.HelixManagerShutdownHook; import org.apache.helix.model.Message.MessageType; -import org.apache.helix.participant.DistClusterControllerStateModel; -import org.apache.helix.participant.DistClusterControllerStateModelFactory; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -86,16 +84,6 @@ public void start() throws Exception { } else if ("LeaderStandby".equalsIgnoreCase(stateModelType)) { stateModelFactory = new LeaderStandbyStateModelFactory(this.instanceName, delay); } - else if("DistCluster".equalsIgnoreCase(stateModelType)) { - StateModelFactory distClusterStateModelFactory = - new DistClusterControllerStateModelFactory(zkConnectString); - StateMachineEngine stateMach = manager.getStateMachineEngine(); - stateMach.registerStateModelFactory(stateModelType, distClusterStateModelFactory); - manager.connect(); - manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), stateMach); - } else { - throw new IllegalArgumentException("Unknown state model type: " + stateModelType); - } // genericStateMachineHandler = new StateMachineEngine(); // genericStateMachineHandler.registerStateModelFactory(stateModelType, // stateModelFactory); diff --git a/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java b/helix-core/src/main/java/org/apache/helix/examples/ZkDataExplorer.java deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml index b242260f75..b2ac2bad6c 100644 --- a/helix-rest/pom.xml +++ b/helix-rest/pom.xml @@ -121,7 +121,7 @@ com.thoughtworks.xstream xstream - 1.4.19 + 1.4.21 com.fasterxml.jackson.core From 49c27496a2f33b6485c889cfe1b99694b8d64a68 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 10:39:09 +0530 Subject: [PATCH 09/15] removed the unused changes --- .../org/apache/helix/controller/GenericHelixController.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 6ac39b8db7..e63981d29a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -1290,8 +1290,8 @@ private void pushToEventQueues(ClusterEventType eventType, NotificationContext c String uid = UUID.randomUUID().toString().substring(0, 8); ClusterEvent event = new ClusterEvent(_clusterName, eventType, String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name())); -// event.addAttribute(AttributeName.EVENT_SESSION.name(), -// changeContext.getManager().getSessionIdIfLead()); + event.addAttribute(AttributeName.EVENT_SESSION.name(), + changeContext.getManager().getSessionIdIfLead()); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); From 4bb0cddca954d774105e11e0c948ba4bfc6aa982 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 11:00:06 +0530 Subject: [PATCH 10/15] Added the test for new sensor --- .vscode/settings.json | 3 + .../DistClusterControllerStateModel.java | 1 + .../mbeans/TestClusterStatusMonitor.java | 71 +++++++++++++++++++ 3 files changed, 75 insertions(+) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000..c5f3f6b9c7 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "interactive" +} \ No newline at end of file diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index cf9149c8a0..a6bd6d36c9 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -82,6 +82,7 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte if (!newController.isLeader()) { logger.warn("Controller Leader session is not the same as the current session for {}. " + "This should not happen. Controller: {}", clusterName ,controllerName); + // Publish metrics through ClusterStatusMonitor when not a leader publishControllerMetric(clusterName, METRIC_LEADERSHIP_FAILURE); } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java index b0837869a9..575d203ecc 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java @@ -610,4 +610,75 @@ private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map Date: Wed, 23 Jul 2025 11:38:01 +0530 Subject: [PATCH 11/15] added the unit tests --- .../DistClusterControllerStateModel.java | 23 +++++++++++++++---- .../TestDistControllerStateModel.java | 14 +++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index a6bd6d36c9..025069a074 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -45,6 +45,7 @@ public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyS protected volatile Optional _controllerOpt = Optional.empty(); private final Set _enabledPipelineTypes; + private ClusterStatusMonitor _clusterStatusMonitor; // dedicated lock object to avoid cross-instance contention from Optional.empty() singleton private final Object _controllerLock = new Object(); @@ -130,6 +131,7 @@ public String getStateModeInstanceDescription(String partitionName, String insta public void reset() { synchronized (_controllerLock) { if (_controllerOpt.isPresent()) { + String clusterName = _controllerOpt.get().getClusterName(); logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for " + _controllerOpt.get().getClusterName()); _controllerOpt.get().disconnect(); @@ -138,27 +140,38 @@ public void reset() { _controllerOpt.get().getInstanceName(), _controllerOpt.get().getClusterName()); // Publish metrics when controller is still leader during reset - publishControllerMetric(_controllerOpt.get().getClusterName(), METRIC_STILL_LEADER_DURING_RESET); + publishControllerMetric(clusterName, METRIC_STILL_LEADER_DURING_RESET); } _controllerOpt = Optional.empty(); } + + // Clean up cluster status monitor + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.reset(); + _clusterStatusMonitor = null; + } } } /** - * Publishes controller metrics through ClusterStatusMonitor + * Publishes controller metrics through the managed ClusterStatusMonitor * @param clusterName the cluster name * @param metricType the type of metric to publish */ private void publishControllerMetric(String clusterName, String metricType) { try { - ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName); + // Ensure we have a monitor for this cluster + if (_clusterStatusMonitor == null) { + _clusterStatusMonitor = new ClusterStatusMonitor(clusterName); + _clusterStatusMonitor.active(); + } + switch (metricType) { case METRIC_LEADERSHIP_FAILURE: - monitor.reportLeadershipFailure(); + _clusterStatusMonitor.reportLeadershipFailure(); break; case METRIC_STILL_LEADER_DURING_RESET: - monitor.reportStillLeaderDuringReset(); + _clusterStatusMonitor.reportStillLeaderDuringReset(); break; default: logger.warn("Unknown metric type: {} for cluster {}", metricType, clusterName); diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java index d0d3beda4a..63fa55fdde 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java @@ -21,6 +21,7 @@ import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.model.Message; @@ -268,20 +269,33 @@ public void testExplicitLockIndependence() throws Exception { @Test() public void testOnBecomeLeaderFromStandby_whenMultipleInstancesTrigger() throws Exception { + // First controller becomes leader Message message = new Message(MessageType.STATE_TRANSITION, "0"); message.setPartitionName(clusterName); message.setTgtName("controller_0"); stateModel.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + // Second controller attempts to become leader message = new Message(MessageType.STATE_TRANSITION, "1"); message.setPartitionName(clusterName); message.setTgtName("controller_1"); DistClusterControllerStateModel stateModel2 = new DistClusterControllerStateModel(ZK_ADDR); stateModel2.onBecomeLeaderFromStandby(message, new NotificationContext(null)); + // Verify leadership states // controller_0 is the leader of clusterName. Assert.assertTrue(stateModel._controllerOpt.get().isLeader()); // controller_1 was not able to become leader because controller_0 was already the leader. Assert.assertFalse(stateModel2._controllerOpt.get().isLeader()); + + // Verify that leadership failure metric was reported by accessing the internal monitor + // Use reflection to get the internal ClusterStatusMonitor from stateModel2 + Field monitorField = DistClusterControllerStateModel.class.getDeclaredField("_clusterStatusMonitor"); + monitorField.setAccessible(true); + ClusterStatusMonitor internalMonitor = (ClusterStatusMonitor) monitorField.get(stateModel2); + + // The monitor should have been created and the metric should be incremented + Assert.assertEquals(internalMonitor.getLeadershipFailureCounter(), 1, + "Leadership failure metric should be incremented when second controller fails to become leader"); } } \ No newline at end of file From 4fbe103046fd71f3a7a6b33f909deba351a4a3b4 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 11:39:09 +0530 Subject: [PATCH 12/15] removed the .vscode files changes --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index c5f3f6b9c7..0000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "java.configuration.updateBuildConfiguration": "interactive" -} \ No newline at end of file From 93942471a0b9ae7100f5b1c619d475fbe204ed17 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 12:33:19 +0530 Subject: [PATCH 13/15] Addressed the comments --- .../DistClusterControllerStateModel.java | 43 +++++-------------- 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 025069a074..a0a00f1936 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -39,10 +39,6 @@ public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel { private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class); - // Constants for metric types - private static final String METRIC_LEADERSHIP_FAILURE = "leadership_failure"; - private static final String METRIC_STILL_LEADER_DURING_RESET = "still_leader_during_reset"; - protected volatile Optional _controllerOpt = Optional.empty(); private final Set _enabledPipelineTypes; private ClusterStatusMonitor _clusterStatusMonitor; @@ -85,9 +81,10 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte + "This should not happen. Controller: {}", clusterName ,controllerName); // Publish metrics through ClusterStatusMonitor when not a leader - publishControllerMetric(clusterName, METRIC_LEADERSHIP_FAILURE); + getClusterStatusMonitor(clusterName).reportLeadershipFailure(); + } else { + logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } - logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } else { logger.error("controller already exists:" + _controllerOpt.get().getInstanceName() + " for " + clusterName); @@ -104,6 +101,7 @@ public void onBecomeStandbyFromLeader(Message message, NotificationContext conte if (_controllerOpt.isPresent()) { reset(); + //TODO: log this message only if reset is successful logStateTransition("LEADER", "STANDBY", clusterName, controllerName); } else { logger.error("No controller exists for " + clusterName); @@ -118,6 +116,7 @@ public void onBecomeOfflineFromStandby(Message message, NotificationContext cont @Override public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { reset(); + //TODO: log this message only if reset is successful logStateTransition("OFFLINE", "DROPPED", message == null ? "" : message.getPartitionName(), message == null ? "" : message.getTgtName()); } @@ -140,7 +139,7 @@ public void reset() { _controllerOpt.get().getInstanceName(), _controllerOpt.get().getClusterName()); // Publish metrics when controller is still leader during reset - publishControllerMetric(clusterName, METRIC_STILL_LEADER_DURING_RESET); + getClusterStatusMonitor(clusterName).reportStillLeaderDuringReset(); } _controllerOpt = Optional.empty(); } @@ -153,31 +152,11 @@ public void reset() { } } - /** - * Publishes controller metrics through the managed ClusterStatusMonitor - * @param clusterName the cluster name - * @param metricType the type of metric to publish - */ - private void publishControllerMetric(String clusterName, String metricType) { - try { - // Ensure we have a monitor for this cluster - if (_clusterStatusMonitor == null) { - _clusterStatusMonitor = new ClusterStatusMonitor(clusterName); - _clusterStatusMonitor.active(); - } - - switch (metricType) { - case METRIC_LEADERSHIP_FAILURE: - _clusterStatusMonitor.reportLeadershipFailure(); - break; - case METRIC_STILL_LEADER_DURING_RESET: - _clusterStatusMonitor.reportStillLeaderDuringReset(); - break; - default: - logger.warn("Unknown metric type: {} for cluster {}", metricType, clusterName); - } - } catch (Exception e) { - logger.error("Failed to publish {} metric for cluster {}", metricType, clusterName, e); + private ClusterStatusMonitor getClusterStatusMonitor(String clusterName) { + if (_clusterStatusMonitor == null) { + _clusterStatusMonitor = new ClusterStatusMonitor(clusterName); + _clusterStatusMonitor.active(); } + return _clusterStatusMonitor; } } \ No newline at end of file From 1da4f4e0974692b92690fcdd0f60a281b9b7fe90 Mon Sep 17 00:00:00 2001 From: vivek8420 Date: Wed, 23 Jul 2025 12:44:52 +0530 Subject: [PATCH 14/15] Addressed the comments --- .../mbeans/ClusterStatusMonitor.java | 24 ++++++++--------- .../mbeans/ClusterStatusMonitorMBean.java | 4 +-- .../DistClusterControllerStateModel.java | 4 +-- .../mbeans/TestClusterStatusMonitor.java | 26 +++++++++---------- .../TestDistControllerStateModel.java | 2 +- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 6cd4f38a24..42390d5ff2 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -87,8 +87,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private AtomicLong _rebalanceFailureCount = new AtomicLong(0L); private AtomicLong _continuousResourceRebalanceFailureCount = new AtomicLong(0L); private AtomicLong _continuousTaskRebalanceFailureCount = new AtomicLong(0L); - private AtomicLong _leadershipFailureCount = new AtomicLong(0L); - private AtomicLong _stillLeaderDuringResetCount = new AtomicLong(0L); + private AtomicLong _leaderFailureCount = new AtomicLong(0L); + private AtomicLong _resetLeaderFailureCount = new AtomicLong(0L); private final ConcurrentHashMap _resourceMonitorMap = new ConcurrentHashMap<>(); @@ -662,8 +662,8 @@ public void reset() { _rebalanceFailureCount.set(0L); _continuousResourceRebalanceFailureCount.set(0L); _continuousTaskRebalanceFailureCount.set(0L); - _leadershipFailureCount.set(0L); - _stillLeaderDuringResetCount.set(0L); + _leaderFailureCount.set(0L); + _resetLeaderFailureCount.set(0L); } catch (Exception e) { LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e); } @@ -1143,26 +1143,26 @@ public long getNumOfResourcesRebalanceThrottledGauge() { } @Override - public long getLeadershipFailureCounter() { - return _leadershipFailureCount.get(); + public long getLeaderFailureCounter() { + return _leaderFailureCount.get(); } @Override - public long getStillLeaderDuringResetCounter() { - return _stillLeaderDuringResetCount.get(); + public long getResetLeaderFailureCounter() { + return _resetLeaderFailureCount.get(); } /** * Report a leadership failure for distributed controller */ - public void reportLeadershipFailure() { - _leadershipFailureCount.incrementAndGet(); + public void reportLeaderFailure() { + _leaderFailureCount.incrementAndGet(); } /** * Report when controller is still leader during reset */ - public void reportStillLeaderDuringReset() { - _stillLeaderDuringResetCount.incrementAndGet(); + public void reportResetLeaderFailure() { + _resetLeaderFailureCount.incrementAndGet(); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java index 5d76cc1c13..2feb6b41e4 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java @@ -151,10 +151,10 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider { /** * @return The number of leadership failures for distributed controllers */ - long getLeadershipFailureCounter(); + long getLeaderFailureCounter(); /** * @return The number of times controller was still leader during reset */ - long getStillLeaderDuringResetCounter(); + long getResetLeaderFailureCounter(); } diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index a0a00f1936..ff38e53b70 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -81,7 +81,7 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte + "This should not happen. Controller: {}", clusterName ,controllerName); // Publish metrics through ClusterStatusMonitor when not a leader - getClusterStatusMonitor(clusterName).reportLeadershipFailure(); + getClusterStatusMonitor(clusterName).reportLeaderFailure(); } else { logStateTransition("STANDBY", "LEADER", clusterName, controllerName); } @@ -139,7 +139,7 @@ public void reset() { _controllerOpt.get().getInstanceName(), _controllerOpt.get().getClusterName()); // Publish metrics when controller is still leader during reset - getClusterStatusMonitor(clusterName).reportStillLeaderDuringReset(); + getClusterStatusMonitor(clusterName).reportResetLeaderFailure(); } _controllerOpt = Optional.empty(); } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java index 575d203ecc..45f2c40798 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java @@ -612,7 +612,7 @@ private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map Date: Wed, 23 Jul 2025 14:33:27 +0530 Subject: [PATCH 15/15] Addressed the comments --- .../helix/monitoring/mbeans/TestClusterStatusMonitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java index 45f2c40798..77ce61b9c1 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java @@ -662,7 +662,7 @@ public void testResetLeaderFailureMetrics() throws Exception { // Initial state - still leader during reset counter should be 0 Object initialCount = _server.getAttribute(clusterMonitorObjName, "ResetLeaderFailureCounter"); Assert.assertTrue(initialCount instanceof Long); - Assert.assertEquals((Long) initialCount, Long.valueOf(0)); + Assert.assertEquals(initialCount, 0L); // Report still leader during reset multiple times monitor.reportResetLeaderFailure();