Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,15 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
String instanceName = instanceConfig.getInstanceName();

String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
if (!_zkClient.exists(instanceConfigPath)) {
throw new HelixException(
"Node " + instanceName + " does not exist in config for cluster " + clusterName);
}

String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
if (!_zkClient.exists(instancePath)) {
boolean hasConfig = _zkClient.exists(instanceConfigPath);
boolean hasInstance = _zkClient.exists(instancePath);
// dropInstancePathsRecursively is no longer atomic (config is deleted before
// the subtree). A retry after a partial drop may find the config already
// gone but the subtree still present; treat that as a resume case.
if (!hasConfig && !hasInstance) {
throw new HelixException(
"Node " + instanceName + " does not exist in instances for cluster " + clusterName);
"Node " + instanceName + " does not exist in config for cluster " + clusterName);
}

String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);
Expand All @@ -286,13 +286,34 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
dropInstancePathsRecursively(clusterName, instanceName);
}

// Two-phase drop. Previously a single deleteRecursivelyAtomic([instance, config])
// built one multi() packet whose size grew O(#znodes); on instances with ~13K
// accumulated MESSAGES it crossed the 4 MB jute.maxbuffer limit, surfaced as
// CONNECTIONLOSS, and the default 24h ZkClient retry timeout pinned the Helix
// REST Jetty thread pool until restart.
//
// Phase 1: delete InstanceConfig first. This makes the instance non-Assignable,
// so the controller stops generating new state-transition messages while the
// subtree delete is in flight.
// Phase 2: deleteRecursively on /INSTANCES/{instance}. ZkClient.deleteRecursively
// batches internally to stay within jute.maxbuffer.
//
// Trade-off: this is no longer atomic. If the JVM dies between Phase 1 and the
// end of Phase 2, a stale /INSTANCES/{instance} subtree remains. The next
// dropInstance call resumes cleanup; dropInstance's existence check above is
// relaxed to allow the resume case where InstanceConfig is already gone.
private void dropInstancePathsRecursively(String clusterName, String instanceName) {
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
int retryCnt = 0;
while (true) {
try {
_zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
// Phase 1
if (_zkClient.exists(instanceConfigPath)) {
_zkClient.delete(instanceConfigPath);
}
// Phase 2
_zkClient.deleteRecursively(instancePath);
return;
} catch (ZkClientException e) {
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,22 @@ public void testZkHelixAdmin() {
}

// Tests that ZkClientException thrown from ZkClient should be caught
// and it should be converted HelixException to be rethrown
// and converted to HelixException. dropInstance is now a two-phase batched
// drop that delegates the subtree delete to ZkClient.deleteRecursively;
// simulate the racy NotEmpty case by having deleteRecursively throw the
// legacy exception shape so the existing 3-retry loop is exercised.
String instancePath = PropertyPathBuilder.instance(clusterName, config.getInstanceName());
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);
RealmAwareZkClient mockZkClient = Mockito.mock(RealmAwareZkClient.class);
// Mock the exists() method to let dropInstance() reach deleteRecursively().
Mockito.when(mockZkClient.exists(instanceConfigPath)).thenReturn(true);
Mockito.when(mockZkClient.exists(instancePath)).thenReturn(true);
Mockito.when(mockZkClient.exists(liveInstancePath)).thenReturn(false);
Mockito.doThrow(new ZkClientException("ZkClientException: failed to delete " + instancePath,
new ZkException("ZkException: failed to delete " + instancePath,
new KeeperException.NotEmptyException(
"NotEmptyException: directory" + instancePath + " is not empty"))))
.when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
.when(mockZkClient).deleteRecursively(instancePath);

HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
try {
Expand Down Expand Up @@ -1367,4 +1369,144 @@ public void testDropInstance() {

System.out.println("End test :" + TestHelper.getTestMethodName());
}

// Verifies dropInstance handles a subtree larger than the ZkClient
// delete-batch threshold (~1000 ops) end-to-end against real ZK. Reproduces
// the production scenario where an instance accumulates many MESSAGES; the
// legacy single deleteRecursivelyAtomic() built one multi() packet that
// crossed jute.maxbuffer.
//
// Cluster cleanup runs in a finally block so that a failed assertion does not
// leave the cluster (and its thousands of message znodes) behind in the
// shared test ZooKeeper.
@Test
public void testDropInstanceWithLargeMessageSubtree() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String clusterName = "TestDropInstanceLargeSubtree";
  final String instanceName = "host_with_many_messages";
  final int numMessages = 2500; // > 2 batches at ZkClient layer

  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  try {
    admin.addInstance(clusterName, new InstanceConfig(instanceName));

    // Populate MESSAGES with enough children to force several delete batches.
    String messagesPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName);
    for (int i = 0; i < numMessages; i++) {
      _gZkClient.createPersistent(messagesPath + "/msg-" + i);
    }
    AssertJUnit.assertEquals(numMessages, _gZkClient.getChildren(messagesPath).size());

    admin.dropInstance(clusterName, new InstanceConfig(instanceName));

    // Both the instance subtree and its config must be gone afterwards.
    String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
    AssertJUnit.assertFalse("instance subtree should be gone", _gZkClient.exists(instancePath));
    AssertJUnit.assertFalse("instance config should be gone", _gZkClient.exists(instanceConfigPath));
    AssertJUnit.assertTrue("cluster instance list should be empty",
        admin.getInstancesInCluster(clusterName).isEmpty());
  } finally {
    _gSetupTool.deleteCluster(clusterName);
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

// Resume case: if a previous dropInstance partially completed (config
// deleted but subtree delete failed), a follow-up dropInstance should clean
// up the remaining subtree instead of erroring on "config does not exist".
//
// Cluster cleanup runs in a finally block so a failed assertion does not leak
// the test cluster into the shared test ZooKeeper.
@Test
public void testDropInstanceResumesAfterPartialDelete() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String clusterName = "TestDropInstanceResume";
  final String instanceName = "host_partial";
  final String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
  final String instanceConfigPath =
      PropertyPathBuilder.instanceConfig(clusterName, instanceName);

  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  try {
    admin.addInstance(clusterName, new InstanceConfig(instanceName));
    String messagesPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName);
    _gZkClient.createPersistent(messagesPath + "/leftover-msg");

    // Simulate a prior partial drop: InstanceConfig already deleted, subtree remains.
    _gZkClient.delete(instanceConfigPath);
    AssertJUnit.assertFalse("instance config should already be gone",
        _gZkClient.exists(instanceConfigPath));
    AssertJUnit.assertTrue("instance subtree should still be present",
        _gZkClient.exists(instancePath));

    admin.dropInstance(clusterName, new InstanceConfig(instanceName));

    AssertJUnit.assertFalse("instance subtree should be gone after resume",
        _gZkClient.exists(instancePath));
  } finally {
    _gSetupTool.deleteCluster(clusterName);
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

// Realistic instance shape: addInstance creates 7 standard subdirs, plus
// depth>=2 paths under CURRENTSTATES (sessionId/resource) and ERRORS.
// Verifies the BFS children-first ordering inside ZkClient.deleteRecursively
// works for non-trivial trees.
//
// Cluster cleanup runs in a finally block so a failed assertion does not leak
// the test cluster into the shared test ZooKeeper.
@Test
public void testDropInstanceWithDeepSubtreeShape() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String clusterName = "TestDropInstanceDeepShape";
  final String instanceName = "host_deep";

  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  try {
    admin.addInstance(clusterName, new InstanceConfig(instanceName));

    // CURRENTSTATES/<session>/resource-N: two levels below the standard subdir.
    String csPath = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName);
    String session = "session-1";
    _gZkClient.createPersistent(csPath + "/" + session);
    for (int i = 0; i < 50; i++) {
      _gZkClient.createPersistent(csPath + "/" + session + "/resource-" + i);
    }
    // MESSAGES/msg-N: a flat fan-out directly under the standard subdir.
    String msgPath = PropertyPathBuilder.instanceMessage(clusterName, instanceName);
    for (int i = 0; i < 100; i++) {
      _gZkClient.createPersistent(msgPath + "/msg-" + i);
    }
    // ERRORS/<session>/res-1: another nested branch.
    String errPath = PropertyPathBuilder.instanceError(clusterName, instanceName);
    _gZkClient.createPersistent(errPath + "/" + session);
    _gZkClient.createPersistent(errPath + "/" + session + "/res-1");

    admin.dropInstance(clusterName, new InstanceConfig(instanceName));

    AssertJUnit.assertFalse(
        _gZkClient.exists(PropertyPathBuilder.instance(clusterName, instanceName)));
    AssertJUnit.assertFalse(
        _gZkClient.exists(PropertyPathBuilder.instanceConfig(clusterName, instanceName)));
  } finally {
    _gSetupTool.deleteCluster(clusterName);
  }
  System.out.println("End test :" + TestHelper.getTestMethodName());
}

// Non-NotEmpty errors from deleteRecursively must NOT trigger the 3-retry
// loop. The production incident was 1880 threads stuck retrying
// CONNECTIONLOSS for 24h; we want fail-fast for anything that isn't the
// racy NotEmpty case.
//
// Uses a mocked RealmAwareZkClient, so no real ZooKeeper state is touched.
@Test
public void testDropInstanceFailsFastOnNonRetryableError() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  final String clusterName = "TestDropInstanceFailFast";
  final String instanceName = "host_failfast";
  InstanceConfig config = new InstanceConfig(instanceName);

  String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
  String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
  String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);

  // Stub exists() so dropInstance() passes its precondition checks and
  // reaches deleteRecursively().
  RealmAwareZkClient mockZkClient = Mockito.mock(RealmAwareZkClient.class);
  Mockito.when(mockZkClient.exists(instanceConfigPath)).thenReturn(true);
  Mockito.when(mockZkClient.exists(instancePath)).thenReturn(true);
  Mockito.when(mockZkClient.exists(liveInstancePath)).thenReturn(false);
  // Simulate an unrecoverable error (no NotEmpty cause chain, so the
  // outer NotEmpty retry loop must NOT retry).
  Mockito.doThrow(new ZkClientException("simulated unrecoverable ZK error"))
      .when(mockZkClient).deleteRecursively(instancePath);

  HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
  // System.nanoTime() is monotonic; System.currentTimeMillis() follows the
  // wall clock and can jump (NTP adjustment), which would make the elapsed
  // check below spuriously fail or pass.
  long startNanos = System.nanoTime();
  try {
    helixAdminMock.dropInstance(clusterName, config);
    Assert.fail("Should throw HelixException");
  } catch (HelixException expected) {
    Assert.assertEquals(expected.getMessage(),
        "Failed to drop instance: " + instanceName + ". Retry times: 0",
        "Non-NotEmpty errors must not trigger the 3-retry loop");
  }
  long elapsed = (System.nanoTime() - startNanos) / 1_000_000L;
  AssertJUnit.assertTrue("dropInstance should fail fast (took " + elapsed + " ms)", elapsed < 2000);

  // Exactly one delete attempt: the error was not retried.
  Mockito.verify(mockZkClient, Mockito.times(1)).deleteRecursively(instancePath);
  System.out.println("End test :" + TestHelper.getTestMethodName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public class ZkClient implements Watcher {
// TODO: remove it once we have a better way to exit retry for this case
private static final int NUM_CHILDREN_LIMIT = 100 * 1000;

// Batch size for deleteRecursively. Sized so each multi() packet stays well
// below jute.maxbuffer (4 MB default): ~240 bytes/op * 1000 ops ~= 240 KB.
// See deleteRecursively for context.
private static final int DELETE_RECURSIVE_BATCH_SIZE = 1000;

private static final boolean SYNC_ON_SESSION = Boolean.parseBoolean(
System.getProperty(ZkSystemPropertyKeys.ZK_AUTOSYNC_ENABLED, "true"));
private static final String SYNC_PATH = "/";
Expand Down Expand Up @@ -1777,31 +1782,117 @@ public boolean deleteRecursive(String path) {

/**
* Delete the path as well as all its children.
* @param path
* @throws ZkClientException
*
* <p>This operation is <b>not atomic</b>: it may partially complete on
* failure. A second call is safe to run and will resume the deletion
* (already-deleted znodes are tolerated as NoNode results).
*
* <p>Implementation: BFS-collects all descendant paths children-first, then
* issues delete operations in batches of {@value #DELETE_RECURSIVE_BATCH_SIZE}
* via {@link #multi(Iterable)}. Batching keeps each on-wire packet well
* below {@code jute.maxbuffer} (4 MB by default) so this method works for
* arbitrarily large subtrees. If the parent znode is racily re-populated
* mid-delete (e.g. another agent writing children), the resulting NotEmpty
* is bubbled up wrapped in
* {@code ZkClientException -> ZkException -> KeeperException.NotEmptyException}
* so callers' existing NotEmpty-retry loops can re-walk and retry.
*
* @param path ZK path to delete (no-op if path does not exist)
* @throws ZkClientException on unrecoverable failure
*/
public void deleteRecursively(String path) throws ZkClientException {
List<String> children;
if (!exists(path)) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I checked the ZK-side code for this. The `if (!exists(path)) return;` check here costs an extra ZK round-trip for a case that collectSubtreeChildrenFirst already handles via its ZkNoNodeException catch on the initial getChildren, so on the no-op path you pay 2 round-trips instead of 1.

return;
}
List<String> orderedPaths;
try {
children = getChildren(path, false);
orderedPaths = collectSubtreeChildrenFirst(path);
} catch (ZkNoNodeException e) {
// if the node to be deleted does not exist, treat it as success.
return;
}

for (String subPath : children) {
deleteRecursively(path + "/" + subPath);
if (orderedPaths.isEmpty()) {
return;
}
for (int i = 0; i < orderedPaths.size(); i += DELETE_RECURSIVE_BATCH_SIZE) {
int end = Math.min(i + DELETE_RECURSIVE_BATCH_SIZE, orderedPaths.size());
List<Op> ops = new ArrayList<>(end - i);
for (int j = i; j < end; j++) {
ops.add(Op.delete(orderedPaths.get(j), -1));
}
List<OpResult> opResults;
try {
opResults = multi(ops);
} catch (Exception e) {
LOG.error("zkclient {}, Failed batched delete for {} (batch index {}, size {}), exception {}",
_uid, path, i, ops.size(), e);
throw new ZkClientException("Failed to delete " + path, e);
}
Map<String, KeeperException.Code> failedPathsMap = new HashMap<>();
Map<String, KeeperException.Code> notEmptyPathsMap = new HashMap<>();
for (int k = 0; k < opResults.size(); k++) {
if (opResults.get(k) instanceof OpResult.ErrorResult) {
KeeperException.Code code = KeeperException.Code
.get(((OpResult.ErrorResult) opResults.get(k)).getErr());
if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
// NoNode is tolerated: a previous partial delete or concurrent
// delete may have already removed this znode.
continue;
}
if (code == KeeperException.Code.NOTEMPTY) {
// NotEmpty surfaces when a child znode appears under a parent we
// are deleting (e.g. controller writing ParticipantHistory while
// an instance is being dropped). Bubble in the same exception
// shape callers may already inspect for retry.
notEmptyPathsMap.put(ops.get(k).getPath(), code);
} else {
failedPathsMap.put(ops.get(k).getPath(), code);
}
}
}
if (!notEmptyPathsMap.isEmpty()) {
String firstNotEmptyPath = notEmptyPathsMap.keySet().iterator().next();
LOG.warn("zkclient {}, NotEmpty during recursive delete of {} on paths {}",
_uid, path, notEmptyPathsMap.keySet());
throw new ZkClientException("Failed to delete " + path,
new ZkException(
"Failed to delete " + path + " due to NotEmpty on " + notEmptyPathsMap.keySet(),
new KeeperException.NotEmptyException(firstNotEmptyPath)));
}
if (!failedPathsMap.isEmpty()) {
LOG.error("zkclient {}, Failed batched delete for {} with error codes {}",
_uid, path, failedPathsMap);
throw new ZkClientException(
"Failed to delete " + path + " with ZK error codes: " + failedPathsMap);
}
}
}

// delete() function call will return true if successful, false if the path does not
// exist (in this context, it should be treated as successful), and throw exception
// if there is any other failure case.
try {
delete(path);
} catch (Exception e) {
LOG.error("zkclient {}, Failed to delete {}, exception {}", _uid, path, e);
throw new ZkClientException("Failed to delete " + path, e);
/**
 * Walks the subtree rooted at {@code root} level by level and returns every
 * visited znode path with descendants listed before their ancestors, so the
 * result can be deleted front to back.
 *
 * <p>A node whose {@code getChildren} call raises {@link ZkNoNodeException}
 * is left out of the result and its branch is not descended into. If
 * {@code root} itself is missing, the returned list is empty.
 *
 * @param root path of the subtree root
 * @return visited paths, deepest levels first and {@code root} last
 */
private List<String> collectSubtreeChildrenFirst(String root) {
  List<String> visited = new ArrayList<>();
  List<String> frontier = new ArrayList<>();
  frontier.add(root);

  // Process one tree level at a time; the visiting order is the same
  // breadth-first order a FIFO queue would produce.
  while (!frontier.isEmpty()) {
    List<String> nextLevel = new ArrayList<>();
    for (String parent : frontier) {
      List<String> childNames;
      try {
        childNames = getChildren(parent, false);
      } catch (ZkNoNodeException e) {
        // Node vanished (or never existed): skip it and its branch.
        continue;
      }
      if (childNames != null) {
        for (String childName : childNames) {
          nextLevel.add(parent + "/" + childName);
        }
      }
      visited.add(parent);
    }
    frontier = nextLevel;
  }

  // Parents were recorded before their children; flip to children-first.
  Collections.reverse(visited);
  return visited;
}

/**
Expand Down
Loading
Loading