diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index ad283d4a02ccd..3a0aaec2aeda8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -19,11 +19,13 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.RegionRoleType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; @@ -51,6 +53,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -59,12 +63,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2DualTableManualEnhanced.class}) public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT { + private static final double SYNC_LAG_DELTA = 0.001; + @Override @Before public void setUp() { @@ -299,41 +306,7 @@ public void testPipeAfterDataRegionLeaderStop() { TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv); - final AtomicInteger leaderPort = new AtomicInteger(-1); - final TShowRegionResp showRegionResp = - client.showRegion(new TShowRegionReq().setIsTableModel(true)); - showRegionResp - .getRegionInfoList() - .forEach( - regionInfo -> { - if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { - leaderPort.set(regionInfo.getClientRpcPort()); - } - }); - - int leaderIndex = -1; - for (int i = 0; i < 3; ++i) { - if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) { - leaderIndex = i; - try { - senderEnv.shutdownDataNode(i); - } catch (final Throwable e) { - e.printStackTrace(); - return; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (final InterruptedException ignored) { - } - try { - senderEnv.startDataNode(i); - ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown(); - } catch (final Throwable e) { - e.printStackTrace(); - return; - } - } - } + final int leaderIndex = restartTableDataRegionLeader(client, "test1"); if (leaderIndex == -1) { // ensure the leader is stopped fail(); } @@ -343,6 +316,7 @@ public void testPipeAfterDataRegionLeaderStop() { TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv); TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure); + waitForTableDataRegionReplicationComplete(Arrays.asList("test", "test1")); } try { @@ -398,6 +372,140 @@ public void testPipeAfterDataRegionLeaderStop() { } } + private int restartTableDataRegionLeader( + final SyncConfigNodeIServiceClient client, final String database) throws TException { + final List leaderRegionInfoList = + showTableDataRegionLeaders(Collections.singletonList(database), client); + if (leaderRegionInfoList.isEmpty()) { + return -1; + } + + final TRegionInfo targetRegionInfo = + leaderRegionInfoList.stream() + .min(Comparator.comparingInt(regionInfo -> regionInfo.getConsensusGroupId().getId())) + .orElse(null); + if (targetRegionInfo == null) { + return -1; + } + + final int leaderPort = targetRegionInfo.getClientRpcPort(); + for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) { + if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) { + continue; + } + + try { + senderEnv.shutdownDataNode(i); + } catch (final Throwable e) { + e.printStackTrace(); + return -1; + } + + try { + TimeUnit.SECONDS.sleep(1); + } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + return -1; + } + + try { + senderEnv.startDataNode(i); + ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown(); + } catch (final Throwable e) { + e.printStackTrace(); + return -1; + } + return i; + } + return -1; + } + + private void waitForTableDataRegionReplicationComplete(final List databases) { + await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List leaderRegionInfoList = + showTableDataRegionLeaders(databases, client); + Assert.assertFalse( + "No table DataRegion leader found for databases " + databases, + leaderRegionInfoList.isEmpty()); + + for (final TRegionInfo regionInfo : leaderRegionInfoList) { + final DataNodeWrapper leaderNode = + findDataNodeWrapperByPort(regionInfo.getClientRpcPort()); + final String metricsUrl = + "http://" + + leaderNode.getIp() + + ":" + + leaderNode.getMetricPort() + + "/metrics"; + final String metricsContent = senderEnv.getUrlContent(metricsUrl, null); + Assert.assertNotNull( + "Failed to fetch metrics from leader DataNode at " + metricsUrl, + metricsContent); + assertSyncLagIsZero(metricsContent, buildDataRegionTag(regionInfo), metricsUrl); + } + } + }); + } + + private List showTableDataRegionLeaders( + final List databases, final SyncConfigNodeIServiceClient client) throws TException { + final TShowRegionResp showRegionResp = + client.showRegion( + new TShowRegionReq() + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .setDatabases(databases) + .setIsTableModel(true)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode()); + final List result = new ArrayList<>(); + for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + result.add(regionInfo); + } + } + return result; + } + + private DataNodeWrapper findDataNodeWrapperByPort(final int port) { + for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) { + if (dataNodeWrapper.getPort() == port) { + return dataNodeWrapper; + } + } + fail("Failed to find DataNodeWrapper for client rpc port " + port); + return null; + } + + private String buildDataRegionTag(final TRegionInfo regionInfo) { + return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]"; + } + + private void assertSyncLagIsZero( + final String metricsContent, final String dataRegionTag, final String metricsUrl) { + for (final String line : metricsContent.split("\\R")) { + if (!line.startsWith("iot_consensus{") + || !line.contains("type=\"syncLag\"") + || !line.contains("region=\"" + dataRegionTag + "\"")) { + continue; + } + final int lastSpaceIndex = line.lastIndexOf(' '); + Assert.assertTrue("Malformed syncLag metric line: " + line, lastSpaceIndex > 0); + Assert.assertEquals( + "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl + " but got " + line, + 0.0, + Double.parseDouble(line.substring(lastSpaceIndex + 1)), + SYNC_LAG_DELTA); + return; + } + fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl); + } + @Test public void testPipeAfterRegisterNewDataNode() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);