Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.index;

import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.spark.api.java.function.PairFlatMapFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import scala.Tuple2;

/**
* Looks up record-index keys that have already been grouped into one shard of a partitioned record index.
*/
public class PartitionedRecordIndexFileGroupLookupFunction
implements PairFlatMapFunction<Iterator<Pair<String, String>>, String, HoodieRecordGlobalLocation> {

private final HoodieTableMetadata metadataTable;

public PartitionedRecordIndexFileGroupLookupFunction(HoodieTableMetadata metadataTable) {
this.metadataTable = metadataTable;
}

@Override
public Iterator<Tuple2<String, HoodieRecordGlobalLocation>> call(Iterator<Pair<String, String>> partitionPathRecordKeyIterator) {
String partitionName = null;
List<String> keysToLookup = new ArrayList<>();
while (partitionPathRecordKeyIterator.hasNext()) {
Pair<String, String> partitionPathRecordKey = partitionPathRecordKeyIterator.next();
keysToLookup.add(partitionPathRecordKey.getRight());
if (partitionName == null) {
partitionName = partitionPathRecordKey.getLeft();
}
}

if (keysToLookup.isEmpty()) {
return Collections.emptyIterator();
}

HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup), Option.of(partitionName));
try {
Map<String, HoodieRecordGlobalLocation> recordIndexInfo = recordIndexData.collectAsList().stream()
.collect(HashMap::new, (map, pair) -> map.put(pair.getKey(), pair.getValue()), HashMap::putAll);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the three-arg Stream.collect() mutable-reduction form is a bit surprising here — could you use collect(Collectors.toMap(Pair::getKey, Pair::getValue)) instead? It makes the intent immediately obvious.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return recordIndexInfo.entrySet().stream()
.map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
} finally {
recordIndexData.unpersistWithDependencies();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
package org.apache.hudi.index;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -39,16 +37,11 @@
import org.apache.hudi.table.HoodieTable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import scala.Tuple2;

/**
* Index to be used with RLI. Queries the record index for tables with non-global record keys
*/
Expand Down Expand Up @@ -85,7 +78,7 @@ protected <R> HoodiePairData<String, HoodieRecordGlobalLocation> lookupRecords(H
.map(t -> t._2);
ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= numFileGroups);
// Lookup the keys in the record index
return HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(hoodieTable)));
return HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(hoodieTable.getTableMetadata())));
}

@Override
Expand All @@ -105,36 +98,4 @@ protected int getTotalFileGroupCount(Either<Integer, Map<String, Integer>> fileG
protected boolean shouldUpdatePartitionPath(HoodieTable hoodieTable) {
// Always returns false, independent of the table passed in.
// NOTE(review): presumably because this index serves tables with non-global record keys, so a
// record key is never moved across partitions — confirm against the base class contract.
return false;
}

/**
 * Function that looks up a list of keys in a single shard of the record index.
 *
 * <p>Each input element is a (partition path, record key) pair; the first non-null partition path
 * seen is used to scope the lookup for the whole batch.
 */
private static class PartitionedRecordIndexFileGroupLookupFunction implements PairFlatMapFunction<Iterator<Pair<String,String>>, String, HoodieRecordGlobalLocation> {
  private final HoodieTable hoodieTable;

  public PartitionedRecordIndexFileGroupLookupFunction(HoodieTable hoodieTable) {
    this.hoodieTable = hoodieTable;
  }

  /**
   * @param partitionPathRecordKeyIterator (partition path, record key) pairs assigned to this shard
   * @return (record key, location) tuples for the keys that are present in the record index
   */
  @Override
  public Iterator<Tuple2<String, HoodieRecordGlobalLocation>> call(Iterator<Pair<String, String>> partitionPathRecordKeyIterator) throws Exception {
    // A plain loop lets the partition name be an ordinary local instead of a one-element array.
    String partitionName = null;
    List<String> keysToLookup = new ArrayList<>();
    while (partitionPathRecordKeyIterator.hasNext()) {
      Pair<String, String> partitionPathRecordKey = partitionPathRecordKeyIterator.next();
      keysToLookup.add(partitionPathRecordKey.getRight());
      if (partitionName == null) {
        partitionName = partitionPathRecordKey.getLeft();
      }
    }

    // Nothing to look up: skip the metadata table call entirely.
    if (keysToLookup.isEmpty()) {
      return java.util.Collections.emptyIterator();
    }
    assert partitionName != null;

    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
        hoodieTable.getTableMetadata().readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup), Option.ofNullable(partitionName));
    try {
      // recordIndexInfo object only contains records that are present in record_index.
      Map<String, HoodieRecordGlobalLocation> recordIndexInfo = new HashMap<>();
      recordIndexData.collectAsList().forEach(pair -> recordIndexInfo.put(pair.getKey(), pair.getValue()));
      return recordIndexInfo.entrySet().stream()
          .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
    } finally {
      // Release the looked-up data once it has been copied into the local map.
      recordIndexData.unpersistWithDependencies();
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieListData
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression

import scala.collection.JavaConverters

/**
 * Record Level Index (RLI) data skipping for the global flavour of the index: one set of file groups
 * indexes the record keys of the whole table, so a single metadata table lookup resolves all keys.
 */
class GlobalRecordLevelIndexSupport(spark: SparkSession,
                                    metadataConfig: HoodieMetadataConfig,
                                    metaClient: HoodieTableMetaClient)
  extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) {

  /**
   * Resolves the given record keys through the record index and narrows the already pruned files
   * down to the ones the index points at.
   *
   * @param fileIndex                     file index the pruned storage paths are derived from
   * @param prunedPartitionsAndFileSlices partitions and file slices that survived earlier pruning
   * @param recordKeys                    record keys to look up
   * @return the candidate files, wrapped in an [[Option]]
   */
  override protected def lookupCandidateFilesForRecordKeys(fileIndex: HoodieFileIndex,
                                                           prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                           recordKeys: List[String]): Option[Set[String]] = {
    val candidateStoragePaths = getPrunedStoragePaths(prunedPartitionsAndFileSlices, fileIndex)
    val javaRecordKeys = JavaConverters.seqAsJavaListConverter(recordKeys).asJava
    val indexLocations = metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(javaRecordKeys))
    try {
      Option(filterCandidateFiles(candidateStoragePaths, collectFileIdToPartitionMap(indexLocations)))
    } finally {
      // Release the looked-up data so it does not linger in memory
      indexLocations.unpersistWithDependencies()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ case class HoodieFileIndex(spark: SparkSession,
* during `lookupCandidateFilesInMetadataTable`
*/
@transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List(
new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
RecordLevelIndexSupport.create(spark, metadataConfig, metaClient),
if (PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(metaClient.getStorageConf, metaClient.getBasePath.toString)) {
new PartitionBucketIndexSupport(spark, metadataConfig, metaClient,
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hudi.RecordLevelIndexSupport.{getPrunedStoragePaths, MAX_PARTITIONS}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.index.PartitionedRecordIndexFileGroupLookupFunction
import org.apache.hudi.metadata.{BucketizedMetadataTableFileGroupIndexParser, HoodieTableMetadataUtil, MetadataPartitionType}

import org.apache.spark.Partitioner
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * Data skipping based on a partitioned Record Level Index (RLI), where the file groups indexing the
 * record keys are sharded per data-table partition. The metadata lookup must therefore be scoped to each
 * candidate partition.
 *
 * The candidate partitions are derived from the already pruned partitions. Record index filtering is
 * skipped (returns [[None]]) and the reader falls back to other indexes when:
 *  - no candidate partition can be resolved, or some pruned file slices carry no partition path;
 *  - the number of candidate partitions exceeds `MAX_PARTITIONS`, because each partition requires a
 *    separate metadata table lookup;
 *  - any candidate partition has no file groups in the partitioned record index, because its files
 *    could otherwise be pruned without ever having been looked up.
 */
class PartitionedRecordLevelIndexSupport(spark: SparkSession,
                                         metadataConfig: HoodieMetadataConfig,
                                         metaClient: HoodieTableMetaClient)
  extends RecordLevelIndexSupport(spark, metadataConfig, metaClient) with Logging {

  /**
   * Resolves the record keys against the record-index shards of every candidate partition and
   * narrows the pruned files down to the ones the index points at.
   *
   * @param fileIndex                     file index the pruned storage paths are derived from
   * @param prunedPartitionsAndFileSlices partitions and file slices that survived earlier pruning
   * @param recordKeys                    record keys to look up
   * @return the candidate files, or [[None]] when the record index cannot be used safely
   */
  override protected def lookupCandidateFilesForRecordKeys(fileIndex: HoodieFileIndex,
                                                           prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                           recordKeys: List[String]): Option[Set[String]] = {
    val partitions = prunedPartitionsAndFileSlices.flatMap { case (partitionPathOpt, _) =>
      partitionPathOpt.map(_.getPath)
    }.toSet
    // File slices without a partition path cannot be scoped to a record-index shard; they would
    // never be looked up and could be filtered out of the candidate set.
    val hasSlicesWithoutPartition = prunedPartitionsAndFileSlices.exists { case (partitionPathOpt, fileSlices) =>
      partitionPathOpt.isEmpty && fileSlices.nonEmpty
    }
    if (partitions.isEmpty || hasSlicesWithoutPartition) {
      // Cannot resolve candidate partitions, fall back to other indexes rather than over-pruning
      Option.empty
    } else if (partitions.size > MAX_PARTITIONS) {
      logInfo(s"The number of candidate partitions ${partitions.size} exceeds the partitioned record level index " +
        s"lookup threshold $MAX_PARTITIONS. Skipping record level index pruning.")
      Option.empty
    } else {
      // A None here means the candidate partitions are not fully covered by the partitioned RLI
      // (e.g. partitions not yet indexed), so the matching files cannot be determined. Fall back to
      // other indexes rather than over-pruning to an empty candidate set.
      lookupRecordKeys(partitions, recordKeys).flatMap { fileIdToPartitionMap =>
        val prunedStoragePaths = getPrunedStoragePaths(prunedPartitionsAndFileSlices, fileIndex)
        Option(filterCandidateFiles(prunedStoragePaths, fileIdToPartitionMap))
      }
    }
  }

  /**
   * Looks up the record keys in the record-index shards of the given data-table partitions.
   *
   * @param partitions candidate data-table partitions
   * @param recordKeys record keys to look up in each candidate partition
   * @return file id to partition path of every located record, or [[None]] when at least one
   *         candidate partition has no record-index file groups (or there is nothing to look up)
   */
  private def lookupRecordKeys(partitions: Set[String],
                               recordKeys: List[String]): Option[mutable.Map[String, String]] = {
    val fileGroups = metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX)
    val fileGroupCountPerDataPartition = fileGroups.asScala
      .filter { case (partition, _) => partitions.contains(partition) }
      .map { case (partition, slices) => partition -> Integer.valueOf(slices.size()) }
      .toMap
    // Every candidate partition must be backed by at least one record-index file group. A partition
    // that is absent (e.g. not indexed yet) would never be queried, and its files would then be
    // dropped from the candidate set even though they may hold matching records.
    val unindexedPartitions = partitions.filterNot { partition =>
      fileGroupCountPerDataPartition.get(partition).exists(_.intValue() > 0)
    }
    if (unindexedPartitions.nonEmpty) {
      logInfo(s"${unindexedPartitions.size} of ${partitions.size} candidate partitions have no file groups in the " +
        s"partitioned record level index. Skipping record level index pruning.")
      None
    } else if (fileGroupCountPerDataPartition.isEmpty) {
      None
    } else {
      val numFileGroups = BucketizedMetadataTableFileGroupIndexParser.calculateNumberOfFileGroups(fileGroupCountPerDataPartition.asJava)
      val partitionOffsetIndexes = BucketizedMetadataTableFileGroupIndexParser.generatePartitionToBaseIndexOffsets(fileGroupCountPerDataPartition.asJava).asScala
      // Like SparkMetadataTableRecordLevelIndex#lookupRecords: build (partition, recordKey)
      // pairs, key by the global partitioned-RLI shard id, and let each Spark partition look up
      // one record-index shard through PartitionedRecordIndexFileGroupLookupFunction.
      val partitionRecordKeys = fileGroupCountPerDataPartition.keys.toSeq.flatMap { partition =>
        recordKeys.map { recordKey => Pair.of(partition, recordKey) }
      }
      val partitionedKeyRDD = spark.sparkContext.parallelize(partitionRecordKeys, numFileGroups)
        .keyBy { partitionRecordKey =>
          val partition = partitionRecordKey.getLeft
          partitionOffsetIndexes(partition).intValue() +
            HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(partitionRecordKey.getRight, fileGroupCountPerDataPartition(partition).intValue())
        }
        .partitionBy(new PartitionIdPassthrough(numFileGroups))
        .map(_._2)
        .toJavaRDD()
      ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions <= numFileGroups)
      val fileIdToPartitionMap = mutable.Map.empty[String, String]
      partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(metadataTable))
        .collect()
        .asScala
        .foreach { location =>
          fileIdToPartitionMap.put(location._2.getFileId, location._2.getPartitionPath)
        }
      Some(fileIdToPartitionMap)
    }
  }
}

/**
 * Partitioner for RDDs whose keys already are the target partition ids: the key is returned as the
 * partition index without any hashing.
 *
 * @param numPartitions number of partitions reported to Spark
 */
private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val targetPartition = key.asInstanceOf[Int]
    targetPartition
  }
}
Loading
Loading