diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/PartitionedRecordIndexFileGroupLookupFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/PartitionedRecordIndexFileGroupLookupFunction.java new file mode 100644 index 0000000000000..c32dd7706e957 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/PartitionedRecordIndexFileGroupLookupFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index; + +import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.model.HoodieRecordGlobalLocation; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.HoodieTableMetadata; + +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +/** + * Looks up record-index keys that have already been grouped into one shard of a partitioned record index. + */ +public class PartitionedRecordIndexFileGroupLookupFunction + implements PairFlatMapFunction>, String, HoodieRecordGlobalLocation> { + + private final HoodieTableMetadata metadataTable; + + public PartitionedRecordIndexFileGroupLookupFunction(HoodieTableMetadata metadataTable) { + this.metadataTable = metadataTable; + } + + @Override + public Iterator> call(Iterator> partitionPathRecordKeyIterator) { + String partitionName = null; + List keysToLookup = new ArrayList<>(); + while (partitionPathRecordKeyIterator.hasNext()) { + Pair partitionPathRecordKey = partitionPathRecordKeyIterator.next(); + keysToLookup.add(partitionPathRecordKey.getRight()); + if (partitionName == null) { + partitionName = partitionPathRecordKey.getLeft(); + } + } + + if (keysToLookup.isEmpty()) { + return Collections.emptyIterator(); + } + + HoodiePairData recordIndexData = + metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup), Option.of(partitionName)); + try { + Map recordIndexInfo = recordIndexData.collectAsList().stream() + .collect(HashMap::new, (map, pair) -> map.put(pair.getKey(), pair.getValue()), HashMap::putAll); + return recordIndexInfo.entrySet().stream() + .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator(); + } finally { + recordIndexData.unpersistWithDependencies(); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordLevelIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordLevelIndex.java index e35e04d8f20ab..6a12dc3c67a57 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordLevelIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordLevelIndex.java @@ -20,14 +20,12 @@ package org.apache.hudi.index; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.util.Either; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -39,16 +37,11 @@ import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import scala.Tuple2; - /** * Index to be used with RLI. Queries the record index for tables with non-global record keys */ @@ -85,7 +78,7 @@ protected HoodiePairData lookupRecords(H .map(t -> t._2); ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= numFileGroups); // Lookup the keys in the record index - return HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(hoodieTable))); + return HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(hoodieTable.getTableMetadata()))); } @Override @@ -105,36 +98,4 @@ protected int getTotalFileGroupCount(Either> fileG protected boolean shouldUpdatePartitionPath(HoodieTable hoodieTable) { return false; } - - /** - * Function that lookups a list of keys in a single shard of the record index - */ - private static class PartitionedRecordIndexFileGroupLookupFunction implements PairFlatMapFunction>, String, HoodieRecordGlobalLocation> { - private final HoodieTable hoodieTable; - - public PartitionedRecordIndexFileGroupLookupFunction(HoodieTable hoodieTable) { - this.hoodieTable = hoodieTable; - } - - @Override - public Iterator> call(Iterator> partitionPathRecordKeyIterator) throws Exception { - //Needs to be final, so we must use 1 element array to store the value - final String[] partitionName = {null}; - List keysToLookup = new ArrayList<>(); - partitionPathRecordKeyIterator.forEachRemaining(p -> { - keysToLookup.add(p.getRight()); - if (partitionName[0] == null) { - partitionName[0] = p.getLeft(); - } - }); - - // recordIndexInfo object only contains records that are present in record_index. - assert partitionName[0] != null || keysToLookup.isEmpty(); - Map recordIndexInfo = - hoodieTable.getTableMetadata().readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup), Option.ofNullable(partitionName[0])) - .collectAsList().stream().collect(HashMap::new, (map, pair) -> map.put(pair.getKey(), pair.getValue()), HashMap::putAll); - return recordIndexInfo.entrySet().stream() - .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator(); - } - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/GlobalRecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/GlobalRecordLevelIndexSupport.scala new file mode 100644 index 0000000000000..c750e3eeb89cc --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/GlobalRecordLevelIndexSupport.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.data.HoodieListData +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression + +import scala.collection.JavaConverters + +/** + * Data skipping based on a global Record Level Index (RLI), where a single set of file groups indexes + * the record keys across the whole table. All record keys are resolved with one metadata table lookup. + */ +class GlobalRecordLevelIndexSupport(spark: SparkSession, + metadataConfig: HoodieMetadataConfig, + metaClient: HoodieTableMetaClient) + extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) { + + override protected def lookupCandidateFilesForRecordKeys(fileIndex: HoodieFileIndex, + prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], + recordKeys: List[String]): Option[Set[String]] = { + val prunedStoragePaths = getPrunedStoragePaths(prunedPartitionsAndFileSlices, fileIndex) + val recordIndexData = metadataTable.readRecordIndexLocationsWithKeys( + HoodieListData.eager(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)) + try { + val fileIdToPartitionMap = collectFileIdToPartitionMap(recordIndexData) + Option.apply(filterCandidateFiles(prunedStoragePaths, fileIdToPartitionMap)) + } finally { + // Clean up the RDD to avoid memory leaks + recordIndexData.unpersistWithDependencies() + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 2187e5ab6903f..e162f1a2edbf2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -116,7 +116,7 @@ case class HoodieFileIndex(spark: SparkSession, * during `lookupCandidateFilesInMetadataTable` */ @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List( - new RecordLevelIndexSupport(spark, metadataConfig, metaClient), + RecordLevelIndexSupport.create(spark, metadataConfig, metaClient), if (PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(metaClient.getStorageConf, metaClient.getBasePath.toString)) { new PartitionBucketIndexSupport(spark, metadataConfig, metaClient, options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionedRecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionedRecordLevelIndexSupport.scala new file mode 100644 index 0000000000000..25132a53e0a6b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionedRecordLevelIndexSupport.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.RecordLevelIndexSupport.{getPrunedStoragePaths, MAX_PARTITIONS} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ValidationUtils +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.index.PartitionedRecordIndexFileGroupLookupFunction +import org.apache.hudi.metadata.{BucketizedMetadataTableFileGroupIndexParser, HoodieTableMetadataUtil, MetadataPartitionType} + +import org.apache.spark.Partitioner +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Data skipping based on a partitioned Record Level Index (RLI), where the file groups indexing the + * record keys are sharded per data-table partition. The metadata lookup must therefore be scoped to each + * candidate partition. + * + * The candidate partitions are derived from the already pruned partitions. Because each partition requires a + * separate metadata table lookup, if the number of candidate partitions exceeds {@code MAX_PARTITIONS} the + * record index filtering is skipped (returns [[None]]) and the reader falls back to other indexes. + */ +class PartitionedRecordLevelIndexSupport(spark: SparkSession, + metadataConfig: HoodieMetadataConfig, + metaClient: HoodieTableMetaClient) + extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) with Logging { + + override protected def lookupCandidateFilesForRecordKeys(fileIndex: HoodieFileIndex, + prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], + recordKeys: List[String]): Option[Set[String]] = { + val partitions = prunedPartitionsAndFileSlices.flatMap { case (partitionPathOpt, _) => + partitionPathOpt.map(_.getPath) + }.toSet + if (partitions.isEmpty) { + // Cannot resolve candidate partitions, fall back to other indexes rather than over-pruning + Option.empty + } else if (partitions.size > MAX_PARTITIONS) { + logInfo(s"The number of candidate partitions ${partitions.size} exceeds the partitioned record level index " + + s"lookup threshold $MAX_PARTITIONS. Skipping record level index pruning.") + Option.empty + } else { + lookupRecordKeys(partitions, recordKeys) match { + case Some(fileIdToPartitionMap) => + val prunedStoragePaths = getPrunedStoragePaths(prunedPartitionsAndFileSlices, fileIndex) + Option.apply(filterCandidateFiles(prunedStoragePaths, fileIdToPartitionMap)) + case None => + // None of the candidate partitions are indexed by the partitioned RLI (e.g. partitions + // not yet indexed), so we cannot determine the matching files. Fall back to other indexes + // rather than over-pruning to an empty candidate set. + Option.empty + } + } + } + + private def lookupRecordKeys(partitions: Set[String], + recordKeys: List[String]): Option[mutable.Map[String, String]] = { + val fileGroups = metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX) + val fileGroupCountPerDataPartition = fileGroups.asScala + .filter { case (partition, _) => partitions.contains(partition) } + .map { case (partition, slices) => partition -> Integer.valueOf(slices.size()) } + .toMap + if (fileGroupCountPerDataPartition.isEmpty) { + None + } else { + val numFileGroups = BucketizedMetadataTableFileGroupIndexParser.calculateNumberOfFileGroups(fileGroupCountPerDataPartition.asJava) + val partitionOffsetIndexes = BucketizedMetadataTableFileGroupIndexParser.generatePartitionToBaseIndexOffsets(fileGroupCountPerDataPartition.asJava).asScala + // Like SparkMetadataTableRecordLevelIndex#lookupRecords: build (partition, recordKey) + // pairs, key by the global partitioned-RLI shard id, and let each Spark partition look up + // one record-index shard through PartitionedRecordIndexFileGroupLookupFunction. + val partitionRecordKeys = fileGroupCountPerDataPartition.keys.toSeq.flatMap { partition => + recordKeys.map { recordKey => Pair.of(partition, recordKey) } + } + val partitionedKeyRDD = spark.sparkContext.parallelize(partitionRecordKeys, numFileGroups) + .keyBy { partitionRecordKey => + val partition = partitionRecordKey.getLeft + partitionOffsetIndexes(partition).intValue() + + HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(partitionRecordKey.getRight, fileGroupCountPerDataPartition(partition).intValue()) + } + .partitionBy(new PartitionIdPassthrough(numFileGroups)) + .map(_._2) + .toJavaRDD() + ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions <= numFileGroups) + val fileIdToPartitionMap = partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(metadataTable)) + .collect() + .asScala + .foldLeft(mutable.Map.empty[String, String]) { (fileIdToPartitionMap, location) => + fileIdToPartitionMap.put(location._2.getFileId, location._2.getPartitionPath) + fileIdToPartitionMap + } + Some(fileIdToPartitionMap) + } + } +} + +private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner { + override def getPartition(key: Any): Int = key.asInstanceOf[Int] +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala index fa2835fd896c5..95a9acb01f346 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala @@ -18,34 +18,42 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, TIME_TRAVEL_AS_OF_INSTANT} -import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.data.HoodieListData +import org.apache.hudi.common.data.HoodiePairData import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.model.{FileSlice, HoodieRecordGlobalLocation} import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField import org.apache.hudi.common.model.HoodieTableQueryType.SNAPSHOT import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.InstantComparison import org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps import org.apache.hudi.common.util.HoodieDataUtils +import org.apache.hudi.index.record.HoodieRecordIndex import org.apache.hudi.keygen.KeyGenerator import org.apache.hudi.metadata.HoodieTableMetadataUtil +import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX import org.apache.hudi.storage.StoragePath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Cast, EqualTo, Expression, In, Literal} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils -import scala.collection.{mutable, JavaConverters} import scala.collection.JavaConverters._ +import scala.collection.mutable -class RecordLevelIndexSupport(spark: SparkSession, - metadataConfig: HoodieMetadataConfig, - metaClient: HoodieTableMetaClient) +/** + * Base class for data skipping based on the Record Level Index (RLI) in the metadata table. + * + * The RLI maps a record key to the file group that stores it. The actual metadata lookup differs between + * a global RLI ([[GlobalRecordLevelIndexSupport]]) and a partitioned RLI + * ([[PartitionedRecordLevelIndexSupport]]); subclasses implement [[lookupCandidateFilesForRecordKeys]]. + * Use [[RecordLevelIndexSupport.create]] to instantiate the right implementation for a table. + */ +abstract class RecordLevelIndexSupport(spark: SparkSession, + metadataConfig: HoodieMetadataConfig, + metaClient: HoodieTableMetaClient) extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) { - override def getIndexName: String = RecordLevelIndexSupport.INDEX_NAME override def computeCandidateFileNames(fileIndex: HoodieFileIndex, @@ -55,48 +63,56 @@ class RecordLevelIndexSupport(spark: SparkSession, shouldPushDownFilesFilter: Boolean ): Option[Set[String]] = { lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters) - val prunedStoragePaths = getPrunedStoragePaths(prunedPartitionsAndFileSlices, fileIndex) if (recordKeys.nonEmpty) { - Option.apply(getCandidateFilesForRecordKeys(prunedStoragePaths, recordKeys)) + lookupCandidateFilesForRecordKeys(fileIndex, prunedPartitionsAndFileSlices, recordKeys) } else { Option.empty } } + /** + * Looks up the candidate files which may store the provided record keys from the record level index. + * Implemented differently for a global vs a partitioned RLI. + * + * @param fileIndex the file index of the query + * @param prunedPartitionsAndFileSlices already pruned partitions and file slices + * @param recordKeys the record key literals extracted from the query filters + * @return the set of candidate file names, or [[None]] if the index could not be used and pruning + * should be skipped (falling back to other indexes). + */ + protected def lookupCandidateFilesForRecordKeys(fileIndex: HoodieFileIndex, + prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], + recordKeys: List[String]): Option[Set[String]] + override def invalidateCaches(): Unit = { // no caches for this index type, do nothing } /** - * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index. - * - * @param allFiles - List of all files which needs to be considered for the query - * @param recordKeys - List of record keys. - * @return Sequence of file names which need to be queried + * Builds a map from fileId to data-table partition path from record index lookup results. */ - private def getCandidateFilesForRecordKeys(allFiles: Seq[StoragePath], recordKeys: List[String]): Set[String] = { - val recordIndexData = metadataTable.readRecordIndexLocationsWithKeys( - HoodieListData.eager(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)) - try { - val recordKeyLocationsList = HoodieDataUtils.dedupeAndCollectAsList(recordIndexData) - val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty - val candidateFiles: mutable.Set[String] = mutable.Set.empty - for (recordKeyLocation <- recordKeyLocationsList.asScala) { - val location = recordKeyLocation.getValue - fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath) - } - for (file <- allFiles) { - val fileId = FSUtils.getFileIdFromFilePath(file) - val partitionOpt = fileIdToPartitionMap.get(fileId) - if (partitionOpt.isDefined) { - candidateFiles += file.getName - } + protected def collectFileIdToPartitionMap(recordIndexData: HoodiePairData[String, HoodieRecordGlobalLocation]): mutable.Map[String, String] = { + val recordKeyLocationsList = HoodieDataUtils.dedupeAndCollectAsList(recordIndexData) + val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty + for (recordKeyLocation <- recordKeyLocationsList.asScala) { + val location = recordKeyLocation.getValue + fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath) + } + fileIdToPartitionMap + } + + /** + * Filters the input files, keeping only those whose fileId is present in the record index lookup results. + */ + protected def filterCandidateFiles(allFiles: Seq[StoragePath], fileIdToPartitionMap: mutable.Map[String, String]): Set[String] = { + val candidateFiles: mutable.Set[String] = mutable.Set.empty + for (file <- allFiles) { + val fileId = FSUtils.getFileIdFromFilePath(file) + if (fileIdToPartitionMap.contains(fileId)) { + candidateFiles += file.getName } - candidateFiles.toSet - } finally { - // Clean up the RDD to avoid memory leaks - recordIndexData.unpersistWithDependencies() } + candidateFiles.toSet } /** @@ -132,6 +148,34 @@ class RecordLevelIndexSupport(spark: SparkSession, object RecordLevelIndexSupport { val INDEX_NAME = "RECORD_LEVEL" + /** + * Upper bound on the number of candidate data-table partitions eligible for a partitioned RLI lookup. + * + * Unlike the global RLI (a single lookup over all keys), the partitioned variant performs one metadata-table read + * per candidate partition. When a query does not filter on the partition column the candidate set can span many + * partitions, and fanning out a lookup to each one can add latency that outweighs the skipping benefit. Once the + * candidate partition count exceeds this threshold, pruning is skipped. + */ + private[hudi] val MAX_PARTITIONS = 10 + + /** + * Creates the [[RecordLevelIndexSupport]] implementation matching the table's record level index: + * [[PartitionedRecordLevelIndexSupport]] when the RLI is partitioned, otherwise + * [[GlobalRecordLevelIndexSupport]]. + */ + def create(spark: SparkSession, + metadataConfig: HoodieMetadataConfig, + metaClient: HoodieTableMetaClient): RecordLevelIndexSupport = { + val isPartitioned = metaClient.getIndexForMetadataPartition(PARTITION_NAME_RECORD_INDEX) + .map[Boolean](indexDef => HoodieRecordIndex.isPartitioned(indexDef)) + .orElse(false) + if (isPartitioned) { + new PartitionedRecordLevelIndexSupport(spark, metadataConfig, metaClient) + } else { + new GlobalRecordLevelIndexSupport(spark, metadataConfig, metaClient) + } + } + private def getDefaultAttributeFetcher(): Function1[Expression, Expression] = { expr => expr } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala index 204029f022a64..6728bd3c12828 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala @@ -38,7 +38,7 @@ import scala.collection.JavaConverters._ class SecondaryIndexSupport(spark: SparkSession, metadataConfig: HoodieMetadataConfig, - metaClient: HoodieTableMetaClient) extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) { + metaClient: HoodieTableMetaClient) extends GlobalRecordLevelIndexSupport(spark, metadataConfig, metaClient) { override def getIndexName: String = SecondaryIndexSupport.INDEX_NAME override def computeCandidateFileNames(fileIndex: HoodieFileIndex, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 5c6919a280693..9734aa7b5f7a5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -72,6 +72,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { saveMode: SaveMode, validate: Boolean = true, numUpdates: Int = 1, + numInserts: Int = 5, onlyUpdates: Boolean = false, schemaStr: String = HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, timestamp: Long = System.currentTimeMillis()): DataFrame = { @@ -87,7 +88,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { latestBatch = recordsToStrings(dataGen.generateInsertsForPartitionPerSchema( getInstantTime(), 5, dataGen.getPartitionPaths.last, schemaStr)).asScala } else { - latestBatch = recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), 5, schemaStr, timestamp)).asScala + latestBatch = recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), numInserts, schemaStr, timestamp)).asScala } val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2)) latestBatchDf.cache() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexWithSQL.scala index a272fea36f7bf..bb464e277e2c0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexWithSQL.scala @@ -46,6 +46,32 @@ import scala.util.Using class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { val sqlTempTable = "tbl" + /** + * Whether the record level index exercised by the tests is partitioned. Overridden by + * [[TestRecordLevelIndexWithSQL]] to run the same coverage against a partitioned RLI. + */ + protected def isPartitionedRli: Boolean = false + + /** Write options selecting a global vs partitioned record level index. */ + protected def rliEnableOpts: Map[String, String] = + if (isPartitionedRli) { + Map(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "false", + HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "true") + } else { + Map(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "true", + HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "false") + } + + /** The create-table DDL option that enables a global vs partitioned record level index. */ + protected def rliEnableDDLOpts: String = + if (isPartitionedRli) { + s"${HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key} = 'true'" + } else { + // "hoodie.metadata.record.index.enable" is the backward-compatible alternative key for + // GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP; used here intentionally to also exercise that alias. + "hoodie.metadata.record.index.enable = 'true'" + } + @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testRLICreationUsingSQL(isPartitioned: Boolean): Unit = { @@ -73,7 +99,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { @ParameterizedTest @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) def testRLIWithSQL(tableType: String): Unit = { - val hudiOpts = commonOpts ++ Map( + val hudiOpts = commonOpts ++ rliEnableOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType, "hoodie.metadata.index.column.stats.enable" -> "false", DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") @@ -166,24 +192,33 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { verifyPruningFileCount(hudiOpts, dataFilter, numFiles, shouldPrune) } - private def attribute(partition: String): AttributeReference = { + protected def attribute(partition: String): AttributeReference = { AttributeReference(partition, StringType, true)() } - private def literal(value: String): Literal = { + protected def literal(value: String): Literal = { Literal.create(value) } - private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int, shouldPrune: Boolean): Unit = { - verifyPruningFileCount(opts, dataFilter, numFiles, HoodieTableMetaClient.reload(metaClient), shouldPrune) + protected def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int, shouldPrune: Boolean): Unit = { + verifyPruningFileCount(opts, Seq(dataFilter), Seq.empty, numFiles, HoodieTableMetaClient.reload(metaClient), shouldPrune) + } + + protected def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int, metaClient: HoodieTableMetaClient, shouldPrune: Boolean): Unit = { + verifyPruningFileCount(opts, Seq(dataFilter), Seq.empty, numFiles, metaClient, shouldPrune) } - private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int, metaClient: HoodieTableMetaClient, shouldPrune: Boolean): Unit = { + protected def verifyPruningFileCount(opts: Map[String, String], + dataFilters: Seq[Expression], + partitionFilters: Seq[Expression], + numFiles: Int, + metaClient: HoodieTableMetaClient, + shouldPrune: Boolean): Unit = { // with data skipping val commonOpts = opts + ("path" -> metaClient.getBasePath.toString) this.metaClient = HoodieTableMetaClient.reload(metaClient) var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, includeLogFiles = true) - val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter)) + val filteredPartitionDirectories = fileIndex.listFiles(partitionFilters, dataFilters) val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size if (shouldPrune) { assertEquals(numFiles, filteredFilesCount) @@ -193,7 +228,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { // with no data skipping fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true) - val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size + val filesCountWithNoSkipping = fileIndex.listFiles(partitionFilters, dataFilters).flatMap(s => s.files).size if (!shouldPrune) { assertEquals(filteredFilesCount, filesCountWithNoSkipping) } @@ -222,7 +257,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { FileSystemViewManager.createInMemoryFileSystemView(new HoodieSparkEngineContext(jsc), metaClient, HoodieMetadataConfig.newBuilder().fromProperties(props).build()) } - private def createTempTable(hudiOpts: Map[String, String]): Unit = { + protected def createTempTable(hudiOpts: Map[String, String]): Unit = { val readDf = spark.read.format("hudi").options(hudiOpts).load(basePath) readDf.registerTempTable(sqlTempTable) } @@ -246,7 +281,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { | options ( | primaryKey ='$recordKeyFields', | hoodie.metadata.enable = 'true', - | hoodie.metadata.record.index.enable = 'true', + | $rliEnableDDLOpts, | hoodie.datasource.write.recordkey.field = '$recordKeyFields', | hoodie.enable.data.skipping = 'true' | ) @@ -263,7 +298,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testPrunedStoragePaths(includeLogFiles: Boolean): Unit = { - val hudiOpts = commonOpts ++ metadataOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ") + val hudiOpts = commonOpts ++ metadataOpts ++ rliEnableOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ") val df = doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, @@ -292,7 +327,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { val recordKey: String = df.filter("partition = '" + selectedPartition + "'").limit(1).collect().apply(0).getAs("_row_key") val dataFilter = EqualTo(attribute("_row_key"), Literal(recordKey)) - val rliIndexSupport = new RecordLevelIndexSupport(spark, getConfig.getMetadataConfig, metaClient) + val rliIndexSupport = RecordLevelIndexSupport.create(spark, getConfig.getMetadataConfig, metaClient) val fileNames = rliIndexSupport.computeCandidateFileNames(fileIndex, Seq(dataFilter), null, prunedPaths, false) assertEquals(if (includeLogFiles) 2 else 1, fileNames.get.size) } @@ -323,7 +358,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { | options ( | primaryKey ='$recordKeyFields', | hoodie.metadata.enable = 'true', - | hoodie.metadata.record.index.enable = 'true', + | $rliEnableDDLOpts, | hoodie.datasource.write.recordkey.field = '$recordKeyFields', | hoodie.enable.data.skipping = 'true', | hoodie.metadata.index.column.stats.enable = 'false' @@ -371,7 +406,7 @@ class TestGlobalRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { | options ( | primaryKey = '$recordKeyFields', | hoodie.metadata.enable = 'true', - | hoodie.metadata.record.index.enable = 'true', + | $rliEnableDDLOpts, | hoodie.datasource.write.recordkey.field = '$recordKeyFields', | hoodie.enable.data.skipping = 'true', | hoodie.metadata.index.column.stats.enable = 'false' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala new file mode 100644 index 0000000000000..a901e1a5ed45a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, RecordLevelIndexSupport} +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.index.record.HoodieRecordIndex +import org.apache.hudi.metadata.MetadataPartitionType + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, Literal} +import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.Assertions.assertTrue + +/** + * Data skipping coverage for a partitioned Record Level Index (RLI). This reuses the full coverage of + * [[TestGlobalRecordLevelIndexWithSQL]] by flipping [[isPartitionedRli]] to true so that every inherited test + * exercises a partitioned RLI, and adds the partitioned-specific scenarios (e.g. the max-candidate-partitions + * threshold fallback). + */ +@Tag("functional") +class TestRecordLevelIndexWithSQL extends TestGlobalRecordLevelIndexWithSQL { + + override protected def isPartitionedRli: Boolean = true + + /** + * Verifies that the record level index is created as partitioned and that pruning is skipped (without error) + * once the number of candidate data table partitions exceeds the hard-coded threshold. + */ + @Test + def testPartitionedRliPartitionsThreshold(): Unit = { + val hudiOpts = commonOpts ++ rliEnableOpts ++ Map( + DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(), + "hoodie.metadata.index.column.stats.enable" -> "false", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") + + val partitionPaths = (0 to RecordLevelIndexSupport.MAX_PARTITIONS) + .map(i => f"2026/06/$i%02d") + .toArray + initTestDataGenerator(partitionPaths) + doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + validate = false, + numInserts = partitionPaths.length) + + // The record index should have been created as partitioned + metaClient = HoodieTableMetaClient.reload(metaClient) + assertTrue(HoodieRecordIndex.isPartitioned( + metaClient.getIndexMetadata.get().getIndex(MetadataPartitionType.RECORD_INDEX.getPartitionPath).get())) + + createTempTable(hudiOpts) + val latestSnapshotDf = spark.read.format("hudi").options(hudiOpts).load(basePath) + val recordKey = latestSnapshotDf.limit(1).collect().head.getAs[String]("_row_key") + val dataFilter: Expression = EqualTo(attribute("_row_key"), Literal(recordKey)) + + // The record-key-only query sees all candidate partitions and exceeds the hard-coded threshold. + verifyPruningFileCount(hudiOpts, dataFilter, numFiles = -1, shouldPrune = false) + + val partitionValue = latestSnapshotDf.limit(1).collect().head.getAs[String]("partition") + val partitionFilter: Expression = EqualTo(attribute("partition"), Literal(partitionValue)) + + // Spark passes partition predicates separately, and HoodieFileIndex only prunes partitions from that channel. + // With one candidate partition, the hard-coded threshold is not hit and RLI pruning still applies. + verifyPruningFileCount(hudiOpts, Seq(dataFilter), Seq(partitionFilter), numFiles = 1, + metaClient = HoodieTableMetaClient.reload(metaClient), shouldPrune = true) + } +}