From a6a8291e5883b09a45f8d2d6c89c0bdbcb8f2af0 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 11:27:23 +0800 Subject: [PATCH 1/5] Add ConfigNode ReadOnly state with DiskFull/DiskCrash heartbeat self-check ConfigNode now reports its own NodeStatus.ReadOnly when its critical directories (systemDir, consensusDir) are unwritable or near-full, mirroring the existing DataNode behavior. NodeStatus reasons are extended with a new DISK_CRASH constant alongside DISK_FULL, and the ConfigNode heartbeat carries status/statusReason back to the leader. - node-commons: new DiskChecker utility (probe + state-machine apply), with priority DiskCrash > DiskFull and recovery to Running when the reason was disk-related. i18n messages added in en + zh. - thrift-confignode: TConfigNodeHeartbeatResp gains optional status and statusReason fields (forward-compatible). - confignode: leader self-checks before fanning out heartbeats; follower self-checks on receive and reports back; cache reads from CommonConfig for the leader's self entry, otherwise from the sample. - datanode: FolderManager exposes a static hasAnyAbnormalFolder() aggregator; sampleDiskLoad treats any ABNORMAL folder as DiskCrash (which wins over DiskFull) and reuses DiskChecker.apply. 
--- .../confignode/conf/ConfigNodeConfig.java | 10 + .../cache/node/ConfigNodeHeartbeatCache.java | 48 ++-- .../load/cache/node/NodeHeartbeatSample.java | 9 +- .../load/service/HeartbeatService.java | 8 + .../thrift/ConfigNodeRPCServiceProcessor.java | 9 + .../impl/DataNodeInternalRPCServiceImpl.java | 27 ++- .../rescon/disk/FolderManager.java | 43 ++++ .../iotdb/commons/i18n/CommonMessages.java | 10 + .../iotdb/commons/i18n/CommonMessages.java | 10 + .../iotdb/commons/cluster/DiskChecker.java | 146 ++++++++++++ .../iotdb/commons/cluster/NodeStatus.java | 1 + .../commons/cluster/DiskCheckerTest.java | 224 ++++++++++++++++++ .../src/main/thrift/confignode.thrift | 7 + 13 files changed, 523 insertions(+), 29 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index aacbe20203f5b..203032256d62d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -41,6 +41,7 @@ import java.io.File; import java.lang.reflect.Field; import java.util.Arrays; +import java.util.List; public class ConfigNodeConfig { @@ -506,6 +507,15 @@ public void setConsensusDir(String consensusDir) { this.consensusDir = consensusDir; } + /** + * Directories whose loss would render this ConfigNode unable to serve. Used by the periodic + * disk-health check on both leader (in HeartbeatService loop) and followers (in the + * heartbeat-receive path). 
+ */ + public List getCriticalDirs() { + return Arrays.asList(systemDir, consensusDir); + } + public String getConfigNodeConsensusProtocolClass() { return configNodeConsensusProtocolClass; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java index 4d675e1e8a442..7228fca648cfe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.cache.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample; @@ -49,28 +50,39 @@ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) { @Override public synchronized void updateCurrentStatistics(boolean forceUpdate) { - // Skip itself and the Removing status can not be updated - if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) { + // Removing status can not be updated + if (NodeStatus.Removing.equals(getNodeStatus())) { return; } - NodeHeartbeatSample lastSample; - // Update Node status - NodeStatus status; long currentNanoTime = System.nanoTime(); - final List heartbeatHistory; - synchronized (slidingWindow) { - lastSample = (NodeHeartbeatSample) getLastSample(); - heartbeatHistory = Collections.unmodifiableList(slidingWindow); + NodeStatus status; + String statusReason; + + if (nodeId == CURRENT_NODE_ID) { + // Self entry: heartbeat loop never sends to itself, so mirror the status that + // this ConfigNode's local disk-check / 
startup wrote into CommonConfig. + status = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + statusReason = CommonDescriptor.getInstance().getConfig().getStatusReason(); + } else { + NodeHeartbeatSample lastSample; + final List heartbeatHistory; + synchronized (slidingWindow) { + lastSample = (NodeHeartbeatSample) getLastSample(); + heartbeatHistory = Collections.unmodifiableList(slidingWindow); - if (lastSample == null) { - /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ - status = NodeStatus.Unknown; - } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { - /* Failure detector decides that this ConfigNode is UNKNOWN */ - status = NodeStatus.Unknown; - } else { - status = lastSample.getStatus(); + if (lastSample == null) { + /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { + /* Failure detector decides that this ConfigNode is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else { + status = lastSample.getStatus(); + statusReason = lastSample.getStatusReason(); + } } } @@ -79,6 +91,6 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { // TODO: Construct load score module long loadScore = NodeStatus.isNormalStatus(status) ? 
0 : Long.MAX_VALUE; - currentStatistics.set(new NodeStatistics(currentNanoTime, status, null, loadScore)); + currentStatistics.set(new NodeStatistics(currentNanoTime, status, statusReason, loadScore)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java index 8217593f5d67d..5ead20d291f46 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java @@ -74,8 +74,13 @@ public NodeHeartbeatSample(TAIHeartbeatResp heartbeatResp) { /** Constructor for ConfigNode sample. */ public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) { super(heartbeatResp.getTimestamp()); - this.status = NodeStatus.Running; - this.statusReason = null; + // Old ConfigNodes don't populate status/statusReason — fall back to Running/null + // so a rolling upgrade leaves the leader's view of legacy peers unchanged. + this.status = + heartbeatResp.isSetStatus() + ? NodeStatus.parse(heartbeatResp.getStatus()) + : NodeStatus.Running; + this.statusReason = heartbeatResp.isSetStatusReason() ? 
heartbeatResp.getStatusReason() : null; this.loadSample = null; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index a491b3960c3c3..454a27412072d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; @@ -126,6 +128,12 @@ private void heartbeatLoopBody() { .ifPresent( consensusManager -> { if (getConsensusManager().isLeader()) { + // Leader self-checks its own disk health before fanning out heartbeats. + // Followers run the same check when receiving each heartbeat request + // (see ConfigNodeRPCServiceProcessor#getConfigNodeHeartBeat). 
+ DiskChecker.checkAndApply( + ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), + CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); // Send heartbeat requests to all the registered ConfigNodes pingRegisteredConfigNodes( genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index dbb35839045d0..c027f8234bf84 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.auth.entity.PrivilegeModelType; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -1085,6 +1086,14 @@ public TRegionRouteMapResp getLatestRegionRouteMap() { public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); + // Follower self-check: probe critical dirs each time the leader pings us. + // The leader runs the same check in its HeartbeatService loop. 
+ DiskChecker.checkAndApply( + configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); + resp.setStatus(commonConfig.getNodeStatus().getStatus()); + if (commonConfig.getStatusReason() != null) { + resp.setStatusReason(commonConfig.getStatusReason()); + } return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e5c996405649d..1ab957b5dfcf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.request.AsyncRequestContext; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; @@ -213,6 +214,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -2424,29 +2426,36 @@ private void sampleDiskLoad(TLoadSample loadSample) { SYSTEM) .getValue(); + // Derive the disk status: an ABNORMAL data folder observed by any FolderManager wins over + // a full-disk reading, 
and a low free-space ratio still drives DISK_FULL when nothing is + // crashed. DiskChecker.apply then performs the actual NodeStatus transition (including the + // "ReadOnly(DiskFull|DiskCrash) -> Running" recovery for the DiskFull path). + DiskChecker.DiskStatus diskStatus = DiskChecker.DiskStatus.NORMAL; + if (FolderManager.hasAnyAbnormalFolder()) { + diskStatus = DiskChecker.DiskStatus.DISK_CRASH; + } if (availableDisk != 0 && totalDisk != 0) { double freeDiskRatio = availableDisk / totalDisk; loadSample.setFreeDiskSpace(availableDisk); loadSample.setDiskUsageRate(1d - freeDiskRatio); - // Reset NodeStatus if necessary - if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { + if (diskStatus == DiskChecker.DiskStatus.NORMAL + && freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { LOGGER.warn( "The available disk space is : {}, " + "the total disk space is : {}, " + "and the remaining disk usage ratio: {} is " - + "less than disk_space_warning_threshold: {}, set system to readonly!", + + "less than disk_space_warning_threshold: {}.", RamUsageEstimator.humanReadableUnits((long) availableDisk), RamUsageEstimator.humanReadableUnits((long) totalDisk), freeDiskRatio, commonConfig.getDiskSpaceWarningThreshold()); - commonConfig.setNodeStatus(NodeStatus.ReadOnly); - commonConfig.setStatusReason(NodeStatus.DISK_FULL); - } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus()) - && NodeStatus.DISK_FULL.equals(commonConfig.getStatusReason())) { - commonConfig.setNodeStatus(NodeStatus.Running); - commonConfig.setStatusReason(null); + diskStatus = DiskChecker.DiskStatus.DISK_FULL; } + } else if (diskStatus == DiskChecker.DiskStatus.NORMAL) { + // Metrics not available yet — fall back to no-op so we don't churn the status. 
+ return; } + DiskChecker.apply(diskStatus); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java index 77a332152ab9f..80f739abbc381 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java @@ -33,13 +33,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.ref.WeakReference; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; public class FolderManager { private static final Logger logger = LoggerFactory.getLogger(FolderManager.class); + /** + * Registry of every live {@link FolderManager} instance so the DataNode heartbeat path can ask + * "is any folder anywhere on this node currently ABNORMAL?" without each subsystem having to push + * state into a central reporter. Weak references avoid keeping short-lived managers alive (e.g. + * those created per snapshot/load). + */ + private static final List> ALL_INSTANCES = + new CopyOnWriteArrayList<>(); + /** Represents the operational states of a data folder. */ public enum FolderState { /** Indicates the folder is functioning normally with no issues. 
*/ @@ -62,6 +74,7 @@ public FolderManager(List folders, DirectoryStrategyType type) throws DiskSpaceInsufficientException { this.folders = folders; folders.forEach(dir -> foldersStates.put(dir, FolderState.HEALTHY)); + ALL_INSTANCES.add(new WeakReference<>(this)); switch (type) { case SEQUENCE_STRATEGY: this.selectStrategy = new SequenceStrategy(); @@ -147,4 +160,34 @@ public T getNextWithRetry(ThrowingFunction getFolders() { return folders; } + + /** + * Walks every live FolderManager instance and reports whether any folder is currently {@link + * FolderState#ABNORMAL}. Used by the DataNode heartbeat path to derive a {@code + * NodeStatus.ReadOnly(DiskCrash)} signal from already-observed write failures. + * + *

Stale (GC'd) weak references are pruned as a side effect. + */ + public static boolean hasAnyAbnormalFolder() { + boolean anyAbnormal = false; + Iterator> it = ALL_INSTANCES.iterator(); + while (it.hasNext()) { + FolderManager fm = it.next().get(); + if (fm == null) { + continue; + } + for (FolderState state : fm.foldersStates.values()) { + if (state == FolderState.ABNORMAL) { + anyAbnormal = true; + break; + } + } + if (anyAbnormal) { + break; + } + } + // Prune dead entries. CopyOnWriteArrayList tolerates concurrent removeIf safely. + ALL_INSTANCES.removeIf(ref -> ref.get() == null); + return anyAbnormal; + } } diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java index 5a2e41961978f..462f01f5f34da 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -37,6 +37,16 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s doesn't exist."; public static final String UNKNOWN_NODE_STATUS = "Unknown NodeStatus %s."; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "Free disk space ratio is below the configured threshold; set node status to ReadOnly(DiskFull)."; + public static final String DISK_CRASH_SET_READ_ONLY = + "Detected unwritable disk directory; set node status to ReadOnly(DiskCrash)."; + public static final String DISK_CRASH_PROBE_FAILED = + "Disk health probe write failed for directory {}."; + public static final String DISK_RECOVERED_SET_RUNNING = + "Disk health recovered (previous reason: {}); set node status to Running."; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "Unrecognized ConsensusGroupId: %s"; diff --git 
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java index a4b72b9e4271b..dfe57a5f9c60c 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -36,6 +36,16 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s 不存在。"; public static final String UNKNOWN_NODE_STATUS = "未知 NodeStatus %s。"; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "磁盘剩余空间比例低于配置阈值,将节点状态设为 ReadOnly(DiskFull)。"; + public static final String DISK_CRASH_SET_READ_ONLY = + "检测到不可写的磁盘目录,将节点状态设为 ReadOnly(DiskCrash)。"; + public static final String DISK_CRASH_PROBE_FAILED = + "对目录 {} 进行磁盘健康探测时写入失败。"; + public static final String DISK_RECOVERED_SET_RUNNING = + "磁盘健康已恢复(先前原因:{}),将节点状态设为 Running。"; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "无法识别的 ConsensusGroupId:%s"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java new file mode 100644 index 0000000000000..4632ddf28dd59 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.i18n.CommonMessages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * Shared utility used by both ConfigNode and DataNode to evaluate the health of a set of critical + * directories and, optionally, drive the global {@link NodeStatus} on {@link CommonConfig}. + * + *

Only transitions between {@link NodeStatus#Running} and {@link NodeStatus#ReadOnly} with + * reason {@link NodeStatus#DISK_FULL}/{@link NodeStatus#DISK_CRASH} are managed here. Other + * ReadOnly reasons (e.g. manual) are never cleared on recovery, though a disk fault replaces them. + */ +public class DiskChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskChecker.class); + + private static final byte[] PROBE_PAYLOAD = new byte[] {0x01}; + private static final String PROBE_PREFIX = "iotdb-disk-probe-"; + private static final String PROBE_SUFFIX = ".tmp"; + + public enum DiskStatus { + NORMAL, + DISK_FULL, + DISK_CRASH + } + + private DiskChecker() {} + + /** + * Evaluate the supplied directories. A single unwritable directory yields {@link + * DiskStatus#DISK_CRASH}; any directory whose usable/total ratio is below the threshold yields + * {@link DiskStatus#DISK_FULL} (when no crash is detected); otherwise {@link DiskStatus#NORMAL}. + */ + public static DiskStatus check(List dirs, double freeRatioThreshold) { + if (dirs == null || dirs.isEmpty()) { + return DiskStatus.NORMAL; + } + boolean anyFull = false; + for (String dir : dirs) { + if (dir == null || dir.isEmpty()) { + continue; + } + File f = new File(dir); + if (!f.isDirectory()) { + LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir); + return DiskStatus.DISK_CRASH; + } + try { + Path probe = Files.createTempFile(Paths.get(dir), PROBE_PREFIX, PROBE_SUFFIX); + try { + Files.write(probe, PROBE_PAYLOAD); + } finally { + Files.deleteIfExists(probe); + } + } catch (IOException e) { + LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir, e); + return DiskStatus.DISK_CRASH; + } + long total = f.getTotalSpace(); + long usable = f.getUsableSpace(); + if (total > 0 && (double) usable / total < freeRatioThreshold) { + anyFull = true; + } + } + return anyFull ? DiskStatus.DISK_FULL : DiskStatus.NORMAL; + } + + /** + * Run {@link #check} and apply the result to {@link CommonConfig}. See class javadoc for + * transition rules. 
+ */ + public static void checkAndApply(List dirs, double freeRatioThreshold) { + apply(check(dirs, freeRatioThreshold)); + } + + /** Applies a precomputed status; callers without one should prefer {@link #checkAndApply}. */ + public static void apply(DiskStatus result) { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + NodeStatus currentStatus = config.getNodeStatus(); + String currentReason = config.getStatusReason(); + boolean currentlyFull = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_FULL.equals(currentReason); + boolean currentlyCrash = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_CRASH.equals(currentReason); + + switch (result) { + case DISK_CRASH: + if (!currentlyCrash) { + LOGGER.warn(CommonMessages.DISK_CRASH_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_CRASH); + } + break; + case DISK_FULL: + // DiskCrash has higher priority — do not downgrade an existing crash to full. + if (currentlyCrash) { + return; + } + if (!currentlyFull) { + LOGGER.warn(CommonMessages.DISK_FULL_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_FULL); + } + break; + case NORMAL: + default: + if (currentlyFull || currentlyCrash) { + LOGGER.info(CommonMessages.DISK_RECOVERED_SET_RUNNING, currentReason); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + break; + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java index 518a9faaed2ec..e37b44edafda6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java @@ -35,6 +35,7 @@ public enum NodeStatus { /** Only query statements are permitted */ ReadOnly("ReadOnly"); public 
static final String DISK_FULL = "DiskFull"; + public static final String DISK_CRASH = "DiskCrash"; private final String status; diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java new file mode 100644 index 0000000000000..fde492c1090de --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class DiskCheckerTest { + + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + + private NodeStatus savedStatus; + private String savedReason; + + @Before + public void setUp() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + savedStatus = config.getNodeStatus(); + savedReason = config.getStatusReason(); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + + @After + public void tearDown() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(savedStatus); + config.setStatusReason(savedReason); + } + + @Test + public void checkReturnsNormalForWritableDirectoryWithSpace() throws Exception { + File dir = tmp.newFolder(); + // threshold 0.0 → any positive usable space passes + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0)); + } + + @Test + public void checkReturnsDiskFullWhenBelowThreshold() throws Exception { + File dir = tmp.newFolder(); + // threshold > 1 forces every directory to be reported as full + assertEquals( + DiskChecker.DiskStatus.DISK_FULL, + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 2.0)); + } + + @Test + public void checkReturnsDiskCrashWhenDirectoryMissing() { + String missing = new File(tmp.getRoot(), "does-not-exist").getAbsolutePath(); + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(Collections.singletonList(missing), 0.0)); + } + + 
@Test + public void checkReturnsDiskCrashWhenPathIsAFile() throws Exception { + File file = tmp.newFile(); + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(Collections.singletonList(file.getAbsolutePath()), 0.0)); + } + + @Test + public void checkPrioritizesCrashOverFull() throws Exception { + File healthy = tmp.newFolder(); + String missing = new File(tmp.getRoot(), "missing").getAbsolutePath(); + // Even with a "full" threshold the missing dir trumps it. + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(java.util.Arrays.asList(healthy.getAbsolutePath(), missing), 2.0)); + } + + @Test + public void checkSkipsNullAndEmptyEntries() throws Exception { + File dir = tmp.newFolder(); + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.check(java.util.Arrays.asList(null, "", dir.getAbsolutePath()), 0.0)); + } + + @Test + public void applyDiskFullSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyDiskCrashSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskCrashUpgradesFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskFullDoesNotDowngradeDiskCrash() { + 
DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals( + "DiskCrash must outrank DiskFull", NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyNormalRecoversFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.Running, config.getNodeStatus()); + assertNull(config.getStatusReason()); + } + + @Test + public void applyNormalRecoversFromDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.Running, config.getNodeStatus()); + assertNull(config.getStatusReason()); + } + + @Test + public void applyLeavesNonDiskReadOnlyReasonUntouched() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason("ManualMaintenance"); + + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals("ManualMaintenance", config.getStatusReason()); + + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + // DISK_FULL only fires when not already DiskFull/DiskCrash — it does take over here, + // mirroring the existing behavior for the legacy sampleDiskLoad path. 
+ assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyIsIdempotentForRepeatedDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + NodeStatus before = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + String reasonBefore = CommonDescriptor.getInstance().getConfig().getStatusReason(); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + assertEquals(before, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals(reasonBefore, CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + + @Test + public void checkAndApplyDrivesStatusEndToEnd() throws Exception { + File healthy = tmp.newFolder(); + DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + + String missing = new File(tmp.getRoot(), "still-missing").getAbsolutePath(); + DiskChecker.checkAndApply(Collections.singletonList(missing), 0.0); + assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals( + NodeStatus.DISK_CRASH, CommonDescriptor.getInstance().getConfig().getStatusReason()); + + DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertNull(CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + + @Test + public void emptyDirListIsNormal() { + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(Collections.emptyList(), 1.0)); + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(null, 1.0)); + } + + @Test + public void smokeProbeFileIsDeleted() throws Exception { + File dir = tmp.newFolder(); + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0); + File[] leftovers = 
dir.listFiles(); + assertTrue( + "Disk probe should clean up its temp file", leftovers == null || leftovers.length == 0); + } +} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb737a..b6f16c0ca999c 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -524,6 +524,13 @@ struct TConfigNodeHeartbeatResp { 1: required i64 timestamp 2: optional string activateStatus 3: optional common.TLicense license + // Reported ConfigNode status (e.g. Running, ReadOnly). Optional for forward + // compatibility — old ConfigNodes do not populate this field, in which case + // the leader falls back to assuming Running. + 4: optional string status + // Optional human/machine readable reason accompanying ReadOnly status, + // e.g. NodeStatus.DISK_FULL or NodeStatus.DISK_CRASH. + 5: optional string statusReason } struct TAddConsensusGroupReq { From fb634598a5f04a5e8a329d79dd22a132d64332d2 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 14:35:07 +0800 Subject: [PATCH 2/5] Throttle ConfigNode disk check to leader load-sampling cadence and run after fanout - HeartbeatService: the leader's DiskChecker.checkAndApply now runs after pingRegisteredConfigNodes/DataNodes/AINodes so its (blocking) probe IO does not delay the async heartbeat dispatch. Gated on a snapshot of heartbeatCounter taken at loop entry so the leader self-samples on the same iterations DataNode/AINode load sampling fires. The 10-iteration cadence is lifted into a shared LOAD_SAMPLING_INTERVAL constant and reused by the existing setNeedSamplingLoad callsites. - ConfigNodeRPCServiceProcessor: follower side gates its disk check on a local heartbeatReceivedCounter (every Nth receive, matching the leader cadence) instead of running on every received heartbeat. 
- Tidy comments per code-review feedback. --- .../load/cache/node/NodeHeartbeatSample.java | 2 -- .../load/service/HeartbeatService.java | 33 ++++++++++++------- .../thrift/ConfigNodeRPCServiceProcessor.java | 15 ++++++--- .../commons/cluster/DiskCheckerTest.java | 3 +- .../src/main/thrift/confignode.thrift | 7 ++-- 5 files changed, 37 insertions(+), 23 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java index 5ead20d291f46..ca3f6112201be 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java @@ -74,8 +74,6 @@ public NodeHeartbeatSample(TAIHeartbeatResp heartbeatResp) { /** Constructor for ConfigNode sample. */ public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) { super(heartbeatResp.getTimestamp()); - // Old ConfigNodes don't populate status/statusReason — fall back to Running/null - // so a rolling upgrade leaves the leader's view of legacy peers unchanged. this.status = heartbeatResp.isSetStatus() ? 
NodeStatus.parse(heartbeatResp.getStatus()) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 454a27412072d..be833ac537caa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -83,6 +83,15 @@ public class HeartbeatService { IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName()); private final AtomicLong heartbeatCounter = new AtomicLong(0); + + /** + * Sampling cadence for cluster-wide load and disk-health probes: one in every {@code N} + * heartbeats triggers the probe path on DataNodes, AINodes, the ConfigNode leader (itself), and + * ConfigNode followers (on receive). Keeping every probe site on the same cadence is what makes + * the cluster-wide view temporally consistent. + */ + public static final int LOAD_SAMPLING_INTERVAL = 10; + private static final int configNodeListPeriodicallySyncInterval = 100; public HeartbeatService(IManager configManager, LoadCache loadCache) { @@ -128,12 +137,9 @@ private void heartbeatLoopBody() { .ifPresent( consensusManager -> { if (getConsensusManager().isLeader()) { - // Leader self-checks its own disk health before fanning out heartbeats. - // Followers run the same check when receiving each heartbeat request - // (see ConfigNodeRPCServiceProcessor#getConfigNodeHeartBeat). - DiskChecker.checkAndApply( - ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), - CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); + // Snapshot the counter before genHeartbeatReq increments it so the leader's + // own sampling fires on the same iterations DataNode load sampling does. 
+ long iterationIndex = heartbeatCounter.get(); // Send heartbeat requests to all the registered ConfigNodes pingRegisteredConfigNodes( genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes()); @@ -142,6 +148,14 @@ private void heartbeatLoopBody() { genHeartbeatReq(), getNodeManager().getRegisteredDataNodes()); // Send heartbeat requests to all the registered AINodes pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes()); + // Self-check disk health on the same cadence DataNode samples its load. + // Runs after the async heartbeat dispatches so the (blocking) probe IO + // does not delay fanout. Followers run the same check on receive. + if (iterationIndex % LOAD_SAMPLING_INTERVAL == 0) { + DiskChecker.checkAndApply( + ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), + CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); + } } }); } @@ -157,8 +171,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID)); // Always sample RegionGroups' leadership as the Region heartbeat heartbeatReq.setNeedJudgeLeader(true); - // We sample DataNode's load in every 10 heartbeat loop - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); Pair schemaQuotaRemain = configManager.getClusterSchemaManager().getSchemaQuotaRemain(); heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left); @@ -218,9 +231,7 @@ private TAIHeartbeatReq genAIHeartbeatReq() { /* Generate heartbeat request */ TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); - - // We sample AINode's load in every 10 heartbeat loop - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); return heartbeatReq; } diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index c027f8234bf84..e312040fca835 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.service.HeartbeatService; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.persistence.auth.AuthorInfo; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; @@ -246,6 +247,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Iface { @@ -257,6 +259,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac protected final ConfigNode configNode; protected final ConfigManager configManager; + /** Counts heartbeats received from the leader to drive disk-health sampling cadence. 
*/ + private final AtomicLong heartbeatReceivedCounter = new AtomicLong(0); + public ConfigNodeRPCServiceProcessor(ConfigManager configManager) { this.commonConfig = CommonDescriptor.getInstance().getConfig(); this.configNodeConfig = ConfigNodeDescriptor.getInstance().getConf(); @@ -1086,10 +1091,12 @@ public TRegionRouteMapResp getLatestRegionRouteMap() { public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); - // Follower self-check: probe critical dirs each time the leader pings us. - // The leader runs the same check in its HeartbeatService loop. - DiskChecker.checkAndApply( - configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); + // Sample disk health on the same cadence DataNode samples its load. The leader runs the + // equivalent self-check in its HeartbeatService loop. + if (heartbeatReceivedCounter.getAndIncrement() % HeartbeatService.LOAD_SAMPLING_INTERVAL == 0) { + DiskChecker.checkAndApply( + configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); + } resp.setStatus(commonConfig.getNodeStatus().getStatus()); if (commonConfig.getStatusReason() != null) { resp.setStatusReason(commonConfig.getStatusReason()); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java index fde492c1090de..b38ce7bfc8b01 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java @@ -173,9 +173,8 @@ public void applyLeavesNonDiskReadOnlyReasonUntouched() { assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); assertEquals("ManualMaintenance", config.getStatusReason()); + // A non-disk 
ReadOnly reason is not guarded, so DISK_FULL takes it over. DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); - // DISK_FULL only fires when not already DiskFull/DiskCrash — it does take over here, - // mirroring the existing behavior for the legacy sampleDiskLoad path. assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); } diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index b6f16c0ca999c..4e7ffc8dcba99 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -524,11 +524,10 @@ struct TConfigNodeHeartbeatResp { 1: required i64 timestamp 2: optional string activateStatus 3: optional common.TLicense license - // Reported ConfigNode status (e.g. Running, ReadOnly). Optional for forward - // compatibility — old ConfigNodes do not populate this field, in which case - // the leader falls back to assuming Running. + // Reported ConfigNode status (e.g. Running, ReadOnly). Unset means the + // sender does not report this field and the leader treats it as Running. 4: optional string status - // Optional human/machine readable reason accompanying ReadOnly status, + // Machine-readable reason accompanying ReadOnly status, // e.g. NodeStatus.DISK_FULL or NodeStatus.DISK_CRASH. 
5: optional string statusReason } From 3a11adc08ab32446085f41b4b78dac3a7bd972d5 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 16:51:41 +0800 Subject: [PATCH 3/5] Drive ConfigNode ReadOnly into Ratis: step down, demote priority, passive crash detect Three changes that let ReadOnly state actually shape Raft behavior on ConfigNode: - Utils.rejectWrite / stallApply now match ConfigRegion in addition to DataRegion, so a ReadOnly ConfigNode leader hits the same forceStepDownLeader path that DataRegion leaders already use. Comment at RatisConsensus.write updated. - New NodeStatus.priorityForStatus maps Running=0, ReadOnly(DiskFull)=-1, ReadOnly(DiskCrash)=-2. HeartbeatService runs a reconciliation step on the leader (same cadence as the load-sampling pass, after async fanout) that pushes each ConfigNode peer's desired priority into Ratis. Unknown/Removing/manual ReadOnly are left empty so transient blips do not churn the group config. IConsensus gains a default-no-op reconfigurePeerPriorities; RatisConsensus overrides it to rebuild the peer list and call sendReconfiguration. - Replace DiskChecker.check (active testWrite probe) with a passive observer threaded through Ratis. ApplicationStateMachineProxy gains a diskFailureListener parameter and fires it from the applyTransaction catch when Utils.isDiskFailure matches the cause (IOError / FileSystemException). RatisConsensus also tags IOException out of writeLocallyWithRetry / writeRemotelyWithRetry so log-write failures register as DiskCrash. DiskChecker keeps only checkFreeRatio (for the DiskFull path) and apply (for the state machine); DiskCrash is now sticky on both DataNode and ConfigNode until restart. DiskCheckerTest trimmed to drop testWrite-specific cases and to assert that NORMAL no longer recovers DiskCrash; 14 cases pass. 
--- .../confignode/i18n/ManagerMessages.java | 2 + .../confignode/i18n/ManagerMessages.java | 1 + .../manager/load/cache/LoadCache.java | 10 ++ .../load/cache/node/BaseNodeCache.java | 7 ++ .../load/service/HeartbeatService.java | 51 +++++++++- .../thrift/ConfigNodeRPCServiceProcessor.java | 6 +- .../apache/iotdb/consensus/IConsensus.java | 18 ++++ .../ratis/ApplicationStateMachineProxy.java | 10 +- .../iotdb/consensus/ratis/RatisConsensus.java | 65 ++++++++++++- .../iotdb/consensus/ratis/utils/Utils.java | 30 +++++- .../iotdb/commons/i18n/CommonMessages.java | 2 - .../iotdb/commons/i18n/CommonMessages.java | 2 - .../iotdb/commons/cluster/DiskChecker.java | 70 ++++++-------- .../iotdb/commons/cluster/NodeStatus.java | 31 +++++++ .../commons/cluster/DiskCheckerTest.java | 93 ++++++++----------- 15 files changed, 284 insertions(+), 114 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 5dbfe15157668..26db73e3a7852 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -216,6 +216,8 @@ public final class ManagerMessages { "Heartbeat service is started successfully."; public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY = "Heartbeat service is stopped successfully."; + public static final String RECONFIGURE_PEER_PRIORITIES_FAILED = + "Failed to reconfigure ConfigNode peer priorities to {}."; public static final String INCORRECT_VERSION_OF = "Incorrect version of "; public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED = "Init ConsensusManager successfully when restarted"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 836fe7dd60b21..6889681d5117c 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -214,6 +214,7 @@ public final class ManagerMessages { "Heartbeat service is started successfully."; public static final String HEARTBEAT_SERVICE_IS_STOPPED_SUCCESSFULLY = "Heartbeat service is stopped successfully."; + public static final String RECONFIGURE_PEER_PRIORITIES_FAILED = "调整 ConfigNode 节点优先级到 {} 失败。"; public static final String INCORRECT_VERSION_OF = "Incorrect version of "; public static final String INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED = "Init ConsensusManager successfully when restarted"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bcc8..5a696811f4f43 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -520,6 +520,16 @@ public String getNodeStatusWithReason(int nodeId) { .orElseGet(() -> NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"); } + /** + * @return The raw {@code statusReason} string for {@code nodeId}, or {@code null} when no cache + * entry exists yet or no reason has been reported. + */ + public String getNodeStatusReason(int nodeId) { + return Optional.ofNullable(nodeCacheMap.get(nodeId)) + .map(BaseNodeCache::getStatusReason) + .orElse(null); + } + /** * Get all Node's current status with reason. 
* diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java index 4eeb96344ccd1..434d5bbf3fe25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java @@ -66,4 +66,11 @@ public String getNodeStatusWithReason() { ? statistics.getStatus().getStatus() : statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")"; } + + /** + * @return The raw reason string (may be {@code null}) accompanying the current NodeStatus. + */ + public String getStatusReason() { + return ((NodeStatistics) currentStatistics.get()).getStatusReason(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index be833ac537caa..018f31f336e08 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.cluster.DiskChecker; +import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -44,6 +45,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache; import 
org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; @@ -51,7 +53,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -148,18 +152,57 @@ private void heartbeatLoopBody() { genHeartbeatReq(), getNodeManager().getRegisteredDataNodes()); // Send heartbeat requests to all the registered AINodes pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes()); - // Self-check disk health on the same cadence DataNode samples its load. - // Runs after the async heartbeat dispatches so the (blocking) probe IO - // does not delay fanout. Followers run the same check on receive. + // Sample free-space on the same cadence DataNode samples its load. Runs after + // the async heartbeat dispatches so the OS call does not delay fanout. DiskCrash + // is observed passively by the Ratis write-path, not polled here. if (iterationIndex % LOAD_SAMPLING_INTERVAL == 0) { - DiskChecker.checkAndApply( + DiskChecker.checkFreeRatioAndApply( ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); + reconcileConfigNodePeerPriorities(); } } }); } + /** + * Push Ratis peer priorities to reflect each ConfigNode's current {@link NodeStatus} (see {@link + * NodeStatus#priorityForStatus}). Only fires on the leader; only acts when at least one peer's + * desired priority differs from the live group configuration. 
{@code Unknown}/{@code Removing} + * peers and non-disk {@code ReadOnly} reasons are left untouched so transient blips and + * operator-driven states cannot churn the group config. + */ + private void reconcileConfigNodePeerPriorities() { + Map desired = new HashMap<>(); + for (TConfigNodeLocation peer : getNodeManager().getRegisteredConfigNodes()) { + int nodeId = peer.getConfigNodeId(); + NodeStatus status; + String reason; + if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + // The leader's own cache entry mirrors CommonConfig; reading from CommonConfig directly + // sidesteps any ordering between the local disk-check and the next cache refresh. + status = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + reason = CommonDescriptor.getInstance().getConfig().getStatusReason(); + } else { + status = loadCache.getNodeStatus(nodeId); + reason = loadCache.getNodeStatusReason(nodeId); + } + NodeStatus.priorityForStatus(status, reason) + .ifPresent(priority -> desired.put(nodeId, priority)); + } + if (desired.isEmpty()) { + return; + } + try { + configManager + .getConsensusManager() + .getConsensusImpl() + .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); + } catch (ConsensusException e) { + LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); + } + } + protected TDataNodeHeartbeatReq genHeartbeatReq() { /* Generate heartbeat request */ TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index e312040fca835..095d03443edf4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -1091,10 +1091,10 @@ public TRegionRouteMapResp getLatestRegionRouteMap() { public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); - // Sample disk health on the same cadence DataNode samples its load. The leader runs the - // equivalent self-check in its HeartbeatService loop. + // Sample free-space on the same cadence DataNode samples its load. DiskCrash is observed + // passively from the Ratis write-path on this node, not polled here. if (heartbeatReceivedCounter.getAndIncrement() % HeartbeatService.LOAD_SAMPLING_INTERVAL == 0) { - DiskChecker.checkAndApply( + DiskChecker.checkFreeRatioAndApply( configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); } resp.setStatus(commonConfig.getNodeStatus().getStatus()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index c462aa3a046e1..20b2af0d1ba24 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -167,6 +167,24 @@ public interface IConsensus { */ void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException; + /** + * Adjust the leader-election priority of one or more peers in {@code groupId}. + * + *
<p>
Implementations that have no concept of per-peer priority (e.g. Simple, IoT consensus) + * should leave the default no-op. The Ratis implementation rewrites the group configuration so a + * peer with a lower priority is less likely to win subsequent elections — this is the lever the + * cluster uses to demote a {@code ReadOnly} ConfigNode. + * + * @param groupId the consensus group whose peer priorities should be updated + * @param nodeIdToPriority desired priorities keyed by node id; peers not in the map keep their + * current priority + * @throws ConsensusException if the underlying reconfiguration fails + */ + default void reconfigurePeerPriorities( + ConsensusGroupId groupId, Map nodeIdToPriority) throws ConsensusException { + // no-op default + } + // management API /** diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f206..9eda0b4b27cc3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -69,17 +69,20 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { private final RaftGroupId groupId; private final TConsensusGroupType consensusGroupType; private final BiConsumer leaderChangeListener; + private final BiConsumer diskFailureListener; ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) { - this(stateMachine, id, null); + this(stateMachine, id, null, null); } ApplicationStateMachineProxy( IStateMachine stateMachine, RaftGroupId id, - BiConsumer onLeaderChanged) { + BiConsumer onLeaderChanged, + BiConsumer onDiskFailure) { this.applicationStateMachine = stateMachine; this.leaderChangeListener = onLeaderChanged; + this.diskFailureListener = onDiskFailure; this.groupId = 
id; snapshotStorage = new SnapshotStorage(applicationStateMachine, groupId); consensusGroupType = Utils.getConsensusGroupTypeFromPrefix(groupId.toString()); @@ -156,6 +159,9 @@ public CompletableFuture applyTransaction(TransactionContext trx) { break; } catch (Throwable rte) { logger.error(RatisMessages.STATEMACHINE_RUNTIME_EXCEPTION, rte); + if (Utils.isDiskFailure(rte) && diskFailureListener != null) { + diskFailureListener.accept(groupId, rte); + } ret = new ResponseMessage( new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 372b45b8afc6f..df7836336997b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientPoolFactory; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.property.ClientPoolProperty; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.request.IConsensusRequest; import org.apache.iotdb.commons.service.metric.MetricService; @@ -234,7 +235,8 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) { registry.apply( Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)), raftGroupId, - this::onLeaderChanged)) + this::onLeaderChanged, + this::onDiskFailure)) .build()); } @@ -329,9 +331,9 @@ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) throw new ConsensusGroupNotExistException(groupId); } - // current Peer is group leader and in ReadOnly State - // We only judge dataRegions here, because schema write when readOnly is handled at - // 
RegionWriteExecutor + // Current peer is group leader and in ReadOnly state. SchemaRegion writes are gated at + // RegionWriteExecutor; here we step down DataRegion and ConfigRegion leaders so that a + // healthy peer can take over and continue serving writes. if (isLeader(groupId) && Utils.rejectWrite(consensusGroupType)) { try { forceStepDownLeader(raftGroup); @@ -369,6 +371,7 @@ && waitUntilLeaderReady(raftGroupId)) { } catch (GroupMismatchException e) { throw new ConsensusGroupNotExistException(groupId); } catch (Exception e) { + maybeReportDiskFailure(raftGroupId, e); throw new RatisRequestFailedException(e); } } @@ -386,6 +389,7 @@ && waitUntilLeaderReady(raftGroupId)) { } catch (GroupMismatchException e) { throw new ConsensusGroupNotExistException(groupId); } catch (Exception e) { + maybeReportDiskFailure(raftGroupId, e); throw new RatisRequestFailedException(e); } @@ -964,6 +968,39 @@ private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException } } + @Override + public void reconfigurePeerPriorities( + ConsensusGroupId groupId, Map nodeIdToPriority) throws ConsensusException { + if (nodeIdToPriority == null || nodeIdToPriority.isEmpty()) { + return; + } + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + RaftGroup group = getGroupInfo(raftGroupId); + if (group == null || !group.getPeers().contains(myself)) { + throw new ConsensusGroupNotExistException(groupId); + } + boolean changed = false; + List newPeers = new ArrayList<>(group.getPeers().size()); + for (RaftPeer p : group.getPeers()) { + Integer desired = nodeIdToPriority.get(Utils.fromRaftPeerIdToNodeId(p.getId())); + if (desired == null || desired == p.getPriority()) { + newPeers.add(p); + continue; + } + newPeers.add( + RaftPeer.newBuilder() + .setId(p.getId()) + .setAddress(p.getAddress()) + .setPriority(desired) + .build()); + changed = true; + } + if (!changed) { + return; + } + sendReconfiguration(RaftGroup.valueOf(raftGroupId, newPeers)); + } + 
private RatisClient getConfigurationRaftClient(RaftGroup group) throws ClientManagerException { try { return reconfigurationClientManager.borrowClient(group); @@ -990,6 +1027,26 @@ private RaftClientReply sendReconfiguration(RaftGroup newGroupConf) return reply; } + /** + * Called from the state machine apply path and from the write-path catch blocks when a Ratis + * operation surfaces a disk-level error (see {@link Utils#isDiskFailure(Throwable)}). Drives the + * node to {@code ReadOnly(DiskCrash)} via {@link DiskChecker#apply}, which encapsulates the + * priority rules (DiskCrash wins over DiskFull, etc.). + */ + private void onDiskFailure(RaftGroupId groupId, Throwable cause) { + logger.error( + "Disk failure observed in Ratis group {}; marking node ReadOnly(DiskCrash).", + groupId, + cause); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + } + + private void maybeReportDiskFailure(RaftGroupId groupId, Throwable cause) { + if (Utils.isDiskFailure(cause)) { + onDiskFailure(groupId, cause); + } + } + private void onLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId leaderId) { Optional.ofNullable(canServeStaleRead) .ifPresent( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 2c24ff12b6d36..67120caadc2c7 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -230,7 +230,10 @@ public static TConsensusGroupType getConsensusGroupTypeFromPrefix(String prefix) } public static boolean rejectWrite(TConsensusGroupType type) { - return type == TConsensusGroupType.DataRegion && config.isReadOnly(); + // SchemaRegion writes are gated by RegionWriteExecutor at a higher level, so we only need + // to short-circuit DataRegion and ConfigRegion Ratis writes here. 
+ return (type == TConsensusGroupType.DataRegion || type == TConsensusGroupType.ConfigRegion) + && config.isReadOnly(); } /** @@ -240,7 +243,30 @@ public static boolean rejectWrite(TConsensusGroupType type) { * still allow statemachine to apply while rejecting new client write requests. */ public static boolean stallApply(TConsensusGroupType type) { - return type == TConsensusGroupType.DataRegion && config.isReadOnly() && !config.isStopping(); + return (type == TConsensusGroupType.DataRegion || type == TConsensusGroupType.ConfigRegion) + && config.isReadOnly() + && !config.isStopping(); + } + + /** + * Treat a throwable (including any wrapped cause) as a disk-level failure when it carries an + * {@link java.io.IOError} or a {@link java.nio.file.FileSystemException}. Both are concrete + * filesystem-layer signals: {@code IOError} is thrown by the JVM for unrecoverable storage + * errors, and {@code FileSystemException} surfaces I/O syscalls failing on a specific file. Plain + * {@link java.io.IOException} is intentionally excluded — it is also raised by network code and + * would otherwise misclassify transient connectivity failures as disk crashes. 
+ */ + public static boolean isDiskFailure(Throwable t) { + for (Throwable cur = t; cur != null; cur = cur.getCause()) { + if (cur instanceof java.io.IOError || cur instanceof java.nio.file.FileSystemException) { + return true; + } + // Guard against pathological self-referencing cause chains + if (cur.getCause() == cur) { + break; + } + } + return false; } /** return the max wait duration for retry */ diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java index 462f01f5f34da..f75daed4b2732 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -42,8 +42,6 @@ public final class CommonMessages { "Free disk space ratio is below the configured threshold; set node status to ReadOnly(DiskFull)."; public static final String DISK_CRASH_SET_READ_ONLY = "Detected unwritable disk directory; set node status to ReadOnly(DiskCrash)."; - public static final String DISK_CRASH_PROBE_FAILED = - "Disk health probe write failed for directory {}."; public static final String DISK_RECOVERED_SET_RUNNING = "Disk health recovered (previous reason: {}); set node status to Running."; diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java index dfe57a5f9c60c..1b8b639affed0 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -41,8 +41,6 @@ public final class CommonMessages { "磁盘剩余空间比例低于配置阈值,将节点状态设为 ReadOnly(DiskFull)。"; public static final String DISK_CRASH_SET_READ_ONLY = "检测到不可写的磁盘目录,将节点状态设为 ReadOnly(DiskCrash)。"; - 
public static final String DISK_CRASH_PROBE_FAILED = - "对目录 {} 进行磁盘健康探测时写入失败。"; public static final String DISK_RECOVERED_SET_RUNNING = "磁盘健康已恢复(先前原因:{}),将节点状态设为 Running。"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java index 4632ddf28dd59..682f9ad4a97a9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java @@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; /** - * Shared utility used by both ConfigNode and DataNode to evaluate the health of a set of critical - * directories and, optionally, drive the global {@link NodeStatus} on {@link CommonConfig}. + * Shared utility that drives the global {@link NodeStatus} on {@link CommonConfig} based on disk + * health signals. + * + *

Two sources feed it: + * + *

    + *
  • {@link #checkFreeRatioAndApply} polls usable / total space across critical directories and + * sets {@code ReadOnly(DISK_FULL)} when the ratio drops below a threshold. + *
  • {@link #apply}{@code (DISK_CRASH)} is called from passive failure observers — Ratis + * write-path catch blocks on ConfigNode and {@code FolderManager.ABNORMAL} aggregation on + * DataNode — when a real write IO error has already occurred. + *
* *

Only transitions between {@link NodeStatus#Running} and {@link NodeStatus#ReadOnly} with * reason {@link NodeStatus#DISK_FULL}/{@link NodeStatus#DISK_CRASH} are managed here. Other @@ -45,10 +51,6 @@ public class DiskChecker { private static final Logger LOGGER = LoggerFactory.getLogger(DiskChecker.class); - private static final byte[] PROBE_PAYLOAD = new byte[] {0x01}; - private static final String PROBE_PREFIX = "iotdb-disk-probe-"; - private static final String PROBE_SUFFIX = ".tmp"; - public enum DiskStatus { NORMAL, DISK_FULL, @@ -58,53 +60,39 @@ public enum DiskStatus { private DiskChecker() {} /** - * Evaluate the supplied directories. A single unwritable directory yields {@link - * DiskStatus#DISK_CRASH}; any directory whose usable/total ratio is below the threshold yields - * {@link DiskStatus#DISK_FULL} (when no crash is detected); otherwise {@link DiskStatus#NORMAL}. + * Evaluate the usable/total space ratio of each directory. Any directory whose ratio is below + * {@code freeRatioThreshold} yields {@link DiskStatus#DISK_FULL}; otherwise {@link + * DiskStatus#NORMAL}. This method never returns {@link DiskStatus#DISK_CRASH} — crash detection + * is driven by the Ratis write-path observer on ConfigNode and by {@code FolderManager} on + * DataNode, both of which call {@link #apply} directly. 
*/ - public static DiskStatus check(List dirs, double freeRatioThreshold) { + public static DiskStatus checkFreeRatio(List dirs, double freeRatioThreshold) { if (dirs == null || dirs.isEmpty()) { return DiskStatus.NORMAL; } - boolean anyFull = false; for (String dir : dirs) { if (dir == null || dir.isEmpty()) { continue; } File f = new File(dir); - if (!f.isDirectory()) { - LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir); - return DiskStatus.DISK_CRASH; - } - try { - Path probe = Files.createTempFile(Paths.get(dir), PROBE_PREFIX, PROBE_SUFFIX); - try { - Files.write(probe, PROBE_PAYLOAD); - } finally { - Files.deleteIfExists(probe); - } - } catch (IOException e) { - LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir, e); - return DiskStatus.DISK_CRASH; - } long total = f.getTotalSpace(); long usable = f.getUsableSpace(); if (total > 0 && (double) usable / total < freeRatioThreshold) { - anyFull = true; + return DiskStatus.DISK_FULL; } } - return anyFull ? DiskStatus.DISK_FULL : DiskStatus.NORMAL; + return DiskStatus.NORMAL; } - /** - * Run {@link #check} and apply the result to {@link CommonConfig}. See class javadoc for - * transition rules. - */ - public static void checkAndApply(List dirs, double freeRatioThreshold) { - apply(check(dirs, freeRatioThreshold)); + /** Convenience: run {@link #checkFreeRatio} and apply the result to {@link CommonConfig}. */ + public static void checkFreeRatioAndApply(List dirs, double freeRatioThreshold) { + apply(checkFreeRatio(dirs, freeRatioThreshold)); } - /** Visible for tests; package-public callers should prefer {@link #checkAndApply}. */ + /** + * Apply a precomputed status to {@link CommonConfig}. Priority is {@code DiskCrash > DiskFull > + * Normal}; recovery to {@code Running} only fires when the active reason was disk-related. 
+ */ public static void apply(DiskStatus result) { CommonConfig config = CommonDescriptor.getInstance().getConfig(); NodeStatus currentStatus = config.getNodeStatus(); @@ -135,7 +123,9 @@ public static void apply(DiskStatus result) { break; case NORMAL: default: - if (currentlyFull || currentlyCrash) { + // DiskCrash is sticky — only a restart clears it. The free-ratio probe can recover + // DiskFull alone because free-space reappearing is the literal inverse of running low. + if (currentlyFull) { LOGGER.info(CommonMessages.DISK_RECOVERED_SET_RUNNING, currentReason); config.setNodeStatus(NodeStatus.Running); config.setStatusReason(null); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java index e37b44edafda6..ff64a666f98e4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.i18n.CommonMessages; +import java.util.OptionalInt; + /** Node status for showing cluster */ public enum NodeStatus { /** Node running properly */ @@ -61,6 +63,35 @@ public static boolean isNormalStatus(NodeStatus status) { return status != null && status.equals(NodeStatus.Running); } + /** + * Map a node's {@link NodeStatus} (and optional reason) to the Ratis peer priority that should + * govern its candidacy in leader elections. + * + *

+   *   Running                          →   0   (full candidate)
+   *   ReadOnly + {@link #DISK_FULL}    →  -1   (ranked below healthy peers but ahead of crashed)
+   *   ReadOnly + {@link #DISK_CRASH}   →  -2   (most degraded — last choice)
+   *   anything else                    →  empty (priority must not be changed)
+   * 
+ * + * Returning {@link OptionalInt#empty()} for Unknown/Removing/manual ReadOnly keeps transient + * blips and operator-driven states from rewriting peer priorities. + */ + public static OptionalInt priorityForStatus(NodeStatus status, String statusReason) { + if (Running.equals(status)) { + return OptionalInt.of(0); + } + if (ReadOnly.equals(status)) { + if (DISK_CRASH.equals(statusReason)) { + return OptionalInt.of(-2); + } + if (DISK_FULL.equals(statusReason)) { + return OptionalInt.of(-1); + } + } + return OptionalInt.empty(); + } + public static boolean isReadable(NodeStatus status) { switch (status) { case Running: diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java index b38ce7bfc8b01..d6d16a986eae4 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java @@ -29,11 +29,11 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.util.Arrays; import java.util.Collections; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; public class DiskCheckerTest { @@ -58,57 +58,41 @@ public void tearDown() { config.setStatusReason(savedReason); } + // -- checkFreeRatio ------------------------------------------------------------------------ + @Test - public void checkReturnsNormalForWritableDirectoryWithSpace() throws Exception { + public void checkFreeRatioReturnsNormalWhenRatioMeetsThreshold() throws Exception { File dir = tmp.newFolder(); - // threshold 0.0 → any positive usable space passes assertEquals( DiskChecker.DiskStatus.NORMAL, - DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0)); + 
DiskChecker.checkFreeRatio(Collections.singletonList(dir.getAbsolutePath()), 0.0)); } @Test - public void checkReturnsDiskFullWhenBelowThreshold() throws Exception { + public void checkFreeRatioReturnsDiskFullWhenAnyDirBelowThreshold() throws Exception { File dir = tmp.newFolder(); - // threshold > 1 forces every directory to be reported as full + // threshold above 1.0 forces any directory with positive total space to register as full assertEquals( DiskChecker.DiskStatus.DISK_FULL, - DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 2.0)); + DiskChecker.checkFreeRatio(Collections.singletonList(dir.getAbsolutePath()), 2.0)); } @Test - public void checkReturnsDiskCrashWhenDirectoryMissing() { - String missing = new File(tmp.getRoot(), "does-not-exist").getAbsolutePath(); + public void checkFreeRatioSkipsNullAndEmptyEntries() throws Exception { + File dir = tmp.newFolder(); assertEquals( - DiskChecker.DiskStatus.DISK_CRASH, - DiskChecker.check(Collections.singletonList(missing), 0.0)); + DiskChecker.DiskStatus.NORMAL, + DiskChecker.checkFreeRatio(Arrays.asList(null, "", dir.getAbsolutePath()), 0.0)); } @Test - public void checkReturnsDiskCrashWhenPathIsAFile() throws Exception { - File file = tmp.newFile(); + public void emptyDirListIsNormal() { assertEquals( - DiskChecker.DiskStatus.DISK_CRASH, - DiskChecker.check(Collections.singletonList(file.getAbsolutePath()), 0.0)); + DiskChecker.DiskStatus.NORMAL, DiskChecker.checkFreeRatio(Collections.emptyList(), 1.0)); + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.checkFreeRatio(null, 1.0)); } - @Test - public void checkPrioritizesCrashOverFull() throws Exception { - File healthy = tmp.newFolder(); - String missing = new File(tmp.getRoot(), "missing").getAbsolutePath(); - // Even with a "full" threshold the missing dir trumps it. 
- assertEquals( - DiskChecker.DiskStatus.DISK_CRASH, - DiskChecker.check(java.util.Arrays.asList(healthy.getAbsolutePath(), missing), 2.0)); - } - - @Test - public void checkSkipsNullAndEmptyEntries() throws Exception { - File dir = tmp.newFolder(); - assertEquals( - DiskChecker.DiskStatus.NORMAL, - DiskChecker.check(java.util.Arrays.asList(null, "", dir.getAbsolutePath()), 0.0)); - } + // -- apply() state machine ----------------------------------------------------------------- @Test public void applyDiskFullSetsReadOnlyFromRunning() { @@ -155,12 +139,14 @@ public void applyNormalRecoversFromDiskFull() { } @Test - public void applyNormalRecoversFromDiskCrash() { + public void applyNormalDoesNotRecoverFromDiskCrash() { DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); CommonConfig config = CommonDescriptor.getInstance().getConfig(); - assertEquals(NodeStatus.Running, config.getNodeStatus()); - assertNull(config.getStatusReason()); + // DiskCrash is sticky: a free-ratio probe (the only source of NORMAL) cannot prove writes + // work again, so the node stays ReadOnly(DiskCrash) until restart. 
+ assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); } @Test @@ -189,35 +175,32 @@ public void applyIsIdempotentForRepeatedDiskCrash() { assertEquals(reasonBefore, CommonDescriptor.getInstance().getConfig().getStatusReason()); } - @Test - public void checkAndApplyDrivesStatusEndToEnd() throws Exception { - File healthy = tmp.newFolder(); - DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); - assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + // -- checkFreeRatioAndApply ---------------------------------------------------------------- - String missing = new File(tmp.getRoot(), "still-missing").getAbsolutePath(); - DiskChecker.checkAndApply(Collections.singletonList(missing), 0.0); + @Test + public void checkFreeRatioAndApplyDrivesStatusEndToEnd() throws Exception { + File dir = tmp.newFolder(); + // Threshold above 1.0 -> always "DiskFull" + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 2.0); assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); assertEquals( - NodeStatus.DISK_CRASH, CommonDescriptor.getInstance().getConfig().getStatusReason()); + NodeStatus.DISK_FULL, CommonDescriptor.getInstance().getConfig().getStatusReason()); - DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); + // Threshold 0.0 -> always "Normal" -> recovery + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 0.0); assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); assertNull(CommonDescriptor.getInstance().getConfig().getStatusReason()); } @Test - public void emptyDirListIsNormal() { - assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(Collections.emptyList(), 1.0)); - assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(null, 1.0)); 
- } - - @Test - public void smokeProbeFileIsDeleted() throws Exception { + public void checkFreeRatioAndApplyDoesNotClearDiskCrash() throws Exception { File dir = tmp.newFolder(); - DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0); - File[] leftovers = dir.listFiles(); - assertTrue( - "Disk probe should clean up its temp file", leftovers == null || leftovers.length == 0); + // Simulate a Ratis-passive DiskCrash signal. + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + // Subsequent healthy free-ratio polling must keep the node in ReadOnly(DiskCrash). + DiskChecker.checkFreeRatioAndApply(Collections.singletonList(dir.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals( + NodeStatus.DISK_CRASH, CommonDescriptor.getInstance().getConfig().getStatusReason()); } } From 999ec31b4a9a0d9f4d599ef87fc7fb9c3744d55a Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 17:32:42 +0800 Subject: [PATCH 4/5] Drive ConfigNode peer priorities from the EventService instead of HeartbeatService MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TConfigNodeHeartbeatReq carries needSamplingLoad; ConfigNodeRPCServiceProcessor gates its disk-health check on that flag instead of a local counter, matching the DataNode/AINode heartbeat shape. - HeartbeatService now bumps heartbeatCounter exactly once at the top of heartbeatLoopBody and threads the iterationIndex into genHeartbeatReq / genConfigNodeHeartbeatReq / genAIHeartbeatReq / addConfigNodeLocationsToReq; the gen methods no longer touch the counter. - Cross-peer priority reconciliation moved out of HeartbeatService into a new ConfigRegionPriorityBalancer that implements IClusterStatusSubscriber. 
It is registered on the EventService alongside RouteBalancer/TopologyService and only reacts when NodeStatisticsChangeEvent reports a transition whose priority bucket actually moved (filtered to ConfigNode peers, gated on isLeader). - RatisConsensus.reconfigurePeerPriorities is now a mechanical merge — it rebuilds the peer list from the requested priorities and unconditionally calls sendReconfiguration. Decisions about "did the priority change" live in the balancer, not at the consensus layer. --- .../confignode/manager/load/LoadManager.java | 2 + .../ConfigRegionPriorityBalancer.java | 114 ++++++++++++++++++ .../load/service/HeartbeatService.java | 92 +++++--------- .../thrift/ConfigNodeRPCServiceProcessor.java | 12 +- .../iotdb/consensus/ratis/RatisConsensus.java | 24 ++-- .../src/main/thrift/confignode.thrift | 3 + 6 files changed, 162 insertions(+), 85 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e97f32bdbda85..1ef9100e40a2d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -34,6 +34,7 @@ import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.balancer.ConfigRegionPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; @@ 
-90,6 +91,7 @@ public LoadManager(IManager configManager) { this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); + this.eventService.register(new ConfigRegionPriorityBalancer(configManager)); } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java new file mode 100644 index 0000000000000..dc9d1d5b0fdcb --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.i18n.ManagerMessages; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; +import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; +import org.apache.iotdb.consensus.exception.ConsensusException; + +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Reacts to {@link NodeStatisticsChangeEvent} on the ConfigNode leader and pushes Ratis peer + * priorities for the ConfigRegion so candidacy in leader elections reflects the latest {@link + * NodeStatus} of every ConfigNode (see {@link NodeStatus#priorityForStatus}). + * + *

Only fires when this ConfigNode is the ConfigRegion leader, since {@code setConfiguration} + * must be applied through the leader. Filters events to ConfigNode peers (DataNode/AINode entries + * are skipped) and accumulates only transitions whose priority bucket actually moves before issuing + * a single batched reconfiguration call. + */ +public class ConfigRegionPriorityBalancer implements IClusterStatusSubscriber { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionPriorityBalancer.class); + + private final IManager configManager; + + public ConfigRegionPriorityBalancer(IManager configManager) { + this.configManager = configManager; + } + + @Override + public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { + if (!configManager.getConsensusManager().isLeader()) { + return; + } + Set configNodeIds = + configManager.getNodeManager().getRegisteredConfigNodes().stream() + .map(TConfigNodeLocation::getConfigNodeId) + .collect(Collectors.toSet()); + + Map desired = new HashMap<>(); + for (Map.Entry> entry : + event.getDifferentNodeStatisticsMap().entrySet()) { + int nodeId = entry.getKey(); + if (!configNodeIds.contains(nodeId)) { + continue; + } + NodeStatistics previous = entry.getValue().getLeft(); + NodeStatistics current = entry.getValue().getRight(); + if (current == null) { + // Node disappeared from the cache; peer removal flows through addConfigNodePeer / + // removeConfigNodePeer instead — no priority push to issue here. + continue; + } + OptionalInt newPriority = + NodeStatus.priorityForStatus(current.getStatus(), current.getStatusReason()); + if (!newPriority.isPresent()) { + // Transient (Unknown / Removing / manual ReadOnly) — leave the priority as-is. + continue; + } + OptionalInt oldPriority = + previous == null + ? 
OptionalInt.empty() + : NodeStatus.priorityForStatus(previous.getStatus(), previous.getStatusReason()); + if (oldPriority.isPresent() && oldPriority.getAsInt() == newPriority.getAsInt()) { + // The status moved but the priority bucket did not — no point churning the group config. + continue; + } + desired.put(nodeId, newPriority.getAsInt()); + } + if (desired.isEmpty()) { + return; + } + try { + configManager + .getConsensusManager() + .getConsensusImpl() + .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); + } catch (ConsensusException e) { + LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 018f31f336e08..915fbe9d12047 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -25,7 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.cluster.DiskChecker; -import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -45,7 +44,6 @@ import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq; -import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import 
org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; @@ -53,9 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -141,69 +137,36 @@ private void heartbeatLoopBody() { .ifPresent( consensusManager -> { if (getConsensusManager().isLeader()) { - // Snapshot the counter before genHeartbeatReq increments it so the leader's - // own sampling fires on the same iterations DataNode load sampling does. - long iterationIndex = heartbeatCounter.get(); + // The counter is bumped exactly once per loop iteration so every gen* call in + // this body observes the same value, and the leader's own sampling fires on the + // same iterations DataNode/AINode load sampling does. + long iterationIndex = heartbeatCounter.getAndIncrement(); // Send heartbeat requests to all the registered ConfigNodes pingRegisteredConfigNodes( - genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes()); + genConfigNodeHeartbeatReq(iterationIndex), + getNodeManager().getRegisteredConfigNodes()); // Send heartbeat requests to all the registered DataNodes pingRegisteredDataNodes( - genHeartbeatReq(), getNodeManager().getRegisteredDataNodes()); + genHeartbeatReq(iterationIndex), + getNodeManager().getRegisteredDataNodes(), + iterationIndex); // Send heartbeat requests to all the registered AINodes - pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes()); + pingRegisteredAINodes( + genAIHeartbeatReq(iterationIndex), getNodeManager().getRegisteredAINodes()); // Sample free-space on the same cadence DataNode samples its load. Runs after // the async heartbeat dispatches so the OS call does not delay fanout. DiskCrash - // is observed passively by the Ratis write-path, not polled here. + // is observed passively by the Ratis write-path, not polled here. 
Cross-peer + // priority reconciliation is event-driven (see ConfigRegionPriorityBalancer). if (iterationIndex % LOAD_SAMPLING_INTERVAL == 0) { DiskChecker.checkFreeRatioAndApply( ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); - reconcileConfigNodePeerPriorities(); } } }); } - /** - * Push Ratis peer priorities to reflect each ConfigNode's current {@link NodeStatus} (see {@link - * NodeStatus#priorityForStatus}). Only fires on the leader; only acts when at least one peer's - * desired priority differs from the live group configuration. {@code Unknown}/{@code Removing} - * peers and non-disk {@code ReadOnly} reasons are left untouched so transient blips and - * operator-driven states cannot churn the group config. - */ - private void reconcileConfigNodePeerPriorities() { - Map desired = new HashMap<>(); - for (TConfigNodeLocation peer : getNodeManager().getRegisteredConfigNodes()) { - int nodeId = peer.getConfigNodeId(); - NodeStatus status; - String reason; - if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { - // The leader's own cache entry mirrors CommonConfig; reading from CommonConfig directly - // sidesteps any ordering between the local disk-check and the next cache refresh. 
- status = CommonDescriptor.getInstance().getConfig().getNodeStatus(); - reason = CommonDescriptor.getInstance().getConfig().getStatusReason(); - } else { - status = loadCache.getNodeStatus(nodeId); - reason = loadCache.getNodeStatusReason(nodeId); - } - NodeStatus.priorityForStatus(status, reason) - .ifPresent(priority -> desired.put(nodeId, priority)); - } - if (desired.isEmpty()) { - return; - } - try { - configManager - .getConsensusManager() - .getConsensusImpl() - .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); - } catch (ConsensusException e) { - LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); - } - } - - protected TDataNodeHeartbeatReq genHeartbeatReq() { + protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { /* Generate heartbeat request */ TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); @@ -214,7 +177,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID)); // Always sample RegionGroups' leadership as the Region heartbeat heartbeatReq.setNeedJudgeLeader(true); - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); + heartbeatReq.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); Pair schemaQuotaRemain = configManager.getClusterSchemaManager().getSchemaQuotaRemain(); heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left); @@ -222,7 +185,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { // We collect pipe meta in every 100 heartbeat loop heartbeatReq.setNeedPipeMetaList( !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() - && heartbeatCounter.get() + && iterationIndex % PipeConfig.getInstance() .getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() == 0); @@ -233,18 +196,16 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { } // We broadcast region operations list every 100 
heartbeat loops - if (heartbeatCounter.get() % 100 == 0) { + if (iterationIndex % 100 == 0) { heartbeatReq.setCurrentRegionOperations( configManager.getProcedureManager().getRegionOperationConsensusIds()); } - /* Update heartbeat counter */ - heartbeatCounter.getAndIncrement(); - return heartbeatReq; } - private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq req) { + private void addConfigNodeLocationsToReq( + int dataNodeId, TDataNodeHeartbeatReq req, long iterationIndex) { Set confirmedConfigNodes = loadCache.getConfirmedConfigNodeEndPoints(dataNodeId); Set actualConfigNodes = getNodeManager().getRegisteredConfigNodes().stream() @@ -259,22 +220,23 @@ private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq r 4. At this point, because actualConfigNodes and confirmedConfigNodes are identical, the ConfigNode list is not re-sent to the DataNode. */ if (!actualConfigNodes.equals(confirmedConfigNodes) - || heartbeatCounter.get() % configNodeListPeriodicallySyncInterval == 0) { + || iterationIndex % configNodeListPeriodicallySyncInterval == 0) { req.setConfigNodeEndPoints(actualConfigNodes); } } - protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() { + protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq(long iterationIndex) { TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq(); req.setTimestamp(System.nanoTime()); + req.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); return req; } - private TAIHeartbeatReq genAIHeartbeatReq() { + private TAIHeartbeatReq genAIHeartbeatReq(long iterationIndex) { /* Generate heartbeat request */ TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); + heartbeatReq.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); return heartbeatReq; } @@ -311,7 +273,9 @@ protected ConfigNodeHeartbeatHandler 
getConfigNodeHeartbeatHandler(int configNod * @param registeredDataNodes DataNodes that registered in cluster */ private void pingRegisteredDataNodes( - TDataNodeHeartbeatReq heartbeatReq, List registeredDataNodes) { + TDataNodeHeartbeatReq heartbeatReq, + List registeredDataNodes, + long iterationIndex) { // Send heartbeat requests for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) { int dataNodeId = dataNodeInfo.getLocation().getDataNodeId(); @@ -330,7 +294,7 @@ private void pingRegisteredDataNodes( configManager.getClusterSchemaManager()::updateDeviceUsage, configManager.getPipeManager().getPipeRuntimeCoordinator()); configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); - addConfigNodeLocationsToReq(dataNodeId, heartbeatReq); + addConfigNodeLocationsToReq(dataNodeId, heartbeatReq, iterationIndex); AsyncDataNodeHeartbeatClientPool.getInstance() .getDataNodeHeartBeat( dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 095d03443edf4..6518d014d5a25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -87,7 +87,6 @@ import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.load.service.HeartbeatService; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.persistence.auth.AuthorInfo; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; @@ 
-247,7 +246,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Iface { @@ -259,9 +257,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac protected final ConfigNode configNode; protected final ConfigManager configManager; - /** Counts heartbeats received from the leader to drive disk-health sampling cadence. */ - private final AtomicLong heartbeatReceivedCounter = new AtomicLong(0); - public ConfigNodeRPCServiceProcessor(ConfigManager configManager) { this.commonConfig = CommonDescriptor.getInstance().getConfig(); this.configNodeConfig = ConfigNodeDescriptor.getInstance().getConf(); @@ -1091,9 +1086,10 @@ public TRegionRouteMapResp getLatestRegionRouteMap() { public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); - // Sample free-space on the same cadence DataNode samples its load. DiskCrash is observed - // passively from the Ratis write-path on this node, not polled here. - if (heartbeatReceivedCounter.getAndIncrement() % HeartbeatService.LOAD_SAMPLING_INTERVAL == 0) { + // Sample free-space only when the leader flags this heartbeat for load sampling — the same + // gate DataNode/AINode follow. DiskCrash is observed passively from the Ratis write-path + // on this node, not polled here. 
+ if (heartbeatReq.isSetNeedSamplingLoad() && heartbeatReq.isNeedSamplingLoad()) { DiskChecker.checkFreeRatioAndApply( configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index df7836336997b..22b1c185b6693 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -979,24 +979,22 @@ public void reconfigurePeerPriorities( if (group == null || !group.getPeers().contains(myself)) { throw new ConsensusGroupNotExistException(groupId); } - boolean changed = false; + // Mechanical merge: every peer in nodeIdToPriority gets the requested priority, every other + // peer keeps its current one. Deciding *whether* a priority change is worth pushing is the + // caller's job (see ConfigRegionPriorityBalancer); this method just applies what it is told. 
List newPeers = new ArrayList<>(group.getPeers().size()); for (RaftPeer p : group.getPeers()) { Integer desired = nodeIdToPriority.get(Utils.fromRaftPeerIdToNodeId(p.getId())); - if (desired == null || desired == p.getPriority()) { + if (desired == null) { newPeers.add(p); - continue; + } else { + newPeers.add( + RaftPeer.newBuilder() + .setId(p.getId()) + .setAddress(p.getAddress()) + .setPriority(desired) + .build()); } - newPeers.add( - RaftPeer.newBuilder() - .setId(p.getId()) - .setAddress(p.getAddress()) - .setPriority(desired) - .build()); - changed = true; - } - if (!changed) { - return; } sendReconfiguration(RaftGroup.valueOf(raftGroupId, newPeers)); } diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 4e7ffc8dcba99..d12fd53525b98 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -518,6 +518,9 @@ struct TConfigNodeHeartbeatReq { 1: required i64 timestamp 2: optional common.TLicense licence 3: optional TActivationControl activationControl + // Mirrors TDataNodeHeartbeatReq.needSamplingLoad — the leader sets this flag once every + // sampling interval so the receiver runs its disk-health check at the same cadence. + 4: optional bool needSamplingLoad } struct TConfigNodeHeartbeatResp { From 6d662fdee18e1a04b6333cf83f7f31eede9f1333 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 18:37:55 +0800 Subject: [PATCH 5/5] Fold ConfigNode priority reconcile into EventService; drop iterationIndex threading - HeartbeatService bumps heartbeatCounter once at the tail of heartbeatLoopBody; the gen* methods read heartbeatCounter.get() directly again instead of taking an iterationIndex parameter. - Remove the standalone ConfigRegionPriorityBalancer. 
Priority reconciliation now lives in EventService, fired from checkAndBroadcastNodeStatisticsChangeEventIfNecessary exactly when ConfigNode statistics change. It is leader-gated, filtered to ConfigNode peers, and only pushes peers whose priority bucket actually moved. EventService now takes the IManager to reach the consensus impl. --- .../confignode/manager/load/LoadManager.java | 4 +- .../ConfigRegionPriorityBalancer.java | 114 ------------------ .../manager/load/service/EventService.java | 62 +++++++++- .../load/service/HeartbeatService.java | 49 +++----- 4 files changed, 80 insertions(+), 149 deletions(-) delete mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 1ef9100e40a2d..2706226b07578 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -34,7 +34,6 @@ import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.load.balancer.ConfigRegionPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; @@ -87,11 +86,10 @@ public LoadManager(IManager configManager) { setHeartbeatService(configManager, loadCache); this.statisticsService = new StatisticsService(loadCache); this.topologyService = new TopologyService(configManager, loadCache::updateTopology); - 
this.eventService = new EventService(loadCache); + this.eventService = new EventService(configManager, loadCache); this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); - this.eventService.register(new ConfigRegionPriorityBalancer(configManager)); } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java deleted file mode 100644 index dc9d1d5b0fdcb..0000000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/ConfigRegionPriorityBalancer.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.confignode.manager.load.balancer; - -import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.i18n.ManagerMessages; -import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; -import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; -import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; -import org.apache.iotdb.consensus.exception.ConsensusException; - -import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.OptionalInt; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * Reacts to {@link NodeStatisticsChangeEvent} on the ConfigNode leader and pushes Ratis peer - * priorities for the ConfigRegion so candidacy in leader elections reflects the latest {@link - * NodeStatus} of every ConfigNode (see {@link NodeStatus#priorityForStatus}). - * - *

Only fires when this ConfigNode is the ConfigRegion leader, since {@code setConfiguration} - * must be applied through the leader. Filters events to ConfigNode peers (DataNode/AINode entries - * are skipped) and accumulates only transitions whose priority bucket actually moves before issuing - * a single batched reconfiguration call. - */ -public class ConfigRegionPriorityBalancer implements IClusterStatusSubscriber { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionPriorityBalancer.class); - - private final IManager configManager; - - public ConfigRegionPriorityBalancer(IManager configManager) { - this.configManager = configManager; - } - - @Override - public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { - if (!configManager.getConsensusManager().isLeader()) { - return; - } - Set configNodeIds = - configManager.getNodeManager().getRegisteredConfigNodes().stream() - .map(TConfigNodeLocation::getConfigNodeId) - .collect(Collectors.toSet()); - - Map desired = new HashMap<>(); - for (Map.Entry> entry : - event.getDifferentNodeStatisticsMap().entrySet()) { - int nodeId = entry.getKey(); - if (!configNodeIds.contains(nodeId)) { - continue; - } - NodeStatistics previous = entry.getValue().getLeft(); - NodeStatistics current = entry.getValue().getRight(); - if (current == null) { - // Node disappeared from the cache; peer removal flows through addConfigNodePeer / - // removeConfigNodePeer instead — no priority push to issue here. - continue; - } - OptionalInt newPriority = - NodeStatus.priorityForStatus(current.getStatus(), current.getStatusReason()); - if (!newPriority.isPresent()) { - // Transient (Unknown / Removing / manual ReadOnly) — leave the priority as-is. - continue; - } - OptionalInt oldPriority = - previous == null - ? 
OptionalInt.empty() - : NodeStatus.priorityForStatus(previous.getStatus(), previous.getStatusReason()); - if (oldPriority.isPresent() && oldPriority.getAsInt() == newPriority.getAsInt()) { - // The status moved but the priority bucket did not — no point churning the group config. - continue; - } - desired.put(nodeId, newPriority.getAsInt()); - } - if (desired.isEmpty()) { - return; - } - try { - configManager - .getConsensusManager() - .getConsensusImpl() - .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); - } catch (ConsensusException e) { - LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); - } - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java index 762e6b1782b4c..9828d905a8a1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java @@ -19,13 +19,17 @@ package org.apache.iotdb.confignode.manager.load.service; +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; @@ -34,6 +38,7 @@ import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent; +import org.apache.iotdb.consensus.exception.ConsensusException; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -42,13 +47,17 @@ import org.slf4j.LoggerFactory; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * EventService periodically check statistics and broadcast corresponding change event if necessary. 
@@ -68,6 +77,7 @@ public class EventService { IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.CONFIG_NODE_EVENT_SERVICE.getName()); + private final IManager configManager; private final LoadCache loadCache; private final Map previousNodeStatisticsMap; private final Map previousRegionGroupStatisticsMap; @@ -75,7 +85,8 @@ public class EventService { previousConsensusGroupStatisticsMap; private final EventBus eventPublisher; - public EventService(LoadCache loadCache) { + public EventService(IManager configManager, LoadCache loadCache) { + this.configManager = configManager; this.loadCache = loadCache; this.previousNodeStatisticsMap = new TreeMap<>(); this.previousRegionGroupStatisticsMap = new TreeMap<>(); @@ -154,6 +165,55 @@ public synchronized void checkAndBroadcastNodeStatisticsChangeEventIfNecessary() if (!differentNodeStatisticsMap.isEmpty()) { eventPublisher.post(new NodeStatisticsChangeEvent(differentNodeStatisticsMap)); recordNodeStatistics(differentNodeStatisticsMap); + reconcileConfigNodePriorities(differentNodeStatisticsMap); + } + } + + /** + * Demote a ConfigNode's ConfigRegion election priority when its {@link NodeStatus} degrades (see + * {@link NodeStatus#priorityForStatus}). Runs only on the leader and only pushes peers whose + * priority bucket actually moved. 
+ */ + private void reconcileConfigNodePriorities( + Map> differentNodeStatisticsMap) { + ConsensusManager consensusManager = configManager.getConsensusManager(); + if (consensusManager == null || !consensusManager.isLeader()) { + return; + } + Set configNodeIds = + configManager.getNodeManager().getRegisteredConfigNodes().stream() + .map(TConfigNodeLocation::getConfigNodeId) + .collect(Collectors.toSet()); + Map desired = new HashMap<>(); + differentNodeStatisticsMap.forEach( + (nodeId, change) -> { + NodeStatistics current = change.getRight(); + if (!configNodeIds.contains(nodeId) || current == null) { + return; + } + OptionalInt newPriority = + NodeStatus.priorityForStatus(current.getStatus(), current.getStatusReason()); + if (!newPriority.isPresent()) { + return; + } + NodeStatistics previous = change.getLeft(); + OptionalInt oldPriority = + previous == null + ? OptionalInt.empty() + : NodeStatus.priorityForStatus(previous.getStatus(), previous.getStatusReason()); + if (!oldPriority.isPresent() || oldPriority.getAsInt() != newPriority.getAsInt()) { + desired.put(nodeId, newPriority.getAsInt()); + } + }); + if (desired.isEmpty()) { + return; + } + try { + consensusManager + .getConsensusImpl() + .reconfigurePeerPriorities(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, desired); + } catch (ConsensusException e) { + LOGGER.warn(ManagerMessages.RECONFIGURE_PEER_PRIORITIES_FAILED, desired, e); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 915fbe9d12047..73e28b49dc7b8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -137,36 +137,26 @@ private void heartbeatLoopBody() { .ifPresent( 
consensusManager -> { if (getConsensusManager().isLeader()) { - // The counter is bumped exactly once per loop iteration so every gen* call in - // this body observes the same value, and the leader's own sampling fires on the - // same iterations DataNode/AINode load sampling does. - long iterationIndex = heartbeatCounter.getAndIncrement(); // Send heartbeat requests to all the registered ConfigNodes pingRegisteredConfigNodes( - genConfigNodeHeartbeatReq(iterationIndex), - getNodeManager().getRegisteredConfigNodes()); + genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes()); // Send heartbeat requests to all the registered DataNodes pingRegisteredDataNodes( - genHeartbeatReq(iterationIndex), - getNodeManager().getRegisteredDataNodes(), - iterationIndex); + genHeartbeatReq(), getNodeManager().getRegisteredDataNodes()); // Send heartbeat requests to all the registered AINodes - pingRegisteredAINodes( - genAIHeartbeatReq(iterationIndex), getNodeManager().getRegisteredAINodes()); - // Sample free-space on the same cadence DataNode samples its load. Runs after - // the async heartbeat dispatches so the OS call does not delay fanout. DiskCrash - // is observed passively by the Ratis write-path, not polled here. Cross-peer - // priority reconciliation is event-driven (see ConfigRegionPriorityBalancer). - if (iterationIndex % LOAD_SAMPLING_INTERVAL == 0) { + pingRegisteredAINodes(genAIHeartbeatReq(), getNodeManager().getRegisteredAINodes()); + // Self-check free-space after the async fanout so the OS call doesn't delay it. 
+ if (heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0) { DiskChecker.checkFreeRatioAndApply( ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); } + heartbeatCounter.getAndIncrement(); } }); } - protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { + protected TDataNodeHeartbeatReq genHeartbeatReq() { /* Generate heartbeat request */ TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); @@ -177,7 +167,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID)); // Always sample RegionGroups' leadership as the Region heartbeat heartbeatReq.setNeedJudgeLeader(true); - heartbeatReq.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); Pair schemaQuotaRemain = configManager.getClusterSchemaManager().getSchemaQuotaRemain(); heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left); @@ -185,7 +175,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { // We collect pipe meta in every 100 heartbeat loop heartbeatReq.setNeedPipeMetaList( !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() - && iterationIndex + && heartbeatCounter.get() % PipeConfig.getInstance() .getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() == 0); @@ -196,7 +186,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { } // We broadcast region operations list every 100 heartbeat loops - if (iterationIndex % 100 == 0) { + if (heartbeatCounter.get() % 100 == 0) { heartbeatReq.setCurrentRegionOperations( configManager.getProcedureManager().getRegionOperationConsensusIds()); } @@ -204,8 +194,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq(long iterationIndex) { return heartbeatReq; } - private void 
addConfigNodeLocationsToReq( - int dataNodeId, TDataNodeHeartbeatReq req, long iterationIndex) { + private void addConfigNodeLocationsToReq(int dataNodeId, TDataNodeHeartbeatReq req) { Set confirmedConfigNodes = loadCache.getConfirmedConfigNodeEndPoints(dataNodeId); Set actualConfigNodes = getNodeManager().getRegisteredConfigNodes().stream() @@ -220,23 +209,23 @@ private void addConfigNodeLocationsToReq( 4. At this point, because actualConfigNodes and confirmedConfigNodes are identical, the ConfigNode list is not re-sent to the DataNode. */ if (!actualConfigNodes.equals(confirmedConfigNodes) - || iterationIndex % configNodeListPeriodicallySyncInterval == 0) { + || heartbeatCounter.get() % configNodeListPeriodicallySyncInterval == 0) { req.setConfigNodeEndPoints(actualConfigNodes); } } - protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq(long iterationIndex) { + protected TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() { TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq(); req.setTimestamp(System.nanoTime()); - req.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); + req.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); return req; } - private TAIHeartbeatReq genAIHeartbeatReq(long iterationIndex) { + private TAIHeartbeatReq genAIHeartbeatReq() { /* Generate heartbeat request */ TAIHeartbeatReq heartbeatReq = new TAIHeartbeatReq(); heartbeatReq.setHeartbeatTimestamp(System.nanoTime()); - heartbeatReq.setNeedSamplingLoad(iterationIndex % LOAD_SAMPLING_INTERVAL == 0); + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0); return heartbeatReq; } @@ -273,9 +262,7 @@ protected ConfigNodeHeartbeatHandler getConfigNodeHeartbeatHandler(int configNod * @param registeredDataNodes DataNodes that registered in cluster */ private void pingRegisteredDataNodes( - TDataNodeHeartbeatReq heartbeatReq, - List registeredDataNodes, - long iterationIndex) { + TDataNodeHeartbeatReq 
heartbeatReq, List registeredDataNodes) { // Send heartbeat requests for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) { int dataNodeId = dataNodeInfo.getLocation().getDataNodeId(); @@ -294,7 +281,7 @@ private void pingRegisteredDataNodes( configManager.getClusterSchemaManager()::updateDeviceUsage, configManager.getPipeManager().getPipeRuntimeCoordinator()); configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); - addConfigNodeLocationsToReq(dataNodeId, heartbeatReq, iterationIndex); + addConfigNodeLocationsToReq(dataNodeId, heartbeatReq); AsyncDataNodeHeartbeatClientPool.getInstance() .getDataNodeHeartBeat( dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);