diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 5dbfe15157668..26db73e3a7852 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -216,6 +216,8 @@ public final class ManagerMessages { "Heartbeat service is started successfully."; public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY = "Heartbeat service is stopped successfully."; + public static final String RECONFIGURE_PEER_PRIORITIES_FAILED = + "Failed to reconfigure ConfigNode peer priorities to {}."; public static final String INCORRECT_VERSION_OF = "Incorrect version of "; public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED = "Init ConsensusManager successfully when restarted"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 836fe7dd60b21..6889681d5117c 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -214,6 +214,7 @@ public final class ManagerMessages { "Heartbeat service is started successfully."; public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY = "Heartbeat service is stopped successfully."; + public static final String RECONFIGURE_PEER_PRIORITIES_FAILED = "调整 ConfigNode 节点优先级到 {} 失败。"; public static final String INCORRECT_VERSION_OF = "Incorrect version of "; public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED = "Init ConsensusManager successfully when restarted"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index aacbe20203f5b..203032256d62d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -41,6 +41,7 @@ import java.io.File; import java.lang.reflect.Field; import java.util.Arrays; +import java.util.List; public class ConfigNodeConfig { @@ -506,6 +507,15 @@ public void setConsensusDir(String consensusDir) { this.consensusDir = consensusDir; } + /** + * Directories whose loss would render this ConfigNode unable to serve. Used by the periodic + * disk-health check on both leader (in HeartbeatService loop) and followers (in the + * heartbeat-receive path). + */ + public List getCriticalDirs() { + return Arrays.asList(systemDir, consensusDir); + } + public String getConfigNodeConsensusProtocolClass() { return configNodeConsensusProtocolClass; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e97f32bdbda85..2706226b07578 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -86,7 +86,7 @@ public LoadManager(IManager configManager) { setHeartbeatService(configManager, loadCache); this.statisticsService = new StatisticsService(loadCache); this.topologyService = new TopologyService(configManager, loadCache::updateTopology); - this.eventService = new EventService(loadCache); + this.eventService = new EventService(configManager, loadCache); this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bcc8..5a696811f4f43 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -520,6 +520,16 @@ public String getNodeStatusWithReason(int nodeId) { .orElseGet(() -> NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"); } + /** + * @return The raw {@code statusReason} string for {@code nodeId}, or {@code null} when no cache + * entry exists yet or no reason has been reported. + */ + public String getNodeStatusReason(int nodeId) { + return Optional.ofNullable(nodeCacheMap.get(nodeId)) + .map(BaseNodeCache::getStatusReason) + .orElse(null); + } + /** * Get all Node's current status with reason. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java index 4eeb96344ccd1..434d5bbf3fe25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java @@ -66,4 +66,11 @@ public String getNodeStatusWithReason() { ? statistics.getStatus().getStatus() : statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")"; } + + /** + * @return The raw reason string (may be {@code null}) accompanying the current NodeStatus. + */ + public String getStatusReason() { + return ((NodeStatistics) currentStatistics.get()).getStatusReason(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java index 4d675e1e8a442..7228fca648cfe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.cache.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample; @@ -49,28 +50,39 @@ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) { @Override public synchronized void updateCurrentStatistics(boolean forceUpdate) { - // Skip itself and the Removing status can not be updated - if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) { + // Removing status can not be updated + if (NodeStatus.Removing.equals(getNodeStatus())) { return; } - NodeHeartbeatSample lastSample; - // Update Node status - NodeStatus status; long currentNanoTime = System.nanoTime(); - final List heartbeatHistory; - synchronized (slidingWindow) { - lastSample = (NodeHeartbeatSample) getLastSample(); - heartbeatHistory = Collections.unmodifiableList(slidingWindow); + NodeStatus status; + String statusReason; + + if (nodeId == CURRENT_NODE_ID) { + // Self entry: heartbeat loop never sends to itself, so mirror the status that + // this ConfigNode's local disk-check / startup wrote into CommonConfig. + status = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + statusReason = CommonDescriptor.getInstance().getConfig().getStatusReason(); + } else { + NodeHeartbeatSample lastSample; + final List heartbeatHistory; + synchronized (slidingWindow) { + lastSample = (NodeHeartbeatSample) getLastSample(); + heartbeatHistory = Collections.unmodifiableList(slidingWindow); - if (lastSample == null) { - /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ - status = NodeStatus.Unknown; - } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { - /* Failure detector decides that this ConfigNode is UNKNOWN */ - status = NodeStatus.Unknown; - } else { - status = lastSample.getStatus(); + if (lastSample == null) { + /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { + /* Failure detector decides that this ConfigNode is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else { + status = lastSample.getStatus(); + statusReason = lastSample.getStatusReason(); + } } } @@ -79,6 +91,6 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { // TODO: Construct load score module long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE; - currentStatistics.set(new NodeStatistics(currentNanoTime, status, null, loadScore)); + currentStatistics.set(new NodeStatistics(currentNanoTime, status, statusReason, loadScore)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java index 8217593f5d67d..ca3f6112201be 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java @@ -74,8 +74,11 @@ public NodeHeartbeatSample(TAIHeartbeatResp heartbeatResp) { /** Constructor for ConfigNode sample. */ public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) { super(heartbeatResp.getTimestamp()); - this.status = NodeStatus.Running; - this.statusReason = null; + this.status = + heartbeatResp.isSetStatus() + ? NodeStatus.parse(heartbeatResp.getStatus()) + : NodeStatus.Running; + this.statusReason = heartbeatResp.isSetStatusReason() ? heartbeatResp.getStatusReason() : null; this.loadSample = null; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java index 762e6b1782b4c..9828d905a8a1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java @@ -19,13 +19,17 @@ package org.apache.iotdb.confignode.manager.load.service; +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; @@ -34,6 +38,7 @@ import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent; +import org.apache.iotdb.consensus.exception.ConsensusException; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -42,13 +47,17 @@ import org.slf4j.LoggerFactory; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * EventService periodically check statistics and broadcast corresponding change event if necessary. @@ -68,6 +77,7 @@ public class EventService { IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.CONFIG_NODE_EVENT_SERVICE.getName()); + private final IManager configManager; private final LoadCache loadCache; private final Map previousNodeStatisticsMap; private final Map previousRegionGroupStatisticsMap; @@ -75,7 +85,8 @@ public class EventService { previousConsensusGroupStatisticsMap; private final EventBus eventPublisher; - public EventService(LoadCache loadCache) { + public EventService(IManager configManager, LoadCache loadCache) { + this.configManager = configManager; this.loadCache = loadCache; this.previousNodeStatisticsMap = new TreeMap<>(); this.previousRegionGroupStatisticsMap = new TreeMap<>(); @@ -154,6 +165,55 @@ public synchronized void checkAndBroadcastNodeStatisticsChangeEventIfNecessary() if (!differentNodeStatisticsMap.isEmpty()) { eventPublisher.post(new NodeStatisticsChangeEvent(differentNodeStatisticsMap)); recordNodeStatistics(differentNodeStatisticsMap); + reconcileConfigNodePriorities(differentNodeStatisticsMap); + } + } + + /** + * Demote a ConfigNode's ConfigRegion election priority when its {@link NodeStatus} degrades (see + * {@link NodeStatus#priorityForStatus}). Runs only on the leader and only pushes peers whose + * priority bucket actually moved. + */ + private void reconcileConfigNodePriorities( + Map> differentNodeStatisticsMap) { + ConsensusManager consensusManager = configManager.getConsensusManager(); + if (consensusManager == null || !consensusManager.isLeader()) { + return; + } + Set configNodeIds = + configManager.getNodeManager().getRegisteredConfigNodes().stream() + .map(TConfigNodeLocation::getConfigNodeId) + .collect(Collectors.toSet()); + Map desired = new HashMap<>(); + differentNodeStatisticsMap.forEach( + (nodeId, change) -> { + NodeStatistics current = change.getRight(); + if (!configNodeIds.contains(nodeId) || current == null) { + return; + } + OptionalInt newPriority = + NodeStatus.priorityForStatus(current.getStatus(), current.getStatusReason()); + if (!newPriority.isPresent()) { + return; + } + NodeStatistics previous = change.getLeft(); + OptionalInt oldPriority = + previous == null + ? OptionalInt.empty() + : NodeStatus.priorityForStatus(previous.getStatus(), previous.getStatusReason()); + if (!oldPriority.isPresent() || oldPriority.getAsInt() != newPriority.getAsInt()) { + desired.put(nodeId, newPriority.getAsInt()); + } + }); + if (desired.isEmpty()) { + return; + } + try { + consensusManager + .getConsensusImpl() + .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); + } catch (ConsensusException e) { + LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index a491b3960c3c3..73e28b49dc7b8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; @@ -81,6 +83,15 @@ public class HeartbeatService { IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName()); private final AtomicLong heartbeatCounter = new AtomicLong(0); + + /** + * Sampling cadence for cluster-wide load and disk-health probes: one in every {@code N} + * heartbeats triggers the probe path on DataNodes, AINodes, the ConfigNode leader (itself), and + * ConfigNode followers (on receive). Keeping every probe site on the same cadence is what makes + * the cluster-wide view temporally consistent. + */ + public static final int LOAD_SAMPLING_INTERVAL = 10; + private static final int configNodeListPeriodicallySyncInterval = 100; public HeartbeatService(IManager configManager, LoadCache loadCache) { @@ -134,6 +145,13 @@ private void heartbeatLoopBody() { genHeartbeatReq(), getNodeManager().getRegisteredDataNodes()); // Send heartbeat requests to all the registered AINodes pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes()); + // Self-check free-space after the async fanout so the OS call doesn't delay it. + if (heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0) { + DiskChecker.checkFreeRatioAndApply( + ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), + CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); + } + heartbeatCounter.getAndIncrement(); } }); } @@ -149,8 +167,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID)); // Always sample RegionGroups' leadership as the Region heartbeat heartbeatReq.setNeedJudgeLeader(true); - // We sample DataNode's load in every 10 heartbeat loop - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); Pair schemaQuotaRemain = configManager.getClusterSchemaManager().getSchemaQuotaRemain(); heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left); @@ -174,9 +191,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { configManager.getProcedureManager().getRegionOperationConsensusIds()); } - /* Update heartbeat counter */ - heartbeatCounter.getAndIncrement(); - return heartbeatReq; } @@ -203,6 +217,7 @@ private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq r protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() { TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq(); req.setTimestamp(System.nanoTime()); + req.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); return req; } @@ -210,9 +225,7 @@ private TAIHeartbeatReq genAIHeartbeatReq() { /* Generate heartbeat request */ TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); - - // We sample AINode's load in every 10 heartbeat loop - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); return heartbeatReq; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index dbb35839045d0..6518d014d5a25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.auth.entity.PrivilegeModelType; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -1085,6 +1086,17 @@ public TRegionRouteMapResp getLatestRegionRouteMap() { public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); + // Sample free-space only when the leader flags this heartbeat for load sampling — the same + // gate DataNode/AINode follow. DiskCrash is observed passively from the Ratis write-path + // on this node, not polled here. + if (heartbeatReq.isSetNeedSamplingLoad() && heartbeatReq.isNeedSamplingLoad()) { + DiskChecker.checkFreeRatioAndApply( + configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); + } + resp.setStatus(commonConfig.getNodeStatus().getStatus()); + if (commonConfig.getStatusReason() != null) { + resp.setStatusReason(commonConfig.getStatusReason()); + } return resp; } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index c462aa3a046e1..20b2af0d1ba24 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -167,6 +167,24 @@ public interface IConsensus { */ void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException; + /** + * Adjust the leader-election priority of one or more peers in {@code groupId}. + * + *

Implementations that have no concept of per-peer priority (e.g. Simple, IoT consensus) + * should leave the default no-op. The Ratis implementation rewrites the group configuration so a + * peer with a lower priority is less likely to win subsequent elections — this is the lever the + * cluster uses to demote a {@code ReadOnly} ConfigNode. + * + * @param groupId the consensus group whose peer priorities should be updated + * @param nodeIdToPriority desired priorities keyed by node id; peers not in the map keep their + * current priority + * @throws ConsensusException if the underlying reconfiguration fails + */ + default void reconfigurePeerPriorities( + ConsensusGroupId groupId, Map nodeIdToPriority) throws ConsensusException { + // no-op default + } + // management API /** diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f206..9eda0b4b27cc3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -69,17 +69,20 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { private final RaftGroupId groupId; private final TConsensusGroupType consensusGroupType; private final BiConsumer leaderChangeListener; + private final BiConsumer diskFailureListener; ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) { - this(stateMachine, id, null); + this(stateMachine, id, null, null); } ApplicationStateMachineProxy( IStateMachine stateMachine, RaftGroupId id, - BiConsumer onLeaderChanged) { + BiConsumer onLeaderChanged, + BiConsumer onDiskFailure) { this.applicationStateMachine = stateMachine; this.leaderChangeListener = onLeaderChanged; + this.diskFailureListener = onDiskFailure; this.groupId = id; snapshotStorage = new SnapshotStorage(applicationStateMachine, groupId); consensusGroupType = Utils.getConsensusGroupTypeFromPrefix(groupId.toString()); @@ -156,6 +159,9 @@ public CompletableFuture applyTransaction(TransactionContext trx) { break; } catch (Throwable rte) { logger.error(RatisMessages.STATEMACHINE_RUNTIME_EXCEPTION, rte); + if (Utils.isDiskFailure(rte) && diskFailureListener != null) { + diskFailureListener.accept(groupId, rte); + } ret = new ResponseMessage( new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 372b45b8afc6f..22b1c185b6693 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientPoolFactory; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.property.ClientPoolProperty; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.request.IConsensusRequest; import org.apache.iotdb.commons.service.metric.MetricService; @@ -234,7 +235,8 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) { registry.apply( Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)), raftGroupId, - this::onLeaderChanged)) + this::onLeaderChanged, + this::onDiskFailure)) .build()); } @@ -329,9 +331,9 @@ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) throw new ConsensusGroupNotExistException(groupId); } - // current Peer is group leader and in ReadOnly State - // We only judge dataRegions here, because schema write when readOnly is handled at - // RegionWriteExecutor + // Current peer is group leader and in ReadOnly state. SchemaRegion writes are gated at + // RegionWriteExecutor; here we step down DataRegion and ConfigRegion leaders so that a + // healthy peer can take over and continue serving writes. if (isLeader(groupId) && Utils.rejectWrite(consensusGroupType)) { try { forceStepDownLeader(raftGroup); @@ -369,6 +371,7 @@ && waitUntilLeaderReady(raftGroupId)) { } catch (GroupMismatchException e) { throw new ConsensusGroupNotExistException(groupId); } catch (Exception e) { + maybeReportDiskFailure(raftGroupId, e); throw new RatisRequestFailedException(e); } } @@ -386,6 +389,7 @@ && waitUntilLeaderReady(raftGroupId)) { } catch (GroupMismatchException e) { throw new ConsensusGroupNotExistException(groupId); } catch (Exception e) { + maybeReportDiskFailure(raftGroupId, e); throw new RatisRequestFailedException(e); } @@ -964,6 +968,37 @@ private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException } } + @Override + public void reconfigurePeerPriorities( + ConsensusGroupId groupId, Map nodeIdToPriority) throws ConsensusException { + if (nodeIdToPriority == null || nodeIdToPriority.isEmpty()) { + return; + } + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + RaftGroup group = getGroupInfo(raftGroupId); + if (group == null || !group.getPeers().contains(myself)) { + throw new ConsensusGroupNotExistException(groupId); + } + // Mechanical merge: every peer in nodeIdToPriority gets the requested priority, every other + // peer keeps its current one. Deciding *whether* a priority change is worth pushing is the + // caller's job (see ConfigRegionPriorityBalancer); this method just applies what it is told. + List newPeers = new ArrayList<>(group.getPeers().size()); + for (RaftPeer p : group.getPeers()) { + Integer desired = nodeIdToPriority.get(Utils.fromRaftPeerIdToNodeId(p.getId())); + if (desired == null) { + newPeers.add(p); + } else { + newPeers.add( + RaftPeer.newBuilder() + .setId(p.getId()) + .setAddress(p.getAddress()) + .setPriority(desired) + .build()); + } + } + sendReconfiguration(RaftGroup.valueOf(raftGroupId, newPeers)); + } + private RatisClient getConfigurationRaftClient(RaftGroup group) throws ClientManagerException { try { return reconfigurationClientManager.borrowClient(group); @@ -990,6 +1025,26 @@ private RaftClientReply sendReconfiguration(RaftGroup newGroupConf) return reply; } + /** + * Called from the state machine apply path and from the write-path catch blocks when a Ratis + * operation surfaces a disk-level error (see {@link Utils#isDiskFailure(Throwable)}). Drives the + * node to {@code ReadOnly(DiskCrash)} via {@link DiskChecker#apply}, which encapsulates the + * priority rules (DiskCrash wins over DiskFull, etc.). + */ + private void onDiskFailure(RaftGroupId groupId, Throwable cause) { + logger.error( + "Disk failure observed in Ratis group {}; marking node ReadOnly(DiskCrash).", + groupId, + cause); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + } + + private void maybeReportDiskFailure(RaftGroupId groupId, Throwable cause) { + if (Utils.isDiskFailure(cause)) { + onDiskFailure(groupId, cause); + } + } + private void onLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId leaderId) { Optional.ofNullable(canServeStaleRead) .ifPresent( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 2c24ff12b6d36..67120caadc2c7 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -230,7 +230,10 @@ public static TConsensusGroupType getConsensusGroupTypeFromPrefix(String prefix) } public static boolean rejectWrite(TConsensusGroupType type) { - return type == TConsensusGroupType.DataRegion && config.isReadOnly(); + // SchemaRegion writes are gated by RegionWriteExecutor at a higher level, so we only need + // to short-circuit DataRegion and ConfigRegion Ratis writes here. + return (type == TConsensusGroupType.DataRegion || type == TConsensusGroupType.ConfigRegion) + && config.isReadOnly(); } /** @@ -240,7 +243,30 @@ public static boolean rejectWrite(TConsensusGroupType type) { * still allow statemachine to apply while rejecting new client write requests. */ public static boolean stallApply(TConsensusGroupType type) { - return type == TConsensusGroupType.DataRegion && config.isReadOnly() && !config.isStopping(); + return (type == TConsensusGroupType.DataRegion || type == TConsensusGroupType.ConfigRegion) + && config.isReadOnly() + && !config.isStopping(); + } + + /** + * Treat a throwable (including any wrapped cause) as a disk-level failure when it carries an + * {@link java.io.IOError} or a {@link java.nio.file.FileSystemException}. Both are concrete + * filesystem-layer signals: {@code IOError} is thrown by the JVM for unrecoverable storage + * errors, and {@code FileSystemException} surfaces I/O syscalls failing on a specific file. Plain + * {@link java.io.IOException} is intentionally excluded — it is also raised by network code and + * would otherwise misclassify transient connectivity failures as disk crashes. + */ + public static boolean isDiskFailure(Throwable t) { + for (Throwable cur = t; cur != null; cur = cur.getCause()) { + if (cur instanceof java.io.IOError || cur instanceof java.nio.file.FileSystemException) { + return true; + } + // Guard against pathological self-referencing cause chains + if (cur.getCause() == cur) { + break; + } + } + return false; } /** return the max wait duration for retry */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e5c996405649d..1ab957b5dfcf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.request.AsyncRequestContext; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; @@ -213,6 +214,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -2424,29 +2426,36 @@ private void sampleDiskLoad(TLoadSample loadSample) { SYSTEM) .getValue(); + // Derive the disk status: an ABNORMAL data folder observed by any FolderManager wins over + // a full-disk reading, and a low free-space ratio still drives DISK_FULL when nothing is + // crashed. DiskChecker.apply then performs the actual NodeStatus transition (including the + // "ReadOnly(DiskFull|DiskCrash) -> Running" recovery for the DiskFull path). + DiskChecker.DiskStatus diskStatus = DiskChecker.DiskStatus.NORMAL; + if (FolderManager.hasAnyAbnormalFolder()) { + diskStatus = DiskChecker.DiskStatus.DISK_CRASH; + } if (availableDisk != 0 && totalDisk != 0) { double freeDiskRatio = availableDisk / totalDisk; loadSample.setFreeDiskSpace(availableDisk); loadSample.setDiskUsageRate(1d - freeDiskRatio); - // Reset NodeStatus if necessary - if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { + if (diskStatus == DiskChecker.DiskStatus.NORMAL + && freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { LOGGER.warn( "The available disk space is : {}, " + "the total disk space is : {}, " + "and the remaining disk usage ratio: {} is " - + "less than disk_space_warning_threshold: {}, set system to readonly!", + + "less than disk_space_warning_threshold: {}.", RamUsageEstimator.humanReadableUnits((long) availableDisk), RamUsageEstimator.humanReadableUnits((long) totalDisk), freeDiskRatio, commonConfig.getDiskSpaceWarningThreshold()); - commonConfig.setNodeStatus(NodeStatus.ReadOnly); - commonConfig.setStatusReason(NodeStatus.DISK_FULL); - } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus()) - && NodeStatus.DISK_FULL.equals(commonConfig.getStatusReason())) { - commonConfig.setNodeStatus(NodeStatus.Running); - commonConfig.setStatusReason(null); + diskStatus = DiskChecker.DiskStatus.DISK_FULL; } + } else if (diskStatus == DiskChecker.DiskStatus.NORMAL) { + // Metrics not available yet — fall back to no-op so we don't churn the status. + return; } + DiskChecker.apply(diskStatus); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java index 77a332152ab9f..80f739abbc381 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java @@ -33,13 +33,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.ref.WeakReference; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; public class FolderManager { private static final Logger logger = LoggerFactory.getLogger(FolderManager.class); + /** + * Registry of every live {@link FolderManager} instance so the DataNode heartbeat path can ask + * "is any folder anywhere on this node currently ABNORMAL?" without each subsystem having to push + * state into a central reporter. Weak references avoid keeping short-lived managers alive (e.g. + * those created per snapshot/load). + */ + private static final List> ALL_INSTANCES = + new CopyOnWriteArrayList<>(); + /** Represents the operational states of a data folder. */ public enum FolderState { /** Indicates the folder is functioning normally with no issues. */ @@ -62,6 +74,7 @@ public FolderManager(List folders, DirectoryStrategyType type) throws DiskSpaceInsufficientException { this.folders = folders; folders.forEach(dir -> foldersStates.put(dir, FolderState.HEALTHY)); + ALL_INSTANCES.add(new WeakReference<>(this)); switch (type) { case SEQUENCE_STRATEGY: this.selectStrategy = new SequenceStrategy(); @@ -147,4 +160,34 @@ public T getNextWithRetry(ThrowingFunction getFolders() { return folders; } + + /** + * Walks every live FolderManager instance and reports whether any folder is currently {@link + * FolderState#ABNORMAL}. Used by the DataNode heartbeat path to derive a {@code + * NodeStatus.ReadOnly(DiskCrash)} signal from already-observed write failures. + * + *

Stale (GC'd) weak references are pruned as a side effect. + */ + public static boolean hasAnyAbnormalFolder() { + boolean anyAbnormal = false; + Iterator> it = ALL_INSTANCES.iterator(); + while (it.hasNext()) { + FolderManager fm = it.next().get(); + if (fm == null) { + continue; + } + for (FolderState state : fm.foldersStates.values()) { + if (state == FolderState.ABNORMAL) { + anyAbnormal = true; + break; + } + } + if (anyAbnormal) { + break; + } + } + // Prune dead entries. CopyOnWriteArrayList tolerates concurrent removeIf safely. + ALL_INSTANCES.removeIf(ref -> ref.get() == null); + return anyAbnormal; + } } diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java index 5a2e41961978f..f75daed4b2732 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -37,6 +37,14 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s doesn't exist."; public static final String UNKNOWN_NODE_STATUS = "Unknown NodeStatus %s."; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "Free disk space ratio is below the configured threshold; set node status to ReadOnly(DiskFull)."; + public static final String DISK_CRASH_SET_READ_ONLY = + "Detected unwritable disk directory; set node status to ReadOnly(DiskCrash)."; + public static final String DISK_RECOVERED_SET_RUNNING = + "Disk health recovered (previous reason: {}); set node status to Running."; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "Unrecognized ConsensusGroupId: %s"; diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java index a4b72b9e4271b..1b8b639affed0 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -36,6 +36,14 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s 不存在。"; public static final String UNKNOWN_NODE_STATUS = "未知 NodeStatus %s。"; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "磁盘剩余空间比例低于配置阈值,将节点状态设为 ReadOnly(DiskFull)。"; + public static final String DISK_CRASH_SET_READ_ONLY = + "检测到不可写的磁盘目录,将节点状态设为 ReadOnly(DiskCrash)。"; + public static final String DISK_RECOVERED_SET_RUNNING = + "磁盘健康已恢复(先前原因:{}),将节点状态设为 Running。"; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "无法识别的 ConsensusGroupId:%s"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java new file mode 100644 index 0000000000000..682f9ad4a97a9 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.i18n.CommonMessages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * Shared utility that drives the global {@link NodeStatus} on {@link CommonConfig} based on disk + * health signals. + * + *

Two sources feed it: + * + *

    + *
  • {@link #checkFreeRatioAndApply} polls usable / total space across critical directories and + * sets {@code ReadOnly(DISK_FULL)} when the ratio drops below a threshold. + *
  • {@link #apply}{@code (DISK_CRASH)} is called from passive failure observers — Ratis + * write-path catch blocks on ConfigNode and {@code FolderManager.ABNORMAL} aggregation on + * DataNode — when a real write IO error has already occurred. + *
+ * + *

Only transitions between {@link NodeStatus#Running} and {@link NodeStatus#ReadOnly} with + * reason {@link NodeStatus#DISK_FULL}/{@link NodeStatus#DISK_CRASH} are managed here. Other + * ReadOnly reasons (e.g. manual) are left untouched. + */ +public class DiskChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskChecker.class); + + public enum DiskStatus { + NORMAL, + DISK_FULL, + DISK_CRASH + } + + private DiskChecker() {} + + /** + * Evaluate the usable/total space ratio of each directory. Any directory whose ratio is below + * {@code freeRatioThreshold} yields {@link DiskStatus#DISK_FULL}; otherwise {@link + * DiskStatus#NORMAL}. This method never returns {@link DiskStatus#DISK_CRASH} — crash detection + * is driven by the Ratis write-path observer on ConfigNode and by {@code FolderManager} on + * DataNode, both of which call {@link #apply} directly. + */ + public static DiskStatus checkFreeRatio(List dirs, double freeRatioThreshold) { + if (dirs == null || dirs.isEmpty()) { + return DiskStatus.NORMAL; + } + for (String dir : dirs) { + if (dir == null || dir.isEmpty()) { + continue; + } + File f = new File(dir); + long total = f.getTotalSpace(); + long usable = f.getUsableSpace(); + if (total > 0 && (double) usable / total < freeRatioThreshold) { + return DiskStatus.DISK_FULL; + } + } + return DiskStatus.NORMAL; + } + + /** Convenience: run {@link #checkFreeRatio} and apply the result to {@link CommonConfig}. */ + public static void checkFreeRatioAndApply(List dirs, double freeRatioThreshold) { + apply(checkFreeRatio(dirs, freeRatioThreshold)); + } + + /** + * Apply a precomputed status to {@link CommonConfig}. Priority is {@code DiskCrash > DiskFull > + * Normal}; recovery to {@code Running} only fires when the active reason was disk-related. + */ + public static void apply(DiskStatus result) { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + NodeStatus currentStatus = config.getNodeStatus(); + String currentReason = config.getStatusReason(); + boolean currentlyFull = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_FULL.equals(currentReason); + boolean currentlyCrash = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_CRASH.equals(currentReason); + + switch (result) { + case DISK_CRASH: + if (!currentlyCrash) { + LOGGER.warn(CommonMessages.DISK_CRASH_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_CRASH); + } + break; + case DISK_FULL: + // DiskCrash has higher priority — do not downgrade an existing crash to full. + if (currentlyCrash) { + return; + } + if (!currentlyFull) { + LOGGER.warn(CommonMessages.DISK_FULL_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_FULL); + } + break; + case NORMAL: + default: + // DiskCrash is sticky — only a restart clears it. The free-ratio probe can recover + // DiskFull alone because free-space reappearing is the literal inverse of running low. + if (currentlyFull) { + LOGGER.info(CommonMessages.DISK_RECOVERED_SET_RUNNING, currentReason); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + break; + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java index 518a9faaed2ec..ff64a666f98e4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.i18n.CommonMessages; +import java.util.OptionalInt; + /** Node status for showing cluster */ public enum NodeStatus { /** Node running properly */ @@ -35,6 +37,7 @@ public enum NodeStatus { /** Only query statements are permitted */ ReadOnly("ReadOnly"); public static final String DISK_FULL = "DiskFull"; + public static final String DISK_CRASH = "DiskCrash"; private final String status; @@ -60,6 +63,35 @@ public static boolean isNormalStatus(NodeStatus status) { return status != null && status.equals(NodeStatus.Running); } + /** + * Map a node's {@link NodeStatus} (and optional reason) to the Ratis peer priority that should + * govern its candidacy in leader elections. + * + *

+   *   Running                          →   0   (full candidate)
+   *   ReadOnly + {@link #DISK_FULL}    →  -1   (out-rank healthy peers but ahead of crashed)
+   *   ReadOnly + {@link #DISK_CRASH}   →  -2   (most degraded — last choice)
+   *   anything else                    →  empty (priority must not be changed)
+   * 
+ * + * Returning {@link OptionalInt#empty()} for Unknown/Removing/manual ReadOnly keeps transient + * blips and operator-driven states from rewriting peer priorities. + */ + public static OptionalInt priorityForStatus(NodeStatus status, String statusReason) { + if (Running.equals(status)) { + return OptionalInt.of(0); + } + if (ReadOnly.equals(status)) { + if (DISK_CRASH.equals(statusReason)) { + return OptionalInt.of(-2); + } + if (DISK_FULL.equals(statusReason)) { + return OptionalInt.of(-1); + } + } + return OptionalInt.empty(); + } + public static boolean isReadable(NodeStatus status) { switch (status) { case Running: diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java new file mode 100644 index 0000000000000..d6d16a986eae4 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class DiskCheckerTest { + + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + + private NodeStatus savedStatus; + private String savedReason; + + @Before + public void setUp() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + savedStatus = config.getNodeStatus(); + savedReason = config.getStatusReason(); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + + @After + public void tearDown() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(savedStatus); + config.setStatusReason(savedReason); + } + + // -- checkFreeRatio ------------------------------------------------------------------------ + + @Test + public void checkFreeRatioReturnsNormalWhenRatioMeetsThreshold() throws Exception { + File dir = tmp.newFolder(); + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.checkFreeRatio(Collections.singletonList(dir.getAbsolutePath()), 0.0)); + } + + @Test + public void checkFreeRatioReturnsDiskFullWhenAnyDirBelowThreshold() throws Exception { + File dir = tmp.newFolder(); + // threshold above 1.0 forces any directory with positive total space to register as full + assertEquals( + DiskChecker.DiskStatus.DISK_FULL, + DiskChecker.checkFreeRatio(Collections.singletonList(dir.getAbsolutePath()), 2.0)); + } + + @Test + public void checkFreeRatioSkipsNullAndEmptyEntries() throws Exception { + File dir = tmp.newFolder(); + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.checkFreeRatio(Arrays.asList(null, "", dir.getAbsolutePath()), 0.0)); + } + + @Test + public void emptyDirListIsNormal() { + assertEquals( + DiskChecker.DiskStatus.NORMAL, DiskChecker.checkFreeRatio(Collections.emptyList(), 1.0)); + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.checkFreeRatio(null, 1.0)); + } + + // -- apply() state machine ----------------------------------------------------------------- + + @Test + public void applyDiskFullSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyDiskCrashSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskCrashUpgradesFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskFullDoesNotDowngradeDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals( + "DiskCrash must outrank DiskFull", NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyNormalRecoversFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.Running, config.getNodeStatus()); + assertNull(config.getStatusReason()); + } + + @Test + public void applyNormalDoesNotRecoverFromDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + // DiskCrash is sticky: a free-ratio probe (the only source of NORMAL) cannot prove writes + // work again, so the node stays ReadOnly(DiskCrash) until restart. + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyLeavesNonDiskReadOnlyReasonUntouched() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason("ManualMaintenance"); + + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals("ManualMaintenance", config.getStatusReason()); + + // A non-disk ReadOnly reason is not guarded, so DISK_FULL takes it over. + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyIsIdempotentForRepeatedDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + NodeStatus before = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + String reasonBefore = CommonDescriptor.getInstance().getConfig().getStatusReason(); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + assertEquals(before, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals(reasonBefore, CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + + // -- checkFreeRatioAndApply ---------------------------------------------------------------- + + @Test + public void checkFreeRatioAndApplyDrivesStatusEndToEnd() throws Exception { + File dir = tmp.newFolder(); + // Threshold above 1.0 -> always "DiskFull" + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 2.0); + assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals( + NodeStatus.DISK_FULL, CommonDescriptor.getInstance().getConfig().getStatusReason()); + + // Threshold 0.0 -> always "Normal" -> recovery + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertNull(CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + + @Test + public void checkFreeRatioAndApplyDoesNotClearDiskCrash() throws Exception { + File dir = tmp.newFolder(); + // Simulate a Ratis-passive DiskCrash signal. + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + // Subsequent healthy free-ratio polling must keep the node in ReadOnly(DiskCrash). + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals( + NodeStatus.DISK_CRASH, CommonDescriptor.getInstance().getConfig().getStatusReason()); + } +} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb737a..d12fd53525b98 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -518,12 +518,21 @@ struct TConfigNodeHeartbeatReq { 1: required i64 timestamp 2: optional common.TLicense licence 3: optional TActivationControl activationControl + // Mirrors TDataNodeHeartbeatReq.needSamplingLoad — the leader sets this flag once every + // sampling interval so the receiver runs its disk-health check at the same cadence. + 4: optional bool needSamplingLoad } struct TConfigNodeHeartbeatResp { 1: required i64 timestamp 2: optional string activateStatus 3: optional common.TLicense license + // Reported ConfigNode status (e.g. Running, ReadOnly). Unset means the + // sender does not report this field and the leader treats it as Running. + 4: optional string status + // Machine-readable reason accompanying ReadOnly status, + // e.g. NodeStatus.DISK_FULL or NodeStatus.DISK_CRASH. + 5: optional string statusReason } struct TAddConsensusGroupReq {