Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ public final class ManagerMessages {
"Heartbeat service is started successfully.";
public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY =
"Heartbeat service is stopped successfully.";
public static final String RECONFIGURE_PEER_PRIORITIES_FAILED =
"Failed to reconfigure ConfigNode peer priorities to {}.";
public static final String INCORRECT_VERSION_OF = "Incorrect version of ";
public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED =
"Init ConsensusManager successfully when restarted";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public final class ManagerMessages {
"Heartbeat service is started successfully.";
public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY =
"Heartbeat service is stopped successfully.";
public static final String RECONFIGURE_PEER_PRIORITIES_FAILED = "调整 ConfigNode 节点优先级到 {} 失败。";
public static final String INCORRECT_VERSION_OF = "Incorrect version of ";
public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED =
"Init ConsensusManager successfully when restarted";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.File;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

public class ConfigNodeConfig {

Expand Down Expand Up @@ -506,6 +507,15 @@ public void setConsensusDir(String consensusDir) {
this.consensusDir = consensusDir;
}

/**
 * Lists the directories this ConfigNode cannot serve without: the system directory and the
 * consensus directory. The periodic disk-health check consumes this list both in the
 * HeartbeatService loop and in the heartbeat-receive path.
 *
 * @return a fixed-size list holding {@code systemDir} followed by {@code consensusDir}
 */
public List<String> getCriticalDirs() {
  final String[] criticalDirs = {systemDir, consensusDir};
  return Arrays.asList(criticalDirs);
}

/**
 * Accessor for the {@code configNodeConsensusProtocolClass} setting.
 *
 * @return the current value of {@code configNodeConsensusProtocolClass}
 */
public String getConfigNodeConsensusProtocolClass() {
  return this.configNodeConsensusProtocolClass;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public LoadManager(IManager configManager) {
setHeartbeatService(configManager, loadCache);
this.statisticsService = new StatisticsService(loadCache);
this.topologyService = new TopologyService(configManager, loadCache::updateTopology);
this.eventService = new EventService(loadCache);
this.eventService = new EventService(configManager, loadCache);
this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
this.eventService.register(routeBalancer);
this.eventService.register(topologyService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,22 @@
.orElseGet(() -> NodeStatus.Unknown.getStatus() + "(NoHeartbeat)");
}

/**
 * Get the raw status reason recorded for the specified Node.
 *
 * @param nodeId the id of the Node to look up
 * @return The raw {@code statusReason} string for {@code nodeId}, or {@code null} when no cache
 *     entry exists yet or no reason has been reported.
 */
public String getNodeStatusReason(int nodeId) {
  final BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
  // No cache entry and "no reason reported" both collapse to null.
  return nodeCache == null ? null : nodeCache.getStatusReason();
}

/**
* Get all Node's current status with reason.
*
* @return Map<NodeId, NodeStatus with reason>
*/
public Map<Integer, String> getNodeStatusWithReason() {

Check warning on line 538 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '517'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5EnVvbwxx8uFGPOygc&open=AZ5EnVvbwxx8uFGPOygc&pullRequest=17724
return nodeCacheMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getNodeStatusWithReason()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@
? statistics.getStatus().getStatus()
: statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")";
}

/**
 * Get the reason attached to the statistics currently held by this cache.
 *
 * @return The raw reason string (may be {@code null}) accompanying the current NodeStatus.
 */
public String getStatusReason() {
  final NodeStatistics statistics = (NodeStatistics) currentStatistics.get();
  return statistics.getStatusReason();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.node;

import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;

Expand Down Expand Up @@ -49,28 +50,39 @@

@Override
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
// Skip itself and the Removing status can not be updated
if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) {
// Removing status can not be updated
if (NodeStatus.Removing.equals(getNodeStatus())) {
return;
}

NodeHeartbeatSample lastSample;
// Update Node status
NodeStatus status;
long currentNanoTime = System.nanoTime();
final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
lastSample = (NodeHeartbeatSample) getLastSample();
heartbeatHistory = Collections.unmodifiableList(slidingWindow);
NodeStatus status;
String statusReason;

if (nodeId == CURRENT_NODE_ID) {
// Self entry: heartbeat loop never sends to itself, so mirror the status that
// this ConfigNode's local disk-check / startup wrote into CommonConfig.
status = CommonDescriptor.getInstance().getConfig().getNodeStatus();
statusReason = CommonDescriptor.getInstance().getConfig().getStatusReason();
} else {
NodeHeartbeatSample lastSample;
final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
lastSample = (NodeHeartbeatSample) getLastSample();
heartbeatHistory = Collections.unmodifiableList(slidingWindow);

if (lastSample == null) {
/* First heartbeat not received from this ConfigNode, status is UNKNOWN */
status = NodeStatus.Unknown;
} else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
/* Failure detector decides that this ConfigNode is UNKNOWN */
status = NodeStatus.Unknown;
} else {
status = lastSample.getStatus();
if (lastSample == null) {
/* First heartbeat not received from this ConfigNode, status is UNKNOWN */
status = NodeStatus.Unknown;
statusReason = null;
} else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
/* Failure detector decides that this ConfigNode is UNKNOWN */
status = NodeStatus.Unknown;
statusReason = null;
} else {

Check warning on line 82 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This branch's code block is the same as the block for the branch on line 74.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5DdKwc5iRc3QGuOFxD&open=AZ5DdKwc5iRc3QGuOFxD&pullRequest=17724
status = lastSample.getStatus();
statusReason = lastSample.getStatusReason();
}
}
}

Expand All @@ -79,6 +91,6 @@
// TODO: Construct load score module
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;

currentStatistics.set(new NodeStatistics(currentNanoTime, status, null, loadScore));
currentStatistics.set(new NodeStatistics(currentNanoTime, status, statusReason, loadScore));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ public NodeHeartbeatSample(TAIHeartbeatResp heartbeatResp) {
/**
 * Constructor for ConfigNode sample.
 *
 * <p>The status and its reason are taken from the heartbeat response when the corresponding
 * fields are set. Otherwise the status defaults to {@link NodeStatus#Running} and the reason to
 * {@code null}. No load sample is recorded for this kind of sample.
 *
 * @param heartbeatResp the heartbeat response to build the sample from
 */
public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) {
  super(heartbeatResp.getTimestamp());
  // Each field is assigned exactly once; an unset status falls back to Running.
  this.status =
      heartbeatResp.isSetStatus()
          ? NodeStatus.parse(heartbeatResp.getStatus())
          : NodeStatus.Running;
  this.statusReason = heartbeatResp.isSetStatusReason() ? heartbeatResp.getStatusReason() : null;
  this.loadSample = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package org.apache.iotdb.confignode.manager.load.service;

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
Expand All @@ -34,6 +38,7 @@
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.iotdb.consensus.exception.ConsensusException;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
Expand All @@ -42,13 +47,17 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* EventService periodically checks statistics and broadcasts the corresponding change events if necessary.
Expand All @@ -68,14 +77,16 @@ public class EventService {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.CONFIG_NODE_EVENT_SERVICE.getName());

private final IManager configManager;
private final LoadCache loadCache;
private final Map<Integer, NodeStatistics> previousNodeStatisticsMap;
private final Map<TConsensusGroupId, RegionGroupStatistics> previousRegionGroupStatisticsMap;
private final Map<TConsensusGroupId, ConsensusGroupStatistics>
previousConsensusGroupStatisticsMap;
private final EventBus eventPublisher;

public EventService(LoadCache loadCache) {
public EventService(IManager configManager, LoadCache loadCache) {
this.configManager = configManager;
this.loadCache = loadCache;
this.previousNodeStatisticsMap = new TreeMap<>();
this.previousRegionGroupStatisticsMap = new TreeMap<>();
Expand Down Expand Up @@ -154,6 +165,55 @@ public synchronized void checkAndBroadcastNodeStatisticsChangeEventIfNecessary()
if (!differentNodeStatisticsMap.isEmpty()) {
eventPublisher.post(new NodeStatisticsChangeEvent(differentNodeStatisticsMap));
recordNodeStatistics(differentNodeStatisticsMap);
reconcileConfigNodePriorities(differentNodeStatisticsMap);
}
}

/**
 * Re-aligns ConfigNode election priorities in the ConfigRegion with the latest {@link
 * NodeStatus} changes (see {@link NodeStatus#priorityForStatus}).
 *
 * <p>Nothing happens unless this node is currently the leader. Among the changed nodes, only
 * registered ConfigNodes whose new statistics map to a priority are considered, and a priority is
 * pushed only when it differs from the one derived from the previous statistics (or when no
 * previous priority can be derived). A failed reconfiguration is logged as a warning and not
 * rethrown.
 *
 * @param differentNodeStatisticsMap node id to a (previous, current) statistics pair for every
 *     node whose statistics changed
 */
private void reconcileConfigNodePriorities(
    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
  final ConsensusManager consensusManager = configManager.getConsensusManager();
  final boolean isLeader = consensusManager != null && consensusManager.isLeader();
  if (!isLeader) {
    return;
  }

  final Set<Integer> configNodeIds =
      configManager.getNodeManager().getRegisteredConfigNodes().stream()
          .map(TConfigNodeLocation::getConfigNodeId)
          .collect(Collectors.toSet());

  // Collect only the peers whose priority bucket actually moved.
  final Map<Integer, Integer> desired = new HashMap<>();
  for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry :
      differentNodeStatisticsMap.entrySet()) {
    final Integer nodeId = entry.getKey();
    final Pair<NodeStatistics, NodeStatistics> change = entry.getValue();

    final NodeStatistics current = change.getRight();
    if (current == null || !configNodeIds.contains(nodeId)) {
      continue;
    }

    final OptionalInt newPriority =
        NodeStatus.priorityForStatus(current.getStatus(), current.getStatusReason());
    if (!newPriority.isPresent()) {
      continue;
    }

    final NodeStatistics previous = change.getLeft();
    OptionalInt oldPriority = OptionalInt.empty();
    if (previous != null) {
      oldPriority = NodeStatus.priorityForStatus(previous.getStatus(), previous.getStatusReason());
    }

    final boolean samePriority =
        oldPriority.isPresent() && oldPriority.getAsInt() == newPriority.getAsInt();
    if (!samePriority) {
      desired.put(nodeId, newPriority.getAsInt());
    }
  }

  if (desired.isEmpty()) {
    return;
  }

  try {
    consensusManager
        .getConsensusImpl()
        .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired);
  } catch (ConsensusException e) {
    LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.cluster.DiskChecker;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
Expand Down Expand Up @@ -81,6 +83,15 @@ public class HeartbeatService {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());
private final AtomicLong heartbeatCounter = new AtomicLong(0);

/**
* Sampling cadence for cluster-wide load and disk-health probes: one in every {@code N}
* heartbeats triggers the probe path on DataNodes, AINodes, the ConfigNode leader (itself), and
* ConfigNode followers (on receive). Keeping every probe site on the same cadence is what makes
* the cluster-wide view temporally consistent.
*/
public static final int LOAD_SAMPLING_INTERVAL = 10;

private static final int configNodeListPeriodicallySyncInterval = 100;

public HeartbeatService(IManager configManager, LoadCache loadCache) {
Expand Down Expand Up @@ -134,6 +145,13 @@ private void heartbeatLoopBody() {
genHeartbeatReq(), getNodeManager().getRegisteredDataNodes());
// Send heartbeat requests to all the registered AINodes
pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes());
// Self-check free-space after the async fanout so the OS call doesn't delay it.
if (heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0) {
DiskChecker.checkFreeRatioAndApply(
ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(),
CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold());
}
heartbeatCounter.getAndIncrement();
}
});
}
Expand All @@ -149,8 +167,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() {
.getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID));
// Always sample RegionGroups' leadership as the Region heartbeat
heartbeatReq.setNeedJudgeLeader(true);
// We sample DataNode's load in every 10 heartbeat loop
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0);
Pair<Long, Long> schemaQuotaRemain =
configManager.getClusterSchemaManager().getSchemaQuotaRemain();
heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left);
Expand All @@ -174,9 +191,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() {
configManager.getProcedureManager().getRegionOperationConsensusIds());
}

/* Update heartbeat counter */
heartbeatCounter.getAndIncrement();

return heartbeatReq;
}

Expand All @@ -203,16 +217,15 @@ private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq r
/**
 * Builds the heartbeat request sent to ConfigNodes.
 *
 * <p>The request carries the current {@link System#nanoTime()} as its timestamp and asks for load
 * sampling when the heartbeat counter is a multiple of {@code LOAD_SAMPLING_INTERVAL}.
 *
 * @return the populated heartbeat request
 */
protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
  final boolean needSamplingLoad = heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0;
  final TConfigNodeHeartbeatReq heartbeatReq = new TConfigNodeHeartbeatReq();
  heartbeatReq.setTimestamp(System.nanoTime());
  heartbeatReq.setNeedSamplingLoad(needSamplingLoad);
  return heartbeatReq;
}

/**
 * Generates the heartbeat request sent to AINodes.
 *
 * <p>The request carries the current {@link System#nanoTime()} as its heartbeat timestamp and
 * asks for load sampling when the heartbeat counter is a multiple of {@code
 * LOAD_SAMPLING_INTERVAL}, the same condition the other heartbeat request generators use.
 *
 * @return the populated heartbeat request
 */
private TAIHeartbeatReq genAIHeartbeatReq() {
  /* Generate heartbeat request */
  TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq();
  heartbeatReq.setHeartbeatTimestamp(System.nanoTime());

  // Set the sampling flag once, from the shared constant rather than a hard-coded 10.
  heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0);

  return heartbeatReq;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.commons.auth.entity.PrivilegeModelType;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
import org.apache.iotdb.commons.cluster.DiskChecker;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
Expand Down Expand Up @@ -1085,6 +1086,17 @@ public TRegionRouteMapResp getLatestRegionRouteMap() {
public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) {
  final TConfigNodeHeartbeatResp heartbeatResp = new TConfigNodeHeartbeatResp();
  // Echo the request's timestamp back to the sender.
  heartbeatResp.setTimestamp(heartbeatReq.getTimestamp());

  // Free-space is probed only when the request explicitly asks for load sampling.
  // NOTE(review): the original author notes that DiskCrash is observed from the Ratis
  // write-path on this node rather than polled here — not verifiable from this method.
  final boolean samplingRequested =
      heartbeatReq.isSetNeedSamplingLoad() && heartbeatReq.isNeedSamplingLoad();
  if (samplingRequested) {
    DiskChecker.checkFreeRatioAndApply(
        configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold());
  }

  // Report this node's status; the reason is attached only when one is present.
  heartbeatResp.setStatus(commonConfig.getNodeStatus().getStatus());
  if (commonConfig.getStatusReason() == null) {
    return heartbeatResp;
  }
  heartbeatResp.setStatusReason(commonConfig.getStatusReason());
  return heartbeatResp;
}

Expand Down
Loading
Loading