diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index ec04aab39bd7b..232263e4d795e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -21,27 +21,37 @@ import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -84,6 +94,9 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*"; protected static final String SELECT_SURVIVING_QUERY = "SELECT temperature, power FROM root.sg.d1"; + 
protected static final String OBJECT_DATABASE = "object_db"; + protected static final String OBJECT_TABLE = "object_table"; + protected static final long OBJECT_TIMESTAMP = 1L; /** * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link @@ -356,6 +369,354 @@ public void testDeleteTimeSeriesReplicaConsistency() throws Exception { } } + /** + * Test that OBJECT values written as object-file pieces are replicated to every IoTConsensusV2 + * replica, and that a follower can still serve the object metadata after the former leader stops. + */ + public void testObjectReplicaConsistency() throws Exception { + final byte[] objectBytes = readObjectExampleBytes(); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + OBJECT_DATABASE); + session.executeNonQueryStatement("USE " + OBJECT_DATABASE); + session.executeNonQueryStatement( + "CREATE TABLE " + + OBJECT_TABLE + + " (region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, " + + "temperature FLOAT FIELD, file OBJECT FIELD)"); + insertObjectTablet(session, objectBytes); + session.executeNonQueryStatement("FLUSH"); + } + + try (Connection tableConnection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement tableStatement = tableConnection.createStatement()) { + tableStatement.execute("USE " + OBJECT_DATABASE); + verifyObjectLength(tableStatement, "initial table connection", objectBytes.length); + + final Map>> dataRegionMap = + waitForDataRegionMap(tableStatement); + final Set leaderNodeIds = new HashSet<>(); + for (final Pair> leaderAndReplicas : dataRegionMap.values()) { + if (leaderAndReplicas.getLeft() > 0) { + leaderNodeIds.add(leaderAndReplicas.getLeft()); + } + } + for (final int leaderNodeId : leaderNodeIds) { + EnvFactory.getEnv() + .dataNodeIdToWrapper(leaderNodeId) + .ifPresent(this::waitForReplicationComplete); + } + + final 
ObjectRegionSelection targetRegion = waitForObjectRegion(dataRegionMap, objectBytes); + final int targetRegionId = targetRegion.regionId; + final int leaderDataNodeId = targetRegion.leaderDataNodeId; + final int followerDataNodeId = targetRegion.followerDataNodeId; + final Set targetReplicaIds = targetRegion.replicaDataNodeIds; + verifyObjectFileOnReplicas(targetRegionId, targetReplicaIds, objectBytes); + + final DataNodeWrapper leaderNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(leaderDataNodeId) + .orElseThrow(() -> new AssertionError("Leader DataNode not found")); + final DataNodeWrapper followerNode = + EnvFactory.getEnv() + .dataNodeIdToWrapper(followerDataNodeId) + .orElseThrow(() -> new AssertionError("Follower DataNode not found")); + + LOGGER.info( + "Stopping object-region leader DataNode {} (region {})", + leaderDataNodeId, + targetRegionId); + leaderNode.stopForcibly(); + Assert.assertFalse("Leader should be stopped", leaderNode.isAlive()); + + Awaitility.await() + .pollDelay(2, TimeUnit.SECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + try (Connection followerConnection = + EnvFactory.getEnv() + .getConnection( + followerNode, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TABLE_SQL_DIALECT); + Statement followerStatement = followerConnection.createStatement()) { + followerStatement.execute("USE " + OBJECT_DATABASE); + verifyObjectLength( + followerStatement, "follower after object leader stop", objectBytes.length); + } + }); + } + } + + private Map>> waitForDataRegionMap( + final Statement statement) { + final AtomicReference>>> dataRegionMapReference = + new AtomicReference<>(Collections.emptyMap()); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + final Map>> dataRegionMap = + getDataRegionMapWithLeader(statement); + Assert.assertFalse( + "Expected at least one non-system DataRegion", dataRegionMap.isEmpty()); + dataRegionMapReference.set(dataRegionMap); + }); + 
return dataRegionMapReference.get(); + } + + private ObjectRegionSelection waitForObjectRegion( + final Map>> dataRegionMap, final byte[] objectBytes) { + final AtomicReference targetRegionReference = new AtomicReference<>(); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + final ObjectRegionSelection targetRegion = + selectObjectRegion(dataRegionMap, objectBytes); + Assert.assertNotNull( + "Should find a replicated data region containing the object file. " + + describeObjectFiles(dataRegionMap, objectBytes), + targetRegion); + targetRegionReference.set(targetRegion); + }); + return targetRegionReference.get(); + } + + private ObjectRegionSelection selectObjectRegion( + final Map>> dataRegionMap, final byte[] objectBytes) + throws IOException { + for (final Map.Entry>> entry : dataRegionMap.entrySet()) { + final Pair> leaderAndReplicas = entry.getValue(); + if (leaderAndReplicas.getRight().size() > 1 + && leaderAndReplicas.getRight().size() <= DATA_REPLICATION_FACTOR + && leaderAndReplicas.getLeft() > 0 + && containsObjectFileInAnyReplica( + entry.getKey(), leaderAndReplicas.getRight(), objectBytes)) { + final int leaderDataNodeId = leaderAndReplicas.getLeft(); + final int followerDataNodeId = + leaderAndReplicas.getRight().stream() + .filter(i -> i != leaderDataNodeId) + .findAny() + .orElse(-1); + if (followerDataNodeId > 0) { + return new ObjectRegionSelection( + entry.getKey(), leaderDataNodeId, followerDataNodeId, leaderAndReplicas.getRight()); + } + } + } + return null; + } + + private byte[] readObjectExampleBytes() throws IOException { + final String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + return Files.readAllBytes(Paths.get(testObject)); + } + + private void insertObjectTablet(final ITableSession session, final byte[] objectBytes) + throws Exception { + final Tablet tablet = + new Tablet( + 
OBJECT_TABLE, + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"), + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT), + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD), + 1); + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, OBJECT_TIMESTAMP); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, objectBytes); + session.insert(tablet); + } + + private void verifyObjectLength( + final Statement statement, final String context, final long expectedObjectLength) + throws Exception { + try (ResultSet resultSet = + statement.executeQuery( + "SELECT length(file) FROM " + + OBJECT_TABLE + + " WHERE time = " + + OBJECT_TIMESTAMP + + " AND region_id = '1' AND plant_id = '5' AND device_id = '3'")) { + Assert.assertTrue("[" + context + "] OBJECT row should exist", resultSet.next()); + Assert.assertEquals( + "[" + context + "] Unexpected OBJECT length", + expectedObjectLength, + parseLongFromString(resultSet.getString(1))); + Assert.assertFalse("[" + context + "] Only one OBJECT row is expected", resultSet.next()); + } + } + + private boolean containsObjectFileInAnyReplica( + final int regionId, final Set replicaIds, final byte[] expectedContent) + throws IOException { + for (final int dataNodeId : replicaIds) { + final DataNodeWrapper wrapper = + EnvFactory.getEnv() + .dataNodeIdToWrapper(dataNodeId) + .orElseThrow(() -> new AssertionError("DataNode not found: " + dataNodeId)); + if (containsObjectFile(wrapper, regionId, expectedContent)) { + return true; + } + } + return false; + } + + private void verifyObjectFileOnReplicas( + final int regionId, final Set replicaIds, final byte[] expectedContent) { + Awaitility.await() + .atMost(60, 
TimeUnit.SECONDS) + .untilAsserted( + () -> { + for (final int dataNodeId : replicaIds) { + final DataNodeWrapper wrapper = + EnvFactory.getEnv() + .dataNodeIdToWrapper(dataNodeId) + .orElseThrow(() -> new AssertionError("DataNode not found: " + dataNodeId)); + Assert.assertTrue( + "Expected object file on DataNode " + + dataNodeId + + " for region " + + regionId + + ". " + + describeObjectDir( + new File(wrapper.getDataNodeObjectDir(), String.valueOf(regionId)), + expectedContent), + containsObjectFile(wrapper, regionId, expectedContent)); + } + }); + } + + private boolean containsObjectFile( + final DataNodeWrapper wrapper, final int regionId, final byte[] expectedContent) + throws IOException { + final File objectRegionDir = new File(wrapper.getDataNodeObjectDir(), String.valueOf(regionId)); + return containsObjectFile(objectRegionDir, expectedContent); + } + + private boolean containsObjectFile(final File file, final byte[] expectedContent) + throws IOException { + if (!file.exists()) { + return false; + } + if (file.isFile()) { + return file.getName().endsWith(".bin") + && (expectedContent == null + || Arrays.equals(expectedContent, Files.readAllBytes(file.toPath()))); + } + final File[] children = file.listFiles(); + if (children == null) { + return false; + } + for (final File child : children) { + if (containsObjectFile(child, expectedContent)) { + return true; + } + } + return false; + } + + private String describeObjectFiles( + final Map>> dataRegionMap, final byte[] expectedContent) + throws IOException { + final StringBuilder builder = new StringBuilder("Object files by region:"); + for (final Map.Entry>> entry : dataRegionMap.entrySet()) { + builder.append(" region ").append(entry.getKey()).append(" ->"); + for (final int dataNodeId : entry.getValue().getRight()) { + final DataNodeWrapper wrapper = + EnvFactory.getEnv() + .dataNodeIdToWrapper(dataNodeId) + .orElseThrow(() -> new AssertionError("DataNode not found: " + dataNodeId)); + builder + 
.append(" DataNode ") + .append(dataNodeId) + .append(": ") + .append( + describeObjectDir( + new File(wrapper.getDataNodeObjectDir(), String.valueOf(entry.getKey())), + expectedContent)); + } + } + return builder.toString(); + } + + private String describeObjectDir(final File file, final byte[] expectedContent) + throws IOException { + if (!file.exists()) { + return "missing(" + file.getPath() + ")"; + } + if (file.isFile()) { + if (!file.getName().endsWith(".bin")) { + return "non-object-file(" + file.getPath() + ")"; + } + final byte[] content = Files.readAllBytes(file.toPath()); + return file.getPath() + + "[length=" + + content.length + + ", match=" + + Arrays.equals(expectedContent, content) + + "]"; + } + final File[] children = file.listFiles(); + if (children == null || children.length == 0) { + return "empty(" + file.getPath() + ")"; + } + final StringBuilder builder = new StringBuilder(); + for (final File child : children) { + final String childDescription = describeObjectDir(child, expectedContent); + if (childDescription.contains(".bin") || childDescription.startsWith("missing")) { + if (builder.length() > 0) { + builder.append(", "); + } + builder.append(childDescription); + } + } + return builder.length() == 0 ? 
"no-bin-under(" + file.getPath() + ")" : builder.toString(); + } + + private static class ObjectRegionSelection { + + private final int regionId; + private final int leaderDataNodeId; + private final int followerDataNodeId; + private final Set replicaDataNodeIds; + + private ObjectRegionSelection( + final int regionId, + final int leaderDataNodeId, + final int followerDataNodeId, + final Set replicaDataNodeIds) { + this.regionId = regionId; + this.leaderDataNodeId = leaderDataNodeId; + this.followerDataNodeId = followerDataNodeId; + this.replicaDataNodeIds = replicaDataNodeIds; + } + } + /** * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and * that data queries do not return the deleted timeseries. diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java index bb97014d21341..0c932f963af19 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java @@ -55,4 +55,10 @@ public void test3C3DWriteFlushAndQuery() throws Exception { public void testDeleteTimeSeriesReplicaConsistency() throws Exception { super.testDeleteTimeSeriesReplicaConsistency(); } + + @Override + @Test + public void testObjectReplicaConsistency() throws Exception { + super.testObjectReplicaConsistency(); + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java index d4c0bf22ab433..c7ff3604b1fd4 100644 --- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java @@ -55,4 +55,10 @@ public void test3C3DWriteFlushAndQuery() throws Exception { public void testDeleteTimeSeriesReplicaConsistency() throws Exception { super.testDeleteTimeSeriesReplicaConsistency(); } + + @Override + @Test + public void testObjectReplicaConsistency() throws Exception { + super.testObjectReplicaConsistency(); + } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/Base32ObjectPath.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/Base32ObjectPath.java new file mode 100644 index 0000000000000..195f26d36cd96 --- /dev/null +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/Base32ObjectPath.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.calc.utils; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Base32ObjectPath implements IObjectPath { + + private final long timestamp; + private final IDeviceID deviceID; + private final String measurement; + private final Path path; + private int serializedSize = -1; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + }; + + private static final Factory FACTORY = Base32ObjectPath::new; + + private Base32ObjectPath(String first, String... 
more) { + final String[] deviceIdSegments = new String[more.length - 2]; + for (int i = 0; i < more.length - 2; i++) { + if ("NUL".equals(more[i])) { + deviceIdSegments[i] = null; + } else if ("EPT".equals(more[i])) { + deviceIdSegments[i] = ""; + } else { + deviceIdSegments[i] = + new String(BaseEncoding.base32().omitPadding().decode(more[i]), StandardCharsets.UTF_8); + } + } + deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdSegments); + measurement = + new String( + BaseEncoding.base32().omitPadding().decode(more[more.length - 2]), + StandardCharsets.UTF_8); + timestamp = + Long.parseLong(more[more.length - 1].substring(0, more[more.length - 1].indexOf('.'))); + path = Paths.get(first, more); + } + + public Base32ObjectPath(Path path) { + final String[] deviceIdSegments = new String[path.getNameCount() - 3]; + for (int i = 0; i < deviceIdSegments.length; i++) { + final String segment = path.getName(i + 1).toString(); + if ("NUL".equals(segment)) { + deviceIdSegments[i] = null; + } else if ("EPT".equals(segment)) { + deviceIdSegments[i] = ""; + } else { + deviceIdSegments[i] = + new String(BaseEncoding.base32().omitPadding().decode(segment), StandardCharsets.UTF_8); + } + } + deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdSegments); + measurement = + new String( + BaseEncoding.base32() + .omitPadding() + .decode(path.getName(path.getNameCount() - 2).toString()), + StandardCharsets.UTF_8); + final String fileName = path.getFileName().toString(); + timestamp = Long.parseLong(fileName.substring(0, fileName.indexOf('.'))); + this.path = path; + } + + public Base32ObjectPath(int regionId, long time, IDeviceID deviceID, String measurement) { + final Object[] segments = deviceID.getSegments(); + final String[] pathSegments = new String[segments.length + 2]; + for (int i = 0; i < segments.length; i++) { + final Object segment = segments[i]; + if (segment == null) { + pathSegments[i] = "NUL"; + } else if ("".equals(segment)) { + pathSegments[i] = 
"EPT"; + } else { + pathSegments[i] = + BaseEncoding.base32() + .omitPadding() + .encode(segment.toString().getBytes(StandardCharsets.UTF_8)); + } + } + pathSegments[pathSegments.length - 2] = + BaseEncoding.base32().omitPadding().encode(measurement.getBytes(StandardCharsets.UTF_8)); + pathSegments[pathSegments.length - 1] = time + ".bin"; + this.path = Paths.get(String.valueOf(regionId), pathSegments); + this.timestamp = time; + this.deviceID = deviceID; + this.measurement = measurement; + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + int count = 0; + count += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), byteBuffer); + for (final Path segment : path) { + count += ReadWriteIOUtils.writeVar(segment.toString(), byteBuffer); + } + return count; + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + int count = 0; + count += ReadWriteForEncodingUtils.writeUnsignedVarInt(path.getNameCount(), outputStream); + for (final Path segment : path) { + count += ReadWriteIOUtils.writeVar(segment.toString(), outputStream); + } + return count; + } + + @Override + public int getSerializedSize() { + if (serializedSize != -1) { + return serializedSize; + } + int count = ReadWriteForEncodingUtils.varIntSize(path.getNameCount()); + for (final Path segment : path) { + final byte[] bytes = segment.toString().getBytes(StandardCharsets.UTF_8); + count += ReadWriteForEncodingUtils.varIntSize(bytes.length); + count += bytes.length; + } + serializedSize = count; + return count; + } + + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + serialize(byteBuffer); + } + + @Override + public int getSerializeSizeToObjectValue() { + return getSerializedSize(); + } + + public static Base32ObjectPath deserialize(ByteBuffer byteBuffer) { + final int count = ReadWriteForEncodingUtils.readUnsignedVarInt(byteBuffer); + final String first = ReadWriteIOUtils.readVarIntString(byteBuffer); + final String[] more = 
new String[count - 1]; + for (int i = 0; i < count - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(byteBuffer); + } + return new Base32ObjectPath(first, more); + } + + public static Base32ObjectPath deserialize(InputStream stream) throws IOException { + final int count = ReadWriteForEncodingUtils.readUnsignedVarInt(stream); + final String first = ReadWriteIOUtils.readVarIntString(stream); + final String[] more = new String[count - 1]; + for (int i = 0; i < count - 1; ++i) { + more[i] = ReadWriteIOUtils.readVarIntString(stream); + } + return new Base32ObjectPath(first, more); + } + + @Override + public String toString() { + return path.toString(); + } + + @Override + public long getTime() { + return timestamp; + } + + @Override + public IDeviceID getDeviceID() { + return deviceID; + } + + @Override + public String getMeasurement() { + return measurement; + } + + @Override + public Path getPath() { + return path; + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/IObjectPath.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/IObjectPath.java index 4be55129da3e4..5d1684a08fed7 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/IObjectPath.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/IObjectPath.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.file.Path; public interface IObjectPath { @@ -43,11 +44,22 @@ public interface IObjectPath { int getSerializeSizeToObjectValue(); + long getTime(); + + IDeviceID getDeviceID(); + + String getMeasurement(); + + Path getPath(); + interface Factory { IObjectPath create(int regionId, long time, IDeviceID iDeviceID, String measurement); - Factory FACTORY = null; + Factory FACTORY = + 
CONFIG.isRestrictObjectLimit() + ? PlainObjectPath.getFACTORY() + : Base32ObjectPath.getFACTORY(); } interface Deserializer { @@ -60,6 +72,8 @@ interface Deserializer { } static Deserializer getDeserializer() { - return null; + return CONFIG.isRestrictObjectLimit() + ? PlainObjectPath.getDESERIALIZER() + : Base32ObjectPath.getDESERIALIZER(); } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/ObjectTypeUtils.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/ObjectTypeUtils.java index e821b2f553a1b..bc12cc1227668 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/ObjectTypeUtils.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/ObjectTypeUtils.java @@ -32,6 +32,8 @@ import java.io.File; import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Optional; import java.util.ServiceLoader; @@ -109,7 +111,24 @@ public static Binary replaceRegionIdForObjectBinary(int newRegionId, Binary orig ObjectTypeUtils.parseObjectBinaryToSizeIObjectPathPair(originValue); IObjectPath objectPath = pair.getRight(); try { - IObjectPath newObjectPath = null; + final Path path = objectPath.getPath(); + final int regionId = Integer.parseInt(path.getName(0).toString()); + if (regionId == newRegionId) { + return originValue; + } + + final IObjectPath newObjectPath; + if (objectPath instanceof PlainObjectPath) { + newObjectPath = + new PlainObjectPath( + objectPath.toString().replaceFirst(regionId + "", newRegionId + "")); + } else { + final String[] subPath = new String[path.getNameCount() - 1]; + for (int i = 1; i < path.getNameCount(); i++) { + subPath[i - 1] = path.getName(i).toString(); + } + newObjectPath = new Base32ObjectPath(Paths.get(newRegionId + "", subPath)); + } return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newObjectPath); } catch (NumberFormatException e) { throw new IoTDBRuntimeException( diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/PlainObjectPath.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/PlainObjectPath.java new file mode 100644 index 0000000000000..cb3e488812ef9 --- /dev/null +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/PlainObjectPath.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.calc.utils; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class PlainObjectPath implements IObjectPath { + + private final IDeviceID deviceID; + private final long timestamp; + private final String measurement; + private final String filePath; + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IObjectPath deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IObjectPath deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + + @Override + public IObjectPath deserializeFromObjectValue(ByteBuffer byteBuffer) { + return deserializeObjectValue(byteBuffer); + } + }; + + private static final Factory FACTORY = PlainObjectPath::new; + + public PlainObjectPath(String filePath) { + this.filePath = filePath; + final Path path = Paths.get(filePath); + final String[] deviceIdSegments = new String[path.getNameCount() - 3]; + for (int i = 0; i < deviceIdSegments.length; i++) { + deviceIdSegments[i] = path.getName(i + 1).toString(); + if ("NUL".equals(deviceIdSegments[i])) { + deviceIdSegments[i] = null; + } else if ("EPT".equals(deviceIdSegments[i])) { + deviceIdSegments[i] = ""; + } + } + deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdSegments); + measurement = path.getName(path.getNameCount() - 2).toString(); + final String fileName = path.getFileName().toString(); + timestamp = Long.parseLong(fileName.substring(0, fileName.indexOf('.'))); + } + + public PlainObjectPath(Path path) { + this(path.toString()); + } + + public PlainObjectPath(int regionId, long time, IDeviceID deviceID, String 
measurement) { + final String objectFileName = time + ".bin"; + final Object[] segments = deviceID.getSegments(); + final StringBuilder relativePathString = + new StringBuilder(String.valueOf(regionId)).append(File.separator); + for (final Object segment : segments) { + if (segment == null) { + relativePathString.append("NUL"); + } else if ("".equals(segment)) { + relativePathString.append("EPT"); + } else { + relativePathString.append(segment.toString().toLowerCase()); + } + relativePathString.append(File.separator); + } + relativePathString.append(measurement).append(File.separator); + relativePathString.append(objectFileName); + this.deviceID = deviceID; + this.timestamp = time; + this.measurement = measurement; + this.filePath = relativePathString.toString(); + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + return ReadWriteIOUtils.write(filePath, byteBuffer); + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + return ReadWriteIOUtils.write(filePath, outputStream); + } + + @Override + public int getSerializedSize() { + return ReadWriteIOUtils.sizeToWrite(filePath); + } + + @Override + public void serializeToObjectValue(ByteBuffer byteBuffer) { + byteBuffer.put(filePath.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public int getSerializeSizeToObjectValue() { + return filePath.getBytes(StandardCharsets.UTF_8).length; + } + + public static PlainObjectPath deserialize(ByteBuffer byteBuffer) { + final String filePath = ReadWriteIOUtils.readString(byteBuffer); + return new PlainObjectPath(filePath); + } + + public static PlainObjectPath deserialize(InputStream stream) throws IOException { + final String filePath = ReadWriteIOUtils.readString(stream); + return new PlainObjectPath(filePath); + } + + public static PlainObjectPath deserializeObjectValue(ByteBuffer byteBuffer) { + return new PlainObjectPath(StandardCharsets.UTF_8.decode(byteBuffer).toString()); + } + + @Override + public String toString() 
{ + return filePath; + } + + @Override + public long getTime() { + return timestamp; + } + + @Override + public IDeviceID getDeviceID() { + return deviceID; + } + + @Override + public String getMeasurement() { + return measurement; + } + + @Override + public Path getPath() { + return Paths.get(filePath); + } + + public static Factory getFACTORY() { + return FACTORY; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 88d23360b54f4..b07d220b957e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -297,6 +297,13 @@ public TSStatus visitPipeEnrichedDeleteDataNode( @Override public TSStatus visitWriteObjectFile(ObjectNode node, DataRegion dataRegion) { - throw new UnsupportedOperationException(); + try { + dataRegion.writeObject(node); + dataRegion.insertSeparatorToWAL(); + return StatusUtils.OK; + } catch (final Exception e) { + LOGGER.error(DataNodeMiscMessages.ERROR_EXECUTING_PLAN_NODE, node, e); + return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index 15fca0ef0b785..879bd0fab63bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -562,6 +562,7 @@ private static Object filterValueColumnsByRowIndexList( case TEXT: case BLOB: case STRING: + case OBJECT: { final Binary[] binaryValueColumns = isSingleOriginValueColumn @@ -621,6 +622,7 @@ private void fillNullValue( case TEXT: case BLOB: case STRING: + case OBJECT: final Binary[] columns = new Binary[rowSize]; Arrays.fill(columns, Binary.EMPTY_VALUE); valueColumns[columnIndex] = columns; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index b1f3a5a4c1480..6bd3fec3589a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -412,6 +412,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); } tablet.getBitMaps()[i].mark(rowIndex); @@ -442,6 +443,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues()); break; default: @@ -479,6 +481,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, 0, data.getBinary().getValues()); break; default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index e3caecd144d3c..973e91de66c39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -488,6 +488,7 @@ private boolean fillMeasurementValueColumns( case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); } tablet.getBitMaps()[i].mark(rowIndex); @@ -518,6 +519,7 @@ private boolean fillMeasurementValueColumns( case TEXT: case BLOB: case STRING: + case OBJECT: Binary binary = primitiveType.getBinary(); tablet.addValue( rowIndex, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index 59905e54dadbf..8b7060310e0ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2TransferFilePieceReq; import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.response.IoTConsensusV2TransferFilePieceResp; import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.RetryUtils; import 
org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; @@ -51,6 +52,7 @@ import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2ReceiverMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; @@ -58,6 +60,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFileSealWithModReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -183,8 +186,17 @@ public TIoTConsensusV2TransferResp receive(final TIoTConsensusV2TransferReq req) case TRANSFER_TS_FILE_PIECE_WITH_MOD: // Just take a place in requestExecutor's buffer, the further seal request will remove // its place from buffer. - requestExecutor.onRequest(req, true, false); - resp = loadEvent(req); + resp = requestExecutor.onRequest(req, true, false); + if (resp == null) { + resp = loadEvent(req); + } + break; + case TRANSFER_OBJECT_FILE_PIECE: + // Object file pieces share a commit id with the following insert request. 
The pieces are + // applied in commit order, while the final insert request removes the placeholder and + // advances the commit index. Keeping the piece writes ordered is important because object + // paths are data-point based and can be reused by later overwrites. + resp = requestExecutor.onRequest(req, false, false, false, true); break; case TRANSFER_TS_FILE_SEAL: case TRANSFER_TS_FILE_SEAL_WITH_MOD: @@ -263,6 +275,9 @@ private TIoTConsensusV2TransferResp loadEvent(final TIoTConsensusV2TransferReq r case TRANSFER_TABLET_BINARY: return handleTransferTabletBinary( IoTConsensusV2TabletBinaryReq.fromTIoTConsensusV2TransferReq(req)); + case TRANSFER_OBJECT_FILE_PIECE: + return handleTransferObjectFilePiece( + IoTConsensusV2ObjectFilePieceReq.fromTIoTConsensusV2TransferReq(req)); case TRANSFER_DELETION: return handleTransferDeletion( IoTConsensusV2DeleteNodeReq.fromTIoTConsensusV2TransferReq(req)); @@ -319,11 +334,34 @@ private TIoTConsensusV2TransferResp handleTransferTabletBinary( IoTConsensusV2ServerImpl impl = Optional.ofNullable(iotConsensusV2.getImpl(consensusGroupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(consensusGroupId)); - final InsertNode insertNode = req.convertToInsertNode(); - insertNode.markAsGeneratedByRemoteConsensusLeader(); - insertNode.setProgressIndex( - ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()))); - return new TIoTConsensusV2TransferResp(impl.writeOnFollowerReplica(insertNode)); + final PlanNode planNode = req.convertToPlanNode(); + if (planNode instanceof InsertNode) { + final InsertNode insertNode = (InsertNode) planNode; + insertNode.markAsGeneratedByRemoteConsensusLeader(); + insertNode.setProgressIndex( + ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()))); + return new TIoTConsensusV2TransferResp(impl.writeOnFollowerReplica(insertNode)); + } + if (planNode instanceof ObjectNode) { + final ObjectNode objectNode = (ObjectNode) planNode; + 
objectNode.markAsGeneratedByRemoteConsensusLeader(); + return new TIoTConsensusV2TransferResp(impl.writeOnFollowerReplica(objectNode)); + } + + return new TIoTConsensusV2TransferResp( + RpcUtils.getStatus( + TSStatusCode.IOT_CONSENSUS_V2_TYPE_ERROR, + String.format("Unsupported tablet binary plan node %s.", planNode))); + } + + private TIoTConsensusV2TransferResp handleTransferObjectFilePiece( + final IoTConsensusV2ObjectFilePieceReq req) throws ConsensusGroupNotExistException { + IoTConsensusV2ServerImpl impl = + Optional.ofNullable(iotConsensusV2.getImpl(consensusGroupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(consensusGroupId)); + final ObjectNode objectNode = req.getObjectNode(); + objectNode.markAsGeneratedByRemoteConsensusLeader(); + return new TIoTConsensusV2TransferResp(impl.writeOnFollowerReplica(objectNode)); } private TIoTConsensusV2TransferResp handleTransferDeletion(final IoTConsensusV2DeleteNodeReq req) @@ -1452,6 +1490,16 @@ private TIoTConsensusV2TransferResp onRequest( final TIoTConsensusV2TransferReq req, final boolean isTransferTsFilePiece, final boolean isTransferTsFileSeal) { + return onRequest( + req, isTransferTsFilePiece, isTransferTsFileSeal, isTransferTsFilePiece, false); + } + + private TIoTConsensusV2TransferResp onRequest( + final TIoTConsensusV2TransferReq req, + final boolean isTransferTsFilePiece, + final boolean isTransferTsFileSeal, + final boolean shouldRecordTsFileEvent, + final boolean shouldKeepRequestMetaAfterSuccess) { long startAcquireLockNanos = System.nanoTime(); lock.lock(); try { @@ -1483,11 +1531,11 @@ private TIoTConsensusV2TransferResp onRequest( resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes()); } // update metric - if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) { + if (shouldRecordTsFileEvent && !reqExecutionOrderBuffer.contains(requestMeta)) { // only update tsFileEventCount when tsFileEvent is first enqueue. 
tsFileEventCount.incrementAndGet(); } - if (!isTransferTsFileSeal && !isTransferTsFilePiece) { + if (!isTransferTsFileSeal && !isTransferTsFilePiece && !shouldKeepRequestMetaAfterSuccess) { WALEventCount.incrementAndGet(); } reqExecutionOrderBuffer.add(requestMeta); @@ -1521,13 +1569,15 @@ private TIoTConsensusV2TransferResp onRequest( // DataRegionStateMachine. TIoTConsensusV2TransferResp resp = loadEvent(req); - // Only when event apply is successful and what is transmitted is not TsFilePiece, req - // will be removed from the buffer and onSyncedCommitIndex will be updated. Because pipe - // will transfer multi reqs with same commitId in a single TsFileInsertionEvent, only - // when the last seal req is applied, we can discard this event. + // Only when event apply is successful and this request is not a placeholder, req will + // be removed from the buffer and onSyncedCommitIndex will be updated. Because pipe + // will transfer multi reqs with same commitId in a single TsFileInsertionEvent/Object + // event, only when the last seal/insert req is applied, we can discard this event. 
if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - onSuccess(tCommitId, isTransferTsFileSeal); + if (!shouldKeepRequestMetaAfterSuccess) { + onSuccess(tCommitId, isTransferTsFileSeal); + } } return resp; } @@ -1548,7 +1598,9 @@ private TIoTConsensusV2TransferResp onRequest( if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - onSuccess(tCommitId, isTransferTsFileSeal); + if (!shouldKeepRequestMetaAfterSuccess) { + onSuccess(tCommitId, isTransferTsFileSeal); + } } return resp; } else { @@ -1593,7 +1645,9 @@ private TIoTConsensusV2TransferResp onRequest( if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - onSuccess(tCommitId, isTransferTsFileSeal); + if (!shouldKeepRequestMetaAfterSuccess) { + onSuccess(tCommitId, isTransferTsFileSeal); + } } return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java index a4dba48b7e98b..d97dfa4ece637 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java @@ -47,11 +47,14 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2DeleteEventHandler; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2ObjectFileInsertNodeEventHandler; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TabletBatchEventHandler; import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TabletInsertNodeEventHandler; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TsFileInsertionEventHandler; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2AsyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils.ObjectFileDescriptor; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.pipe.api.annotation.TableModel; @@ -70,6 +73,7 @@ import java.io.IOException; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -322,20 +326,43 @@ private boolean transferInEventWithoutCheck(PipeInsertionEvent tabletInsertionEv pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(), pipeInsertNodeTabletInsertionEvent.getRebootTimes()); - final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); - final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); - final TIoTConsensusV2TransferReq iotConsensusV2TransferReq = - IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( - insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId); - final IoTConsensusV2TabletInsertNodeEventHandler iotConsensusV2InsertNodeReqHandler = - new IoTConsensusV2TabletInsertNodeEventHandler( - pipeInsertNodeTabletInsertionEvent, - iotConsensusV2TransferReq, - this, - 
iotConsensusV2SinkMetrics); - - transfer(iotConsensusV2InsertNodeReqHandler); - return true; + try { + final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); + final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); + final List objectFileDescriptors = + IoTConsensusV2ObjectFileUtils.collectObjectFileDescriptors(insertNode); + if (!objectFileDescriptors.isEmpty()) { + transfer( + new IoTConsensusV2ObjectFileInsertNodeEventHandler( + pipeInsertNodeTabletInsertionEvent, + insertNode, + objectFileDescriptors, + this, + tCommitId, + tConsensusGroupId, + progressIndex, + thisDataNodeId, + iotConsensusV2SinkMetrics)); + return true; + } + + final TIoTConsensusV2TransferReq iotConsensusV2TransferReq = + IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( + insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId); + final IoTConsensusV2TabletInsertNodeEventHandler iotConsensusV2InsertNodeReqHandler = + new IoTConsensusV2TabletInsertNodeEventHandler( + pipeInsertNodeTabletInsertionEvent, + iotConsensusV2TransferReq, + this, + iotConsensusV2SinkMetrics); + + transfer(iotConsensusV2InsertNodeReqHandler); + return true; + } catch (final Exception e) { + pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( + IoTConsensusV2AsyncSink.class.getName(), false); + throw e; + } } private void transfer( @@ -350,6 +377,19 @@ private void transfer( } } + private void transfer( + final IoTConsensusV2ObjectFileInsertNodeEventHandler + iotConsensusV2ObjectFileInsertNodeEventHandler) { + AsyncIoTConsensusV2ServiceClient client = null; + try { + client = asyncTransferClientManager.borrowClient(getFollowerUrl()); + iotConsensusV2ObjectFileInsertNodeEventHandler.transfer(client); + } catch (final Exception ex) { + logOnClientException(client, ex); + iotConsensusV2ObjectFileInsertNodeEventHandler.onError(ex); + } + } + @Override public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception { asyncTransferQueuedEventsIfNecessary(); @@ -402,6 +442,7 @@ private boolean transferWithoutCheck(TsFileInsertionEvent tsFileInsertionEvent) tConsensusGroupId, consensusPipeName, thisDataNodeId, + IoTConsensusV2ObjectFileUtils.collectObjectFileDescriptors(pipeTsFileInsertionEvent), iotConsensusV2SinkMetrics); transfer(iotConsensusV2TsFileInsertionEventHandler); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 481e340a739f3..328d63b39bbb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -41,12 +41,16 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFilePieceReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils.ObjectFileDescriptor; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFileSealReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFileSealWithModReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -75,6 +79,7 @@ public class IoTConsensusV2SyncSink extends IoTDBSink { private static final String IOT_CONSENSUS_V2_SYNC_CONNECTION_FAILED_FORMAT = "IoTConsensusV2: syncClient connection to %s:%s failed when %s, because: %s"; private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer insertionNode tablet"; + private static final String OBJECT_FILE_SCENARIO = "transfer object file"; private static final String TSFILE_SCENARIO = "transfer tsfile"; private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch"; private static final String DELETION_SCENARIO = "transfer deletion"; @@ -326,6 +331,12 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex(); + for (final ObjectFileDescriptor descriptor : + IoTConsensusV2ObjectFileUtils.collectObjectFileDescriptors(insertNode)) { + transferObjectFilePieces( + descriptor, syncIoTConsensusV2ServiceClient, tCommitId, tConsensusGroupId); + } + resp = syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer( IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( @@ -354,6 +365,56 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI } } + private void transferObjectFilePieces( + final ObjectFileDescriptor descriptor, + final 
SyncIoTConsensusV2ServiceClient syncIoTConsensusV2ServiceClient, + final TCommitId tCommitId, + final TConsensusGroupId tConsensusGroupId) + throws PipeException { + final int readFileBufferSize = + Math.max(1, PipeConfig.getInstance().getPipeSinkReadFileBufferSize()); + final long objectSize = descriptor.getObjectSize(); + long offset = 0; + + do { + final int pieceLength = + objectSize == 0 ? 0 : (int) Math.min(readFileBufferSize, objectSize - offset); + final boolean isEOF = objectSize == 0 || offset + pieceLength >= objectSize; + final TIoTConsensusV2TransferResp resp; + try { + resp = + syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer( + IoTConsensusV2ObjectFilePieceReq.toTIoTConsensusV2TransferReq( + new ObjectNode(isEOF, offset, pieceLength, descriptor.getObjectPath()), + tCommitId, + tConsensusGroupId, + thisDataNodeId)); + } catch (final Exception e) { + throw new PipeRuntimeSinkRetryTimesConfigurableException( + String.format( + IOT_CONSENSUS_V2_SYNC_CONNECTION_FAILED_FORMAT, + getFollowerUrl().getIp(), + getFollowerUrl().getPort(), + OBJECT_FILE_SCENARIO, + e.getMessage()), + Integer.MAX_VALUE); + } + + final TSStatus status = resp.getStatus(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + receiverStatusHandler.handle( + status, + String.format( + "Transfer object file %s error, result status %s.", + descriptor.getObjectPathString(), status), + descriptor.getObjectPathString()); + } + + offset += pieceLength; + } while (offset < objectSize); + } + private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException { final File tsFile = pipeTsFileInsertionEvent.getTsFile(); @@ -370,6 +431,12 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) final TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); + for (final 
ObjectFileDescriptor descriptor : + IoTConsensusV2ObjectFileUtils.collectObjectFileDescriptors(pipeTsFileInsertionEvent)) { + transferObjectFilePieces( + descriptor, syncIoTConsensusV2ServiceClient, tCommitId, tConsensusGroupId); + } + // 1. Transfer tsFile, and mod file if exists if (pipeTsFileInsertionEvent.isWithMod()) { transferFilePieces( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2ObjectFileInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2ObjectFileInsertNodeEventHandler.java new file mode 100644 index 0000000000000..107f79e57a6dd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2ObjectFileInsertNodeEventHandler.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.utils.RetryUtils; +import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId; +import org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp; +import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFilePieceReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils.ObjectFileDescriptor; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class IoTConsensusV2ObjectFileInsertNodeEventHandler + implements AsyncMethodCallback { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTConsensusV2ObjectFileInsertNodeEventHandler.class); + + private final PipeInsertNodeTabletInsertionEvent event; + private 
final InsertNode insertNode; + private final List objectFileDescriptors; + private final IoTConsensusV2AsyncSink connector; + private final TCommitId commitId; + private final TConsensusGroupId consensusGroupId; + private final ProgressIndex progressIndex; + private final int thisDataNodeId; + private final IoTConsensusV2SinkMetrics metric; + private final int readFileBufferSize; + private final long createTime; + + private AsyncIoTConsensusV2ServiceClient client; + private int currentObjectFileIndex; + private long currentObjectFileOffset; + private boolean insertNodeSent; + + public IoTConsensusV2ObjectFileInsertNodeEventHandler( + final PipeInsertNodeTabletInsertionEvent event, + final InsertNode insertNode, + final List objectFileDescriptors, + final IoTConsensusV2AsyncSink connector, + final TCommitId commitId, + final TConsensusGroupId consensusGroupId, + final ProgressIndex progressIndex, + final int thisDataNodeId, + final IoTConsensusV2SinkMetrics metric) { + this.event = event; + this.insertNode = insertNode; + this.objectFileDescriptors = objectFileDescriptors; + this.connector = connector; + this.commitId = commitId; + this.consensusGroupId = consensusGroupId; + this.progressIndex = progressIndex; + this.thisDataNodeId = thisDataNodeId; + this.metric = metric; + this.readFileBufferSize = Math.max(1, PipeConfig.getInstance().getPipeSinkReadFileBufferSize()); + this.createTime = System.nanoTime(); + } + + public void transfer(final AsyncIoTConsensusV2ServiceClient client) throws TException { + this.client = client; + client.setShouldReturnSelf(false); + transferNextReq(); + } + + private void transferNextReq() throws TException { + if (currentObjectFileIndex < objectFileDescriptors.size()) { + final ObjectFileDescriptor descriptor = objectFileDescriptors.get(currentObjectFileIndex); + final ObjectNode objectNode = nextObjectNode(descriptor); + client.iotConsensusV2Transfer( + IoTConsensusV2ObjectFilePieceReq.toTIoTConsensusV2TransferReq( + objectNode, 
commitId, consensusGroupId, thisDataNodeId), + this); + return; + } + + insertNodeSent = true; + client.iotConsensusV2Transfer( + IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq( + insertNode, commitId, consensusGroupId, progressIndex, thisDataNodeId), + this); + } + + private ObjectNode nextObjectNode(final ObjectFileDescriptor descriptor) { + final long objectSize = descriptor.getObjectSize(); + final int pieceLength = + objectSize == 0 + ? 0 + : (int) Math.min(readFileBufferSize, objectSize - currentObjectFileOffset); + final boolean isEOF = objectSize == 0 || currentObjectFileOffset + pieceLength >= objectSize; + final ObjectNode objectNode = + new ObjectNode(isEOF, currentObjectFileOffset, pieceLength, descriptor.getObjectPath()); + + if (isEOF) { + currentObjectFileIndex++; + currentObjectFileOffset = 0; + } else { + currentObjectFileOffset += pieceLength; + } + + return objectNode; + } + + @Override + public void onComplete(final TIoTConsensusV2TransferResp response) { + if (response == null) { + onError(new PipeException(DataNodePipeMessages.TIOTCONSENSUSV2TRANSFERRESP_IS_NULL)); + return; + } + + try { + final TSStatus status = response.getStatus(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + connector.statusHandler().handle(status, status.getMessage(), event.toString()); + } + + if (!insertNodeSent) { + transferNextReq(); + return; + } + + event.decreaseReferenceCount( + IoTConsensusV2ObjectFileInsertNodeEventHandler.class.getName(), true); + connector.removeEventFromBuffer(event); + metric.recordConnectorWalTransferTimer(System.nanoTime() - createTime); + + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + DataNodePipeMessages.INSERTNODETRANSFER_NO_EVENT_SUCCESSFULLY_PROCESSED, + event.getReplicateIndexForIoTV2()); + } + + returnClientIfNecessary(); + } catch (final Exception e) { + onError(e); + 
} + } + + @Override + public void onError(final Exception exception) { + LOGGER.warn( + DataNodePipeMessages.FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_COMMITTER_KEY_REPLICATE, + event.coreReportMessage(), + event.getCommitterKey(), + event.getReplicateIndexForIoTV2(), + exception); + + if (RetryUtils.needRetryWithIncreasingInterval(exception)) { + if (event.getRetryInterval() << 2 <= 0) { + event.setRetryInterval(1000L * 20); + } else { + event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() << 2)); + } + } + + connector.addFailureEventToRetryQueue(event); + metric.recordRetryCounter(); + returnClientIfNecessary(); + } + + private void returnClientIfNecessary() { + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java index 490806cfddb27..57862c8fe4f46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java @@ -31,10 +31,13 @@ import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFilePieceReq; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2ObjectFileUtils.ObjectFileDescriptor; import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFileSealReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFileSealWithModReq; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -47,6 +50,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,6 +68,7 @@ public class IoTConsensusV2TsFileInsertionEventHandler private final int thisDataNodeId; private final File tsFile; private final File modFile; + private final List objectFileDescriptors; private File currentFile; private final boolean transferMod; @@ -71,6 +76,9 @@ public class IoTConsensusV2TsFileInsertionEventHandler private final int readFileBufferSize; private final byte[] readBuffer; private long position; + private int currentObjectFileIndex; + private long currentObjectFileOffset; + private boolean transferringObjectFile; private RandomAccessFile reader; @@ -91,6 +99,7 @@ public IoTConsensusV2TsFileInsertionEventHandler( final TConsensusGroupId consensusGroupId, final String consensusPipeName, final int thisDataNodeId, + final List objectFileDescriptors, final IoTConsensusV2SinkMetrics metric) throws FileNotFoundException { this.event = event; @@ -102,6 +111,7 @@ public IoTConsensusV2TsFileInsertionEventHandler( tsFile = event.getTsFile(); modFile = event.getModFile(); + this.objectFileDescriptors = objectFileDescriptors; transferMod = event.isWithMod(); currentFile = transferMod ? 
modFile : tsFile; @@ -127,6 +137,17 @@ public void transfer(final AsyncIoTConsensusV2ServiceClient client) this.client = client; client.setShouldReturnSelf(false); + if (currentObjectFileIndex < objectFileDescriptors.size()) { + transferringObjectFile = true; + final ObjectFileDescriptor descriptor = objectFileDescriptors.get(currentObjectFileIndex); + client.iotConsensusV2Transfer( + IoTConsensusV2ObjectFilePieceReq.toTIoTConsensusV2TransferReq( + nextObjectNode(descriptor), commitId, consensusGroupId, thisDataNodeId), + this); + return; + } + transferringObjectFile = false; + final int readLength = reader.read(readBuffer); if (readLength == -1) { if (currentFile == modFile) { @@ -194,8 +215,42 @@ public void transfer(final AsyncIoTConsensusV2ServiceClient client) position += readLength; } + private ObjectNode nextObjectNode(final ObjectFileDescriptor descriptor) { + final long objectSize = descriptor.getObjectSize(); + final int pieceLength = + objectSize == 0 + ? 0 + : (int) Math.min(readFileBufferSize, objectSize - currentObjectFileOffset); + final boolean isEOF = objectSize == 0 || currentObjectFileOffset + pieceLength >= objectSize; + final ObjectNode objectNode = + new ObjectNode(isEOF, currentObjectFileOffset, pieceLength, descriptor.getObjectPath()); + + if (isEOF) { + currentObjectFileIndex++; + currentObjectFileOffset = 0; + } else { + currentObjectFileOffset += pieceLength; + } + + return objectNode; + } + @Override public void onComplete(final TIoTConsensusV2TransferResp response) { + if (transferringObjectFile) { + try { + final TSStatus status = response.getStatus(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + connector.statusHandler().handle(status, status.getMessage(), tsFile.getName()); + } + transfer(client); + } catch (final Exception e) { + onError(e); + } + return; + } + if (isSealSignalSent.get()) { try { final TSStatus status = 
response.getStatus(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFilePieceReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFilePieceReq.java new file mode 100644 index 0000000000000..ad40bfdbc200d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFilePieceReq.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestType; +import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestVersion; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId; +import org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferReq; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; + +import java.nio.ByteBuffer; +import java.util.Objects; + +public class IoTConsensusV2ObjectFilePieceReq extends TIoTConsensusV2TransferReq { + + private transient ObjectNode objectNode; + + private IoTConsensusV2ObjectFilePieceReq() { + // Do nothing + } + + public ObjectNode getObjectNode() { + return objectNode; + } + + /////////////////////////////// Thrift /////////////////////////////// + + public static IoTConsensusV2ObjectFilePieceReq toTIoTConsensusV2TransferReq( + final ObjectNode objectNode, + final TCommitId commitId, + final TConsensusGroupId consensusGroupId, + final int thisDataNodeId) { + final IoTConsensusV2ObjectFilePieceReq req = new IoTConsensusV2ObjectFilePieceReq(); + + req.objectNode = objectNode; + req.commitId = commitId; + req.consensusGroupId = consensusGroupId; + req.dataNodeId = thisDataNodeId; + req.version = IoTConsensusV2RequestVersion.VERSION_1.getVersion(); + req.type = IoTConsensusV2RequestType.TRANSFER_OBJECT_FILE_PIECE.getType(); + req.body = objectNode.serialize(); + + return req; + } + + public static IoTConsensusV2ObjectFilePieceReq fromTIoTConsensusV2TransferReq( + final TIoTConsensusV2TransferReq transferReq) { + final IoTConsensusV2ObjectFilePieceReq req = new 
IoTConsensusV2ObjectFilePieceReq(); + + final ByteBuffer body = transferReq.body.duplicate(); + final PlanNode planNode = WALEntry.deserializeForConsensus(body); + req.objectNode = (ObjectNode) planNode; + + req.version = transferReq.version; + req.type = transferReq.type; + req.body = transferReq.body; + req.commitId = transferReq.commitId; + req.dataNodeId = transferReq.dataNodeId; + req.consensusGroupId = transferReq.consensusGroupId; + + return req; + } + + /////////////////////////////// Object /////////////////////////////// + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final IoTConsensusV2ObjectFilePieceReq that = (IoTConsensusV2ObjectFilePieceReq) obj; + return version == that.version + && type == that.type + && Objects.equals(body, that.body) + && Objects.equals(commitId, that.commitId) + && Objects.equals(consensusGroupId, that.consensusGroupId) + && Objects.equals(dataNodeId, that.dataNodeId); + } + + @Override + public int hashCode() { + return Objects.hash(version, type, body, commitId, consensusGroupId, dataNodeId); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFileUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFileUtils.java new file mode 100644 index 0000000000000..6ef6629fed20b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2ObjectFileUtils.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request; + +import org.apache.iotdb.calc.utils.IObjectPath; +import org.apache.iotdb.calc.utils.ObjectTypeUtils; +import org.apache.iotdb.commons.exception.ObjectFileNotExist; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import 
org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.controller.IMetadataQuerier; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.read.reader.IChunkReader; +import org.apache.tsfile.read.reader.chunk.TableChunkReader; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public final class IoTConsensusV2ObjectFileUtils { + + private IoTConsensusV2ObjectFileUtils() { + // Utility class + } + + public static List collectObjectFileDescriptors( + final InsertNode insertNode) { + final Map objectFileDescriptors = new LinkedHashMap<>(); + collectObjectFileDescriptors(insertNode, objectFileDescriptors); + return new ArrayList<>(objectFileDescriptors.values()); + } + + public static List collectObjectFileDescriptors( + final PipeTsFileInsertionEvent tsFileInsertionEvent) throws Exception { + final Map objectFileDescriptors = new LinkedHashMap<>(); + collectObjectFileDescriptors(tsFileInsertionEvent.getTsFile(), objectFileDescriptors); + return new ArrayList<>(objectFileDescriptors.values()); + } + + private static void collectObjectFileDescriptors( + final File tsFile, final Map objectFileDescriptors) + throws IOException { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader); + final TsFileMetadata fileMetadata = metadataQuerier.getWholeFileMetadata(); + final Map tableSchemaMap = 
reader.getTableSchemaMap(); + if (tableSchemaMap == null) { + return; + } + + for (final Map.Entry entry : tableSchemaMap.entrySet()) { + if (!mayContainObjectValue(entry.getValue())) { + continue; + } + + final MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNode(entry.getKey()); + if (tableRoot == null) { + continue; + } + + final Iterator> deviceMetaIterator = + metadataQuerier.deviceIterator(tableRoot, null); + while (deviceMetaIterator.hasNext()) { + final Pair pair = deviceMetaIterator.next(); + for (final AbstractAlignedChunkMetadata alignedChunkMetadata : + reader.getAlignedChunkMetadata(pair.getLeft(), false)) { + collectObjectFileDescriptors(reader, alignedChunkMetadata, objectFileDescriptors); + } + } + } + } + } + + private static boolean mayContainObjectValue(final TableSchema tableSchema) { + if (tableSchema == null || tableSchema.getColumnSchemas() == null) { + return false; + } + for (final IMeasurementSchema schema : tableSchema.getColumnSchemas()) { + if (schema != null && schema.getType() == TSDataType.OBJECT) { + return true; + } + } + return false; + } + + private static void collectObjectFileDescriptors( + final TsFileSequenceReader reader, + final AbstractAlignedChunkMetadata alignedChunkMetadata, + final Map objectFileDescriptors) + throws IOException { + if (alignedChunkMetadata == null || alignedChunkMetadata.getValueChunkMetadataList() == null) { + return; + } + + final List objectChunks = new ArrayList<>(); + for (final IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { + if (chunkMetadata == null || chunkMetadata.getDataType() != TSDataType.OBJECT) { + continue; + } + objectChunks.add(reader.readMemChunk((ChunkMetadata) chunkMetadata)); + } + + if (objectChunks.isEmpty()) { + return; + } + + final Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); + timeChunk.getData().rewind(); + for (final Chunk objectChunk : objectChunks) { + 
objectChunk.getData().rewind(); + } + + final IChunkReader chunkReader = new TableChunkReader(timeChunk, objectChunks, null); + while (chunkReader.hasNextSatisfiedPage()) { + final BatchData batchData = chunkReader.nextPageData(); + while (batchData.hasCurrent()) { + final TsPrimitiveType[] objectValues = batchData.getVector(); + if (objectValues != null) { + for (final TsPrimitiveType objectValue : objectValues) { + if (objectValue != null) { + collectObjectFileDescriptor(objectValue.getBinary(), objectFileDescriptors); + } + } + } + batchData.next(); + } + } + } + + private static void collectObjectFileDescriptors( + final InsertNode insertNode, final Map objectFileDescriptors) { + if (insertNode instanceof InsertRowNode) { + collectObjectFileDescriptors((InsertRowNode) insertNode, objectFileDescriptors); + return; + } + if (insertNode instanceof InsertTabletNode) { + collectObjectFileDescriptors((InsertTabletNode) insertNode, objectFileDescriptors); + return; + } + if (insertNode instanceof InsertRowsNode) { + for (final InsertRowNode insertRowNode : + ((InsertRowsNode) insertNode).getInsertRowNodeList()) { + collectObjectFileDescriptors(insertRowNode, objectFileDescriptors); + } + return; + } + if (insertNode instanceof InsertRowsOfOneDeviceNode) { + for (final InsertRowNode insertRowNode : + ((InsertRowsOfOneDeviceNode) insertNode).getInsertRowNodeList()) { + collectObjectFileDescriptors(insertRowNode, objectFileDescriptors); + } + return; + } + if (insertNode instanceof InsertMultiTabletsNode) { + for (final InsertTabletNode insertTabletNode : + ((InsertMultiTabletsNode) insertNode).getInsertTabletNodeList()) { + collectObjectFileDescriptors(insertTabletNode, objectFileDescriptors); + } + } + } + + private static void collectObjectFileDescriptors( + final InsertRowNode insertRowNode, + final Map objectFileDescriptors) { + final TSDataType[] dataTypes = insertRowNode.getDataTypes(); + final Object[] values = insertRowNode.getValues(); + final String[] 
measurements = insertRowNode.getMeasurements(); + if (dataTypes == null || values == null) { + return; + } + + final int columnCount = Math.min(dataTypes.length, values.length); + for (int i = 0; i < columnCount; ++i) { + if ((measurements != null && measurements[i] == null) + || dataTypes[i] != TSDataType.OBJECT + || values[i] == null) { + continue; + } + collectObjectFileDescriptor(values[i], objectFileDescriptors); + } + } + + private static void collectObjectFileDescriptors( + final InsertTabletNode insertTabletNode, + final Map objectFileDescriptors) { + final TSDataType[] dataTypes = insertTabletNode.getDataTypes(); + final Object[] columns = insertTabletNode.getColumns(); + final String[] measurements = insertTabletNode.getMeasurements(); + if (dataTypes == null || columns == null) { + return; + } + + final BitMap[] bitMaps = insertTabletNode.getBitMaps(); + final int columnCount = Math.min(dataTypes.length, columns.length); + for (int i = 0; i < columnCount; ++i) { + if ((measurements != null && measurements[i] == null) + || dataTypes[i] != TSDataType.OBJECT + || columns[i] == null) { + continue; + } + + final Binary[] objectValues = (Binary[]) columns[i]; + final BitMap bitMap = bitMaps == null || i >= bitMaps.length ? 
null : bitMaps[i]; + final int rowCount = Math.min(insertTabletNode.getRowCount(), objectValues.length); + for (int row = 0; row < rowCount; ++row) { + if (bitMap != null && bitMap.isMarked(row)) { + continue; + } + collectObjectFileDescriptor(objectValues[row], objectFileDescriptors); + } + } + } + + private static void collectObjectFileDescriptors( + final Tablet tablet, final Map objectFileDescriptors) { + final List schemas = tablet.getSchemas(); + final Object[] columns = tablet.getValues(); + if (schemas == null || columns == null) { + return; + } + + final BitMap[] bitMaps = tablet.getBitMaps(); + final int columnCount = Math.min(schemas.size(), columns.length); + for (int i = 0; i < columnCount; ++i) { + final IMeasurementSchema schema = schemas.get(i); + if (schema == null || schema.getType() != TSDataType.OBJECT || columns[i] == null) { + continue; + } + + final BitMap bitMap = bitMaps == null || i >= bitMaps.length ? null : bitMaps[i]; + final int rowCount = tablet.getRowSize(); + for (int row = 0; row < rowCount; ++row) { + if (bitMap != null && bitMap.isMarked(row)) { + continue; + } + collectObjectFileDescriptor(getTabletObjectValue(columns[i], row), objectFileDescriptors); + } + } + } + + private static Object getTabletObjectValue(final Object column, final int row) { + if (column instanceof Binary[]) { + return ((Binary[]) column)[row]; + } + if (column instanceof byte[][]) { + return new Binary(((byte[][]) column)[row]); + } + if (column instanceof Object[]) { + return ((Object[]) column)[row]; + } + throw new IllegalArgumentException( + String.format( + "Object type column should be Binary[], byte[][], or Object[], but actual type is %s.", + column.getClass().getName())); + } + + private static void collectObjectFileDescriptor( + final Object value, final Map objectFileDescriptors) { + if (value == null) { + return; + } + if (!(value instanceof Binary)) { + throw new IllegalArgumentException( + String.format( + "Object type value should be Binary, 
but actual type is %s.", + value.getClass().getName())); + } + + final Binary binary = (Binary) value; + if (binary.getValues() == null || binary.getLength() == 0) { + return; + } + + final Pair objectSizePathPair = + ObjectTypeUtils.parseObjectBinaryToSizeIObjectPathPair(binary); + final long objectSize = objectSizePathPair.getLeft(); + if (objectSize < 0) { + throw new IllegalArgumentException( + String.format( + "Object file size should be non-negative, but actual size is %s.", objectSize)); + } + + final IObjectPath objectPath = objectSizePathPair.getRight(); + final String objectPathString = objectPath.toString(); + final ObjectFileDescriptor existedDescriptor = objectFileDescriptors.get(objectPathString); + if (existedDescriptor != null) { + if (existedDescriptor.getObjectSize() != objectSize) { + throw new IllegalArgumentException( + String.format( + "Object file %s has inconsistent sizes %s and %s.", + objectPathString, existedDescriptor.getObjectSize(), objectSize)); + } + return; + } + + if (!TierManager.getInstance().getAbsoluteObjectFilePath(objectPathString, true).isPresent()) { + throw new ObjectFileNotExist(objectPathString); + } + + objectFileDescriptors.put(objectPathString, new ObjectFileDescriptor(objectSize, objectPath)); + } + + public static final class ObjectFileDescriptor { + private final long objectSize; + private final IObjectPath objectPath; + + private ObjectFileDescriptor(final long objectSize, final IObjectPath objectPath) { + this.objectSize = objectSize; + this.objectPath = objectPath; + } + + public long getObjectSize() { + return objectSize; + } + + public IObjectPath getObjectPath() { + return objectPath; + } + + public String getObjectPathString() { + return objectPath.toString(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2TabletBinaryReq.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2TabletBinaryReq.java index d48121018c854..1ea196123cc0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2TabletBinaryReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/request/IoTConsensusV2TabletBinaryReq.java @@ -48,10 +48,14 @@ private IoTConsensusV2TabletBinaryReq() { } public InsertNode convertToInsertNode() { - final PlanNode node = WALEntry.deserializeForConsensus(byteBuffer); + final PlanNode node = convertToPlanNode(); return node instanceof InsertNode ? (InsertNode) node : null; } + public PlanNode convertToPlanNode() { + return WALEntry.deserializeForConsensus(byteBuffer.duplicate()); + } + /////////////////////////////// Thrift /////////////////////////////// public static IoTConsensusV2TabletBinaryReq toTIoTConsensusV2TransferReq( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index bdc03cc57c42f..f612cbd20d76a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -22,12 +22,15 @@ import org.apache.iotdb.calc.utils.IObjectPath; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.ObjectFileNotExist; import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; +import org.apache.iotdb.commons.path.PartialPath; import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; @@ -38,8 +41,10 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +57,8 @@ import java.util.List; import java.util.Optional; +import static org.apache.iotdb.calc.utils.ObjectTypeUtils.generateObjectBinary; + public class ObjectNode extends SearchNode implements WALEntryValue { private static final Logger LOGGER = LoggerFactory.getLogger(ObjectNode.class); @@ -70,6 +77,8 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private boolean isGeneratedByRemoteConsensusLeader; + private ProgressIndex progressIndex; + public ObjectNode(boolean isEOF, long offset, byte[] content, IObjectPath filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; @@ -103,6 +112,10 @@ public void setFilePath(IObjectPath filePath) { this.filePath = filePath; } + public IObjectPath getFilePath() { + return filePath; + } + public String getFilePathString() { return filePath.toString(); } @@ -186,11 +199,13 @@ public SearchNode merge(List searchNodes) { @Override public ProgressIndex getProgressIndex() { - return null; + return progressIndex; } 
@Override - public void setProgressIndex(ProgressIndex progressIndex) {} + public void setProgressIndex(ProgressIndex progressIndex) { + this.progressIndex = progressIndex; + } @Override public List splitByPartition(IAnalysis analysis) { @@ -308,6 +323,23 @@ private void readContentFromFile(File file, byte[] contents) throws IOException } } + public RelationalInsertRowNode genValueInsertRowNode() throws IllegalPathException { + final RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode(this.getPlanNodeId()); + insertRowNode.setAligned(true); + insertRowNode.setDeviceID(filePath.getDeviceID()); + insertRowNode.setTargetPath(new PartialPath(filePath.getDeviceID().getTableName())); + insertRowNode.setTime(filePath.getTime()); + insertRowNode.setMeasurements(new String[] {filePath.getMeasurement()}); + insertRowNode.setDataTypes(new TSDataType[] {TSDataType.OBJECT}); + insertRowNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema(filePath.getMeasurement(), TSDataType.OBJECT) + }); + insertRowNode.setColumnCategories(new TsTableColumnCategory[] {TsTableColumnCategory.FIELD}); + insertRowNode.setValues(new Object[] {generateObjectBinary(offset + content.length, filePath)}); + return insertRowNode; + } + @Override public PlanNodeType getType() { return PlanNodeType.OBJECT_FILE_NODE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 83498ceefc9bc..4f66e7a404a44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; 
+import org.apache.iotdb.calc.utils.IObjectPath; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; @@ -31,8 +32,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; import java.io.IOException; @@ -177,6 +181,8 @@ public List splitByPartition(IAnalysis analysis) { TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()), analysis.getDatabaseName()); + handleObjectValue(insertRowNode, dataRegionReplicaSet, writePlanNodeList); + // Collect redirectInfo redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint()); RelationalInsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet); @@ -195,6 +201,46 @@ public List splitByPartition(IAnalysis analysis) { return writePlanNodeList; } + /** + * Turns every non-null OBJECT value of the row into a standalone {@link ObjectNode} bound to the + * given replica set and appended to {@code writePlanNodeList}, then nulls the value in the row. + * The value is decoded as a 1-byte EOF flag, an 8-byte offset, and the remaining bytes as content. + */ + private void handleObjectValue( + InsertRowNode insertRowNode, + TRegionReplicaSet dataRegionReplicaSet, + List writePlanNodeList) { + for (int i = 0; i < insertRowNode.getDataTypes().length; i++) { + if (insertRowNode.getDataTypes()[i] != TSDataType.OBJECT) { + continue; + } + final Object[] values = insertRowNode.getValues(); + if (values[i] == null) { + continue; + } + final byte[] binary = ((Binary) values[i]).getValues(); + // An empty payload carries no EOF/offset header, so decoding it would underflow the buffer; + // leave it untouched, as RelationalInsertTabletNode#handleObjectValue does. + if (binary == null || binary.length == 0) { + continue; + } + final ByteBuffer buffer = ByteBuffer.wrap(binary); + final boolean isEOF = buffer.get() == 1; + final long offset = buffer.getLong(); + final byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); + final IObjectPath relativePath = + IObjectPath.Factory.FACTORY.create( +
dataRegionReplicaSet.getRegionId().getId(), + insertRowNode.getTime(), + insertRowNode.getDeviceID(), + insertRowNode.getMeasurements()[i]); + final ObjectNode objectNode = new ObjectNode(isEOF, offset, content, relativePath); + objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); + writePlanNodeList.add(objectNode); + values[i] = null; + } + } + public RelationalInsertRowsNode emptyClone() { return new RelationalInsertRowsNode(this.getPlanNodeId()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 8d24ad7736434..2954648cf140f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; +import org.apache.iotdb.calc.utils.IObjectPath; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -404,6 +405,11 @@ protected List doSplit(Map> spli // Avoid using system arraycopy when there is no need to split setRange(entry.getValue()); setDataRegionReplicaSet(entry.getKey()); + for (int i = 0; i < columns.length; i++) { + if (dataTypes[i] == TSDataType.OBJECT) { + handleObjectValue(i, 0, times.length, entry, result); + } + } result.add(this); return result; } @@ -440,9 +446,19 @@ private List generateOneSplitList( System.arraycopy(times, start, subNode.times, destLoc, length); for (int i = 0; i < subNode.columns.length; i++) { if (dataTypes[i] != null) { - System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length); + if 
(dataTypes[i] == TSDataType.OBJECT) { + handleObjectValue(i, start, end, entry, result); + } else { + System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length); + } } - if (subNode.bitMaps != null && this.bitMaps[i] != null) { + if (this.bitMaps != null && this.bitMaps[i] != null) { + if (subNode.bitMaps == null) { + subNode.bitMaps = new BitMap[subNode.columns.length]; + } + if (subNode.bitMaps[i] == null) { + subNode.bitMaps[i] = new BitMap(count); + } BitMap.copyOfRange(this.bitMaps[i], start, subNode.bitMaps[i], destLoc, length); } } @@ -455,6 +471,44 @@ private List generateOneSplitList( return result; } + private void handleObjectValue( + int column, + int startRow, + int endRow, + Map.Entry> entry, + List result) { + for (int row = startRow; row < endRow; row++) { + if (((Binary[]) columns[column])[row] == null) { + continue; + } + final byte[] binary = ((Binary[]) columns[column])[row].getValues(); + if (binary == null || binary.length == 0) { + continue; + } + final ByteBuffer buffer = ByteBuffer.wrap(binary); + final boolean isEOF = buffer.get() == 1; + final long offset = buffer.getLong(); + final byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); + final IObjectPath relativePath = + IObjectPath.Factory.FACTORY.create( + entry.getKey().getRegionId().getId(), + times[row], + getDeviceID(row), + measurements[column]); + final ObjectNode objectNode = new ObjectNode(isEOF, offset, content, relativePath); + objectNode.setDataRegionReplicaSet(entry.getKey()); + result.add(objectNode); + ((Binary[]) columns[column])[row] = null; + if (bitMaps == null) { + bitMaps = new BitMap[columns.length]; + } + if (bitMaps[column] == null) { + bitMaps[column] = new BitMap(rowCount); + } + bitMaps[column].mark(row); + } + } + @Override public void checkDataType(AbstractMemTable memTable) throws DataTypeInconsistentException { if (singleDevice) { diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java index 5c95fbb62d037..f10923c915886 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -254,7 +257,24 @@ private void autoCreateOrUpdateDeviceSchema( } public static void checkObject4DeviceId(final Object[] deviceId) { - throw new SemanticException(DataNodeQueryMessages.THE_OBJECT_TYPE_COLUMN_IS_NOT_SUPPORTED); + if (!CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit()) { + return; + } + if (hasMultipleTiers()) { + throw new SemanticException("The tiered storage does not support object type yet."); + } + for (final Object part : deviceId) { + final String value = (String) part; + if (Objects.nonNull(value) && TsTable.isInvalid4ObjectType(value)) { + throw new SemanticException( + TsTable.getObjectStringError("deviceId", Arrays.toString(deviceId))); + } + } + } + + public static boolean hasMultipleTiers() { + final String[][] tierDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs(); + return 
tierDataDirs.length > 1; } private static class ValidateResult { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 561cf106156fc..05a949c564027 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -90,7 +90,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; @@ -164,6 +166,7 @@ import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.ObjectWriter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -197,6 +200,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -3968,6 +3972,59 @@ public int compact() { } } 
+ public void writeObject(final ObjectNode objectNode) throws Exception { + writeLock("writeObject"); + try { + final String relativeObjectPath = objectNode.getFilePathString(); + final Optional existingObjectFile = + TierManager.getInstance().getAbsoluteObjectFilePath(relativeObjectPath, true); + final File objectFile; + if (existingObjectFile.isPresent()) { + objectFile = existingObjectFile.get(); + } else { + objectFile = + FSFactoryProducer.getFSFactory() + .getFile( + TierManager.getInstance().getNextFolderForObjectFile(), relativeObjectPath); + } + final File objectTmpFile = + FSFactoryProducer.getFSFactory().getFile(objectFile.getPath() + ".tmp"); + + try (final ObjectWriter writer = new ObjectWriter(objectTmpFile)) { + writer.write( + objectNode.isGeneratedByRemoteConsensusLeader(), + objectNode.getOffset(), + objectNode.getContent()); + } + + if (objectNode.isEOF()) { + if (objectFile.exists()) { + final File objectBackFile = + FSFactoryProducer.getFSFactory().getFile(objectFile.getPath() + ".back"); + Files.move( + objectFile.toPath(), objectBackFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + Files.move( + objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + FileMetrics.getInstance().decreaseObjectFileNum(1); + FileMetrics.getInstance().decreaseObjectFileSize(objectBackFile.length()); + Files.delete(objectBackFile.toPath()); + } else { + Files.move( + objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + final RelationalInsertRowNode valueNode = objectNode.genValueInsertRowNode(); + insert(valueNode); + FileMetrics.getInstance().increaseObjectFileNum(1); + FileMetrics.getInstance().increaseObjectFileSize(objectFile.length()); + } + + getWALNode() + .ifPresent(walNode -> walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode)); + } finally { + writeUnlock(); + } + } + /** * Load a new tsfile to unsequence dir. 
* diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java index fa4375dd2d313..d245e67aad93f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java @@ -193,6 +193,7 @@ private Object decodeColumn(ByteBuffer uncompressed, int columnIndex) { case STRING: case BLOB: case TEXT: + case OBJECT: Binary[] binaryCol = new Binary[rowSize]; if (encoding == TSEncoding.PLAIN) { // PlainEncoder uses var int, which may cause compatibility problem diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/iotconsensusv2/request/IoTConsensusV2RequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/iotconsensusv2/request/IoTConsensusV2RequestType.java index 43682748ce5d5..c368ed9bc22f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/iotconsensusv2/request/IoTConsensusV2RequestType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/iotconsensusv2/request/IoTConsensusV2RequestType.java @@ -35,6 +35,7 @@ public enum IoTConsensusV2RequestType { TRANSFER_TS_FILE_PIECE_WITH_MOD((short) 105), TRANSFER_TS_FILE_SEAL_WITH_MOD((short) 106), TRANSFER_DELETION((short) 107), + TRANSFER_OBJECT_FILE_PIECE((short) 108), // Note: temporarily IoTConsensusV2 only support data region. 
But we put this class in `node-common` // to reserve the scalability diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 833c37a59407f..61cb8fd0d63cf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -22,15 +22,17 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.exception.runtime.SchemaExecutionException; -import org.apache.iotdb.commons.i18n.CommonMessages; import org.apache.iotdb.commons.i18n.SchemaMessages; import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.WindowsOSUtils; +import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,6 +47,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -421,7 +424,55 @@ public void setProps(Map props) { } public void checkTableNameAndObjectNames4Object() throws MetadataException { - throw new MetadataException(CommonMessages.OBJECT_TYPE_COLUMN_NOT_SUPPORTED); + if (!CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit()) { + return; + } + if (isInvalid4ObjectType(tableName)) { + throw
new MetadataException( + getObjectStringError("tableName", tableName), + TSStatusCode.SEMANTIC_ERROR.getStatusCode()); + } + for (final TsTableColumnSchema schema : columnSchemaMap.values()) { + if (schema.getDataType().equals(TSDataType.OBJECT) + && isInvalid4ObjectType(schema.getColumnName())) { + throw new MetadataException( + getObjectStringError("objectName", schema.getColumnName()), + TSStatusCode.SEMANTIC_ERROR.getStatusCode()); + } + } + } + + /** + * Returns {@code true} when {@code path} is {@code "."} or {@code ".."}, contains a slash or a + * backslash, or is rejected by {@link WindowsOSUtils#isLegalPathSegment4Windows}. + */ + public static boolean isInvalid4ObjectType(final String path) { + return path.equals(".") + || path.equals("..") + || path.contains("/") + || path.contains("\\") + || !WindowsOSUtils.isLegalPathSegment4Windows(path); + } + + /** + * Formats the {@code OBJECT_STRING_ERROR} template with {@code columnType} and {@code + * columnName}; on Windows the template is extended with {@link WindowsOSUtils#OS_SEGMENT_ERROR}. + */ + public static String getObjectStringError(final String columnType, final String columnName) { + return String.format( + isWindows() + ? OBJECT_STRING_ERROR + " " + WindowsOSUtils.OS_SEGMENT_ERROR + : OBJECT_STRING_ERROR, + columnType, + columnName); + } + + /** + * Checks the {@code os.name} system property for a Windows platform. A prefix match is used + * because a substring match on {@code "win"} would also accept {@code "Darwin"}. + */ + private static boolean isWindows() { + return System.getProperty("os.name").toLowerCase(Locale.ROOT).startsWith("windows"); } @Override