diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 50a9f04390..c70d47d861 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -266,15 +266,15 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) { String instanceName = instanceConfig.getInstanceName(); String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); - if (!_zkClient.exists(instanceConfigPath)) { - throw new HelixException( - "Node " + instanceName + " does not exist in config for cluster " + clusterName); - } - String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); - if (!_zkClient.exists(instancePath)) { + boolean hasConfig = _zkClient.exists(instanceConfigPath); + boolean hasInstance = _zkClient.exists(instancePath); + // dropInstancePathsRecursively is no longer atomic (config is deleted before + // the subtree). A retry after a partial drop may find the config already + // gone but the subtree still present; treat that as a resume case. + if (!hasConfig && !hasInstance) { throw new HelixException( - "Node " + instanceName + " does not exist in instances for cluster " + clusterName); + "Node " + instanceName + " does not exist in config for cluster " + clusterName); } String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName); @@ -286,13 +286,34 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) { dropInstancePathsRecursively(clusterName, instanceName); } + // Two-phase drop. Previously a single deleteRecursivelyAtomic([instance, config]) + // built one multi() packet whose size grew O(#znodes); on instances with ~13K + // accumulated MESSAGES it crossed the 4 MB jute.maxbuffer limit, surfaced as + // CONNECTIONLOSS, and the default 24h ZkClient retry timeout pinned the Helix + // REST Jetty thread pool until restart. + // + // Phase 1: delete InstanceConfig first. This makes the instance non-Assignable, + // so the controller stops generating new state-transition messages while the + // subtree delete is in flight. + // Phase 2: deleteRecursively on /INSTANCES/{instance}. ZkClient.deleteRecursively + // batches internally to stay within jute.maxbuffer. + // + // Trade-off: this is no longer atomic. If the JVM dies between Phase 1 and the + // end of Phase 2, a stale /INSTANCES/{instance} subtree remains. The next + // dropInstance call resumes cleanup; dropInstance's existence check above is + // relaxed to allow the resume case where InstanceConfig is already gone. private void dropInstancePathsRecursively(String clusterName, String instanceName) { String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); int retryCnt = 0; while (true) { try { - _zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath)); + // Phase 1 + if (_zkClient.exists(instanceConfigPath)) { + _zkClient.delete(instanceConfigPath); + } + // Phase 2 + _zkClient.deleteRecursively(instancePath); return; } catch (ZkClientException e) { if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause() diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java index b60b909a26..7f1cc03ece 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java @@ -224,12 +224,14 @@ public void testZkHelixAdmin() { } // Tests that ZkClientException thrown from ZkClient should be caught - // and it should be converted HelixException to be rethrown + // and converted to HelixException. dropInstance is now a two-phase batched + // drop that delegates the subtree delete to ZkClient.deleteRecursively; + // simulate the racy NotEmpty case by having deleteRecursively throw the + // legacy exception shape so the existing 3-retry loop is exercised. String instancePath = PropertyPathBuilder.instance(clusterName, config.getInstanceName()); String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName); RealmAwareZkClient mockZkClient = Mockito.mock(RealmAwareZkClient.class); - // Mock the exists() method to let dropInstance() reach deleteRecursively(). Mockito.when(mockZkClient.exists(instanceConfigPath)).thenReturn(true); Mockito.when(mockZkClient.exists(instancePath)).thenReturn(true); Mockito.when(mockZkClient.exists(liveInstancePath)).thenReturn(false); @@ -237,7 +239,7 @@ public void testZkHelixAdmin() { new ZkException("ZkException: failed to delete " + instancePath, new KeeperException.NotEmptyException( "NotEmptyException: directory" + instancePath + " is not empty")))) - .when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath)); + .when(mockZkClient).deleteRecursively(instancePath); HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient); try { @@ -1367,4 +1369,144 @@ public void testDropInstance() { System.out.println("End test :" + TestHelper.getTestMethodName()); } + + // Verifies dropInstance handles a subtree larger than the ZkClient + // delete-batch threshold (~1000 ops) end-to-end against real ZK. Reproduces + // the production scenario where an instance accumulates many MESSAGES; the + // legacy single deleteRecursivelyAtomic() built one multi() packet that + // crossed jute.maxbuffer. + @Test + public void testDropInstanceWithLargeMessageSubtree() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + final String clusterName = "TestDropInstanceLargeSubtree"; + final String instanceName = "host_with_many_messages"; + final int numMessages = 2500; // > 2 batches at ZkClient layer + + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.addCluster(clusterName, true); + admin.addInstance(clusterName, new InstanceConfig(instanceName)); + + String messagesPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName); + for (int i = 0; i < numMessages; i++) { + _gZkClient.createPersistent(messagesPath + "/msg-" + i); + } + AssertJUnit.assertEquals(numMessages, _gZkClient.getChildren(messagesPath).size()); + + admin.dropInstance(clusterName, new InstanceConfig(instanceName)); + + String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + AssertJUnit.assertFalse("instance subtree should be gone", _gZkClient.exists(instancePath)); + AssertJUnit.assertFalse("instance config should be gone", _gZkClient.exists(instanceConfigPath)); + AssertJUnit.assertTrue("cluster instance list should be empty", + admin.getInstancesInCluster(clusterName).isEmpty()); + + _gSetupTool.deleteCluster(clusterName); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + // Resume case: if a previous dropInstance partially completed (config + // deleted but subtree delete failed), a follow-up dropInstance should clean + // up the remaining subtree instead of erroring on "config does not exist". + @Test + public void testDropInstanceResumesAfterPartialDelete() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + final String clusterName = "TestDropInstanceResume"; + final String instanceName = "host_partial"; + + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.addCluster(clusterName, true); + admin.addInstance(clusterName, new InstanceConfig(instanceName)); + String messagesPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName); + _gZkClient.createPersistent(messagesPath + "/leftover-msg"); + + // Simulate a prior partial drop: InstanceConfig already deleted, subtree remains. + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + _gZkClient.delete(instanceConfigPath); + AssertJUnit.assertFalse(_gZkClient.exists(instanceConfigPath)); + AssertJUnit.assertTrue(_gZkClient.exists(PropertyPathBuilder.instance(clusterName, instanceName))); + + admin.dropInstance(clusterName, new InstanceConfig(instanceName)); + + AssertJUnit.assertFalse(_gZkClient.exists(PropertyPathBuilder.instance(clusterName, instanceName))); + _gSetupTool.deleteCluster(clusterName); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + // Realistic instance shape: addInstance creates 7 standard subdirs, plus + // depth>=2 paths under CURRENTSTATES (sessionId/resource) and ERRORS. + // Verifies the BFS children-first ordering inside ZkClient.deleteRecursively + // works for non-trivial trees. + @Test + public void testDropInstanceWithDeepSubtreeShape() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + final String clusterName = "TestDropInstanceDeepShape"; + final String instanceName = "host_deep"; + + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.addCluster(clusterName, true); + admin.addInstance(clusterName, new InstanceConfig(instanceName)); + + String csPath = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName); + String session = "session-1"; + _gZkClient.createPersistent(csPath + "/" + session); + for (int i = 0; i < 50; i++) { + _gZkClient.createPersistent(csPath + "/" + session + "/resource-" + i); + } + String msgPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName); + for (int i = 0; i < 100; i++) { + _gZkClient.createPersistent(msgPath + "/msg-" + i); + } + String errPath = PropertyPathBuilder.instanceError(clusterName, instanceName); + _gZkClient.createPersistent(errPath + "/" + session); + _gZkClient.createPersistent(errPath + "/" + session + "/res-1"); + + admin.dropInstance(clusterName, new InstanceConfig(instanceName)); + + AssertJUnit.assertFalse(_gZkClient.exists(PropertyPathBuilder.instance(clusterName, instanceName))); + AssertJUnit.assertFalse(_gZkClient.exists(PropertyPathBuilder.instanceConfig(clusterName, instanceName))); + _gSetupTool.deleteCluster(clusterName); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + // Non-NotEmpty errors from deleteRecursively must NOT trigger the 3-retry + // loop. The production incident was 1880 threads stuck retrying + // CONNECTIONLOSS for 24h; we want fail-fast for anything that isn't the + // racy NotEmpty case. + @Test + public void testDropInstanceFailsFastOnNonRetryableError() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + final String clusterName = "TestDropInstanceFailFast"; + final String instanceName = "host_failfast"; + InstanceConfig config = new InstanceConfig(instanceName); + + String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName); + + RealmAwareZkClient mockZkClient = Mockito.mock(RealmAwareZkClient.class); + Mockito.when(mockZkClient.exists(instanceConfigPath)).thenReturn(true); + Mockito.when(mockZkClient.exists(instancePath)).thenReturn(true); + Mockito.when(mockZkClient.exists(liveInstancePath)).thenReturn(false); + // Simulate an unrecoverable error (no NotEmpty cause chain, so the + // outer NotEmpty retry loop must NOT retry). + Mockito.doThrow(new ZkClientException("simulated unrecoverable ZK error")) + .when(mockZkClient).deleteRecursively(instancePath); + + HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient); + long start = System.currentTimeMillis(); + try { + helixAdminMock.dropInstance(clusterName, config); + Assert.fail("Should throw HelixException"); + } catch (HelixException expected) { + Assert.assertEquals(expected.getMessage(), + "Failed to drop instance: " + instanceName + ". Retry times: 0", + "Non-NotEmpty errors must not trigger the 3-retry loop"); + } + long elapsed = System.currentTimeMillis() - start; + AssertJUnit.assertTrue("dropInstance should fail fast (took " + elapsed + " ms)", elapsed < 2000); + + Mockito.verify(mockZkClient, Mockito.times(1)).deleteRecursively(instancePath); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 597a369a00..8afd226f29 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -106,6 +106,11 @@ public class ZkClient implements Watcher { // TODO: remove it once we have a better way to exit retry for this case private static final int NUM_CHILDREN_LIMIT = 100 * 1000; + // Batch size for deleteRecursively. Sized so each multi() packet stays well + // below jute.maxbuffer (4 MB default): ~240 bytes/op * 1000 ops ~= 240 KB. + // See deleteRecursively for context. + private static final int DELETE_RECURSIVE_BATCH_SIZE = 1000; + private static final boolean SYNC_ON_SESSION = Boolean.parseBoolean( System.getProperty(ZkSystemPropertyKeys.ZK_AUTOSYNC_ENABLED, "true")); private static final String SYNC_PATH = "/"; @@ -1777,31 +1782,117 @@ public boolean deleteRecursive(String path) { /** * Delete the path as well as all its children. - * @param path - * @throws ZkClientException + * + *

This operation is not atomic: it may partially complete on + * failure. A second call is safe to run and will resume the deletion + * (already-deleted znodes are tolerated as NoNode results). + * + *

Implementation: BFS-collects all descendant paths children-first, then + * issues delete operations in batches of {@value #DELETE_RECURSIVE_BATCH_SIZE} + * via {@link #multi(Iterable)}. Batching keeps each on-wire packet well + * below {@code jute.maxbuffer} (4 MB by default) so this method works for + * arbitrarily large subtrees. If the parent znode is racily re-populated + * mid-delete (e.g. another agent writing children), the resulting NotEmpty + * is bubbled up wrapped in + * {@code ZkClientException -> ZkException -> KeeperException.NotEmptyException} + * so callers' existing NotEmpty-retry loops can re-walk and retry. + * + * @param path ZK path to delete (no-op if path does not exist) + * @throws ZkClientException on unrecoverable failure */ public void deleteRecursively(String path) throws ZkClientException { - List children; + if (!exists(path)) { + return; + } + List orderedPaths; try { - children = getChildren(path, false); + orderedPaths = collectSubtreeChildrenFirst(path); } catch (ZkNoNodeException e) { - // if the node to be deleted does not exist, treat it as success. return; } - - for (String subPath : children) { - deleteRecursively(path + "/" + subPath); + if (orderedPaths.isEmpty()) { + return; + } + for (int i = 0; i < orderedPaths.size(); i += DELETE_RECURSIVE_BATCH_SIZE) { + int end = Math.min(i + DELETE_RECURSIVE_BATCH_SIZE, orderedPaths.size()); + List ops = new ArrayList<>(end - i); + for (int j = i; j < end; j++) { + ops.add(Op.delete(orderedPaths.get(j), -1)); + } + List opResults; + try { + opResults = multi(ops); + } catch (Exception e) { + LOG.error("zkclient {}, Failed batched delete for {} (batch index {}, size {}), exception {}", + _uid, path, i, ops.size(), e); + throw new ZkClientException("Failed to delete " + path, e); + } + Map failedPathsMap = new HashMap<>(); + Map notEmptyPathsMap = new HashMap<>(); + for (int k = 0; k < opResults.size(); k++) { + if (opResults.get(k) instanceof OpResult.ErrorResult) { + KeeperException.Code code = KeeperException.Code + .get(((OpResult.ErrorResult) opResults.get(k)).getErr()); + if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) { + // NoNode is tolerated: a previous partial delete or concurrent + // delete may have already removed this znode. + continue; + } + if (code == KeeperException.Code.NOTEMPTY) { + // NotEmpty surfaces when a child znode appears under a parent we + // are deleting (e.g. controller writing ParticipantHistory while + // an instance is being dropped). Bubble in the same exception + // shape callers may already inspect for retry. + notEmptyPathsMap.put(ops.get(k).getPath(), code); + } else { + failedPathsMap.put(ops.get(k).getPath(), code); + } + } + } + if (!notEmptyPathsMap.isEmpty()) { + String firstNotEmptyPath = notEmptyPathsMap.keySet().iterator().next(); + LOG.warn("zkclient {}, NotEmpty during recursive delete of {} on paths {}", + _uid, path, notEmptyPathsMap.keySet()); + throw new ZkClientException("Failed to delete " + path, + new ZkException( + "Failed to delete " + path + " due to NotEmpty on " + notEmptyPathsMap.keySet(), + new KeeperException.NotEmptyException(firstNotEmptyPath))); + } + if (!failedPathsMap.isEmpty()) { + LOG.error("zkclient {}, Failed batched delete for {} with error codes {}", + _uid, path, failedPathsMap); + throw new ZkClientException( + "Failed to delete " + path + " with ZK error codes: " + failedPathsMap); + } } + } - // delete() function call will return true if successful, false if the path does not - // exist (in this context, it should be treated as successful), and throw exception - // if there is any other failure case. - try { - delete(path); - } catch (Exception e) { - LOG.error("zkclient {}, Failed to delete {}, exception {}", _uid, path, e); - throw new ZkClientException("Failed to delete " + path, e); + /** + * BFS-walk the subtree rooted at {@code root} and return all znode paths + * ordered children-first (safe for sequential deletion). NoNode encountered + * during traversal is tolerated: the missing branch is skipped. + */ + private List collectSubtreeChildrenFirst(String root) { + List orderedPaths = new ArrayList<>(); + Queue queue = new LinkedList<>(); + queue.offer(root); + while (!queue.isEmpty()) { + String node = queue.poll(); + List children; + try { + children = getChildren(node, false); + } catch (ZkNoNodeException e) { + continue; + } + if (children != null) { + for (String child : children) { + queue.offer(node + "/" + child); + } + } + orderedPaths.add(node); } + Collections.reverse(orderedPaths); + return orderedPaths; } /** diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java index bff40045ab..ec0654540b 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java @@ -1193,4 +1193,79 @@ void testDeleteRecursivelyAtomic() { Assert.assertFalse(_zkClient.exists(grandParent)); Assert.assertFalse(_zkClient.exists(newNode)); } + + // Verifies the batched implementation of deleteRecursively handles a subtree + // larger than DELETE_RECURSIVE_BATCH_SIZE (1000). The legacy single-op + // recursion was jute.maxbuffer-safe but O(N) round-trips; the new batched + // multi() path must complete in ceil(N/1000) round-trips for the same N + // znodes, all deleted. + @Test + void testDeleteRecursivelyLargeSubtree() { + System.out.println("Start test: " + TestHelper.getTestMethodName()); + String root = "/testDeleteRecursivelyLargeSubtree"; + int numChildren = 2500; // > 2 batches + _zkClient.createPersistent(root); + for (int i = 0; i < numChildren; i++) { + _zkClient.createPersistent(root + "/child-" + i); + } + Assert.assertEquals(_zkClient.getChildren(root).size(), numChildren); + + _zkClient.deleteRecursively(root); + + Assert.assertFalse(_zkClient.exists(root)); + } + + // Deeper subtree shape (depth > 2) - verifies the children-first BFS + // ordering produces a valid delete sequence when ops cross batch + // boundaries between depth levels. + @Test + void testDeleteRecursivelyDeepSubtree() { + System.out.println("Start test: " + TestHelper.getTestMethodName()); + String root = "/testDeleteRecursivelyDeepSubtree"; + _zkClient.createPersistent(root); + // Build a wide+deep tree: 3 grandchildren x 100 children each = 303 znodes + for (int g = 0; g < 3; g++) { + String gpath = root + "/group-" + g; + _zkClient.createPersistent(gpath); + for (int i = 0; i < 100; i++) { + _zkClient.createPersistent(gpath + "/leaf-" + i); + } + } + + _zkClient.deleteRecursively(root); + + Assert.assertFalse(_zkClient.exists(root)); + } + + // Idempotency: deleting a non-existent path is a no-op (preserves the + // legacy contract where ZkNoNodeException on initial getChildren returned + // success). + @Test + void testDeleteRecursivelyOnMissingPathIsNoOp() { + System.out.println("Start test: " + TestHelper.getTestMethodName()); + String missing = "/testDeleteRecursivelyMissing"; + Assert.assertFalse(_zkClient.exists(missing)); + _zkClient.deleteRecursively(missing); // must not throw + Assert.assertFalse(_zkClient.exists(missing)); + } + + // Idempotency under partial delete: simulate a previous incomplete run by + // deleting a leaf out-of-band, then call deleteRecursively on the parent. + // The internal multi() must tolerate NoNode results within a batch. + @Test + void testDeleteRecursivelyIdempotentAfterPartialDelete() { + System.out.println("Start test: " + TestHelper.getTestMethodName()); + String root = "/testDeleteRecursivelyPartial"; + _zkClient.createPersistent(root); + for (int i = 0; i < 5; i++) { + _zkClient.createPersistent(root + "/child-" + i); + } + // Out-of-band: delete one child to simulate prior partial cleanup. + _zkClient.delete(root + "/child-2"); + Assert.assertFalse(_zkClient.exists(root + "/child-2")); + + _zkClient.deleteRecursively(root); + + Assert.assertFalse(_zkClient.exists(root)); + } }