Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
Expand Down Expand Up @@ -51,6 +53,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,12 +63,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTableManualEnhanced.class})
public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {

private static final double SYNC_LAG_DELTA = 0.001;

@Override
@Before
public void setUp() {
Expand Down Expand Up @@ -299,41 +306,7 @@ public void testPipeAfterDataRegionLeaderStop() {

TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);

final AtomicInteger leaderPort = new AtomicInteger(-1);
final TShowRegionResp showRegionResp =
client.showRegion(new TShowRegionReq().setIsTableModel(true));
showRegionResp
.getRegionInfoList()
.forEach(
regionInfo -> {
if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
leaderPort.set(regionInfo.getClientRpcPort());
}
});

int leaderIndex = -1;
for (int i = 0; i < 3; ++i) {
if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
leaderIndex = i;
try {
senderEnv.shutdownDataNode(i);
} catch (final Throwable e) {
e.printStackTrace();
return;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (final InterruptedException ignored) {
}
try {
senderEnv.startDataNode(i);
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();
return;
}
}
}
final int leaderIndex = restartTableDataRegionLeader(client, "test1");
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}
Expand All @@ -343,6 +316,7 @@ public void testPipeAfterDataRegionLeaderStop() {
TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);

TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure);
waitForTableDataRegionReplicationComplete(Arrays.asList("test", "test1"));
}

try {
Expand Down Expand Up @@ -398,6 +372,140 @@ public void testPipeAfterDataRegionLeaderStop() {
}
}

/**
 * Restarts the sender DataNode that hosts a table-model DataRegion leader of {@code database}.
 *
 * <p>Among the leaders returned by {@code showTableDataRegionLeaders}, the one with the smallest
 * consensus group id is selected. The first DataNode whose port equals that leader's client RPC
 * port is shut down, and after a one-second pause it is started again and the cluster status is
 * checked.
 *
 * @param client ConfigNode client used to query the regions
 * @param database database whose DataRegion leader should be restarted
 * @return the index of the restarted DataNode in {@code senderEnv}, or {@code -1} when no leader
 *     or matching DataNode is found, the pause is interrupted, or shutdown/startup throws
 * @throws TException if querying the regions fails
 */
private int restartTableDataRegionLeader(
    final SyncConfigNodeIServiceClient client, final String database) throws TException {
  // An empty leader list yields an empty stream, so the null fallback also covers "no leader".
  final TRegionInfo chosenLeader =
      showTableDataRegionLeaders(Collections.singletonList(database), client).stream()
          .min(Comparator.comparingInt(leader -> leader.getConsensusGroupId().getId()))
          .orElse(null);
  if (chosenLeader == null) {
    return -1;
  }

  // Locate the first DataNode listening on the leader's client RPC port.
  final int rpcPort = chosenLeader.getClientRpcPort();
  int nodeIndex = -1;
  for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
    if (senderEnv.getDataNodeWrapper(i).getPort() == rpcPort) {
      nodeIndex = i;
      break;
    }
  }
  if (nodeIndex == -1) {
    return -1;
  }

  try {
    senderEnv.shutdownDataNode(nodeIndex);
  } catch (final Throwable shutdownFailure) {
    shutdownFailure.printStackTrace();
    return -1;
  }

  try {
    TimeUnit.SECONDS.sleep(1);
  } catch (final InterruptedException interrupted) {
    // Restore the interrupt flag and report that the restart did not complete.
    Thread.currentThread().interrupt();
    return -1;
  }

  try {
    senderEnv.startDataNode(nodeIndex);
    ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
  } catch (final Throwable startFailure) {
    startFailure.printStackTrace();
    return -1;
  }
  return nodeIndex;
}

/**
 * Waits until every table-model DataRegion leader of the given databases reports a syncLag of
 * zero in its DataNode metrics.
 *
 * <p>The check is retried every 500 milliseconds for at most 2 minutes.
 *
 * @param databases databases whose DataRegion leaders are inspected
 */
private void waitForTableDataRegionReplicationComplete(final List<String> databases) {
  await()
      .pollInterval(500, TimeUnit.MILLISECONDS)
      .atMost(2, TimeUnit.MINUTES)
      .untilAsserted(() -> assertTableDataRegionLeadersHaveNoSyncLag(databases));
}

/**
 * Performs one attempt of the replication check: queries the current leaders through a fresh
 * ConfigNode connection, then reads each leader DataNode's metrics endpoint and asserts that the
 * syncLag metric of the corresponding region is zero.
 */
private void assertTableDataRegionLeadersHaveNoSyncLag(final List<String> databases)
    throws Exception {
  try (final SyncConfigNodeIServiceClient configNodeClient =
      (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
    final List<TRegionInfo> leaders = showTableDataRegionLeaders(databases, configNodeClient);
    Assert.assertFalse(
        "No table DataRegion leader found for databases " + databases, leaders.isEmpty());

    for (final TRegionInfo leader : leaders) {
      final DataNodeWrapper hostNode = findDataNodeWrapperByPort(leader.getClientRpcPort());
      final String metricsUrl =
          "http://" + hostNode.getIp() + ":" + hostNode.getMetricPort() + "/metrics";
      final String metricsBody = senderEnv.getUrlContent(metricsUrl, null);
      Assert.assertNotNull(
          "Failed to fetch metrics from leader DataNode at " + metricsUrl, metricsBody);
      assertSyncLagIsZero(metricsBody, buildDataRegionTag(leader), metricsUrl);
    }
  }
}

/**
 * Queries the table-model DataRegions of the given databases and keeps only the leader replicas.
 *
 * <p>Asserts that the ConfigNode answered with a success status before filtering.
 *
 * @param databases databases to query
 * @param client ConfigNode client used to issue the show-region request
 * @return a new list holding the region infos whose role is leader, in response order
 * @throws TException if the show-region call fails
 */
private List<TRegionInfo> showTableDataRegionLeaders(
    final List<String> databases, final SyncConfigNodeIServiceClient client) throws TException {
  final TShowRegionReq request = new TShowRegionReq();
  request.setConsensusGroupType(TConsensusGroupType.DataRegion);
  request.setDatabases(databases);
  request.setIsTableModel(true);

  final TShowRegionResp response = client.showRegion(request);
  Assert.assertEquals(
      TSStatusCode.SUCCESS_STATUS.getStatusCode(), response.getStatus().getCode());

  // Copy the response list, then drop every replica that is not a leader.
  final List<TRegionInfo> leaders = new ArrayList<>(response.getRegionInfoList());
  leaders.removeIf(
      candidate -> !RegionRoleType.Leader.getRoleType().equals(candidate.getRoleType()));
  return leaders;
}

/**
 * Looks up the sender DataNode whose port equals {@code port}.
 *
 * <p>Fails the test when no DataNode matches.
 *
 * @param port client RPC port to search for
 * @return the first matching DataNode wrapper
 */
private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
  final DataNodeWrapper match =
      senderEnv.getDataNodeWrapperList().stream()
          .filter(candidate -> candidate.getPort() == port)
          .findFirst()
          .orElse(null);
  if (match == null) {
    fail("Failed to find DataNodeWrapper for client rpc port " + port);
  }
  return match;
}

/**
 * Builds the region tag {@code DataRegion[<id>]} from the consensus group id of the given region.
 *
 * @param regionInfo region whose consensus group id is embedded in the tag
 * @return the tag text
 */
private String buildDataRegionTag(final TRegionInfo regionInfo) {
  final StringBuilder tag = new StringBuilder("DataRegion[");
  tag.append(regionInfo.getConsensusGroupId().getId());
  tag.append(']');
  return tag.toString();
}

/**
 * Asserts that the syncLag metric of the given region is zero in the supplied metrics text.
 *
 * <p>Only the first line that starts with {@code iot_consensus{} and carries both the syncLag type
 * label and the matching region label is examined; its value is the text after the last space.
 * The test fails when no such line exists.
 *
 * @param metricsContent metrics text, one metric per line
 * @param dataRegionTag region tag expected in the {@code region} label
 * @param metricsUrl URL the metrics were read from, used in failure messages only
 */
private void assertSyncLagIsZero(
    final String metricsContent, final String dataRegionTag, final String metricsUrl) {
  final String regionLabel = "region=\"" + dataRegionTag + "\"";
  final String[] metricLines = metricsContent.split("\\R");
  for (int i = 0; i < metricLines.length; ++i) {
    final String metricLine = metricLines[i];
    final boolean isTargetSyncLag =
        metricLine.startsWith("iot_consensus{")
            && metricLine.contains("type=\"syncLag\"")
            && metricLine.contains(regionLabel);
    if (isTargetSyncLag) {
      final int valueSeparator = metricLine.lastIndexOf(' ');
      Assert.assertTrue("Malformed syncLag metric line: " + metricLine, valueSeparator > 0);
      final String failureMessage =
          "Expected syncLag of "
              + dataRegionTag
              + " to be 0 at "
              + metricsUrl
              + " but got "
              + metricLine;
      final double syncLag = Double.parseDouble(metricLine.substring(valueSeparator + 1));
      Assert.assertEquals(failureMessage, 0.0, syncLag, SYNC_LAG_DELTA);
      return;
    }
  }
  fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
}

@Test
public void testPipeAfterRegisterNewDataNode() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
Expand Down
Loading