Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder {
.defaultValue(1000)
.markAdvanced()
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
public static final ConfigProperty<Boolean> HIVE_SYNC_BATCHING_ENABLED = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.enabled")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.1.0")
.withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into "
+ "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of "
+ "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, "
+ "writer version) continue to use the single session client. Default off; existing behavior "
+ "is unchanged unless explicitly opted in.");
public static final ConfigProperty<Integer> HIVE_SYNC_BATCHING_THREADS = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.threads")
.defaultValue(4)
.markAdvanced()
.sinceVersion("1.1.0")
.withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used "
+ "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true.");
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator;
import org.apache.hudi.hive.ddl.JDBCExecutor;
import org.apache.hudi.hive.util.HiveDriverPool;
import org.apache.hudi.hive.util.IMetaStoreClientPool;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.hive.util.PartitionFilterGenerator;
import org.apache.hudi.sync.common.HoodieSyncClient;
Expand Down Expand Up @@ -66,6 +68,8 @@
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
Expand All @@ -86,6 +90,16 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
private final Map<String, Table> initialTableByName = new HashMap<>();
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
// Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS or HIVEQL.
// Owned by this class; closed in close() before Hive.closeCurrent(). Hands clients
// to the active DDL executor for partition-row work only — see IMetaStoreClientPool
// javadoc. HMSDDLExecutor uses it for add/alter/drop; HiveQueryDDLExecutor uses it
// only for DROP (Hive Thrift, not Hive Driver).
private IMetaStoreClientPool partitionClientPool;
// Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
// or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
// reference only — close() is delegated through ddlExecutor.close().
private Option<HiveDriverPool> partitionDriverPool = Option.empty();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into HiveQueryDDLExecutor. Could you make it a local variable in the constructor instead? A field that isn't read after construction will make future readers wonder what lifecycle role it plays.

- AI-generated; verify before applying. React 👍/👎 to flag quality.


/**
* JDBC-based metadata operator, lazily initialized on first Thrift
Expand Down Expand Up @@ -121,10 +135,14 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config, this.client);
this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);
this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool,
Option.ofNullable(this.partitionClientPool));
break;
case JDBC:
JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
Expand All @@ -142,7 +160,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
jdbcMetadataOperator = new JDBCBasedMetadataOperator(
jdbcExecutor.getConnection(), databaseName);
} else {
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);
this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool,
Option.ofNullable(this.partitionClientPool));
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -201,6 +222,30 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) {
}
}

private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return null;
}
if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) {
// The Spark catalog client is constructed via reflection against a Spark-side
// class and isn't compatible with the direct RetryingMetaStoreClient pool path.
// Fall back to single-client sequential behavior rather than failing the sync.
log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; "
+ "falling back to sequential partition sync.");
return null;
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return new IMetaStoreClientPool(config, size);
}

private Option<HiveDriverPool> maybeBuildHiveDriverPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return Option.empty();
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return Option.of(new HiveDriverPool(config, size));
}

private Table getInitialTable(String table) {
return initialTableByName.computeIfAbsent(table, t -> {
try {
Expand Down Expand Up @@ -597,6 +642,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
public void close() {
try {
ddlExecutor.close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 If ddlExecutor.close() throws, control jumps to the outer catch (Exception e) { log.error(...) } (which doesn't rethrow), so partitionClientPool.close() never runs and the Thrift sockets + worker threads leak silently. Could you wrap the pool cleanup in a try-finally so it runs regardless of whether ddlExecutor.close() succeeds? E.g. try { ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

// Close the partition client pool before Hive.closeCurrent() so the
// RetryingMetaStoreClient instances held by the pool release their Thrift
// sockets without racing the ThreadLocal Hive cleanup.
if (partitionClientPool != null) {
try {
partitionClientPool.close();
} catch (Exception e) {
log.warn("Error closing IMetaStoreClient pool", e);
}
partitionClientPool = null;
}
if (client != null) {
Hive.closeCurrent();
client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.util.HivePartitionUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.IMetaStoreClientPool;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

Expand All @@ -46,13 +47,13 @@
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
Expand All @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor {
private final String databaseName;
private final IMetaStoreClient client;
private final PartitionValueExtractor partitionValueExtractor;
// Optional. When non-null, partition-row operations fan out across this pool;
// table-row operations always use the session `client` field above. See
// IMetaStoreClientPool javadoc for the usage contract.
private final IMetaStoreClientPool partitionClientPool;

public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException {
this(syncConfig, metaStoreClient, null);
}

public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient,
IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException {
this.syncConfig = syncConfig;
this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.client = metaStoreClient;
this.partitionClientPool = partitionClientPool;
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
Expand Down Expand Up @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
}
log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName);
try {
// Fetch the table StorageDescriptor once on the session client. Each worker
// copies fields off it; the Thrift POJO is not mutated after this read.
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
List<Partition> partitionList = new ArrayList<>();
batch.forEach(x -> {
List<List<String>> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum);
runBatches("add", tableName, batches, (poolClient, batch) -> {
List<Partition> partitionList = new ArrayList<>(batch.size());
for (String x : batch) {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
Expand All @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
partitionSd.setLocation(fullPartitionPath);
partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
});
client.add_partitions(partitionList, true, false);
}
poolClient.add_partitions(partitionList, true, false);
log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size());
}
} catch (TException e) {
});
} catch (Exception e) {
log.error("{}.{} add partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e);
}
Expand Down Expand Up @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro

log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
for (String dropPartition : partitionsToDrop) {
if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
client.dropPartition(databaseName, tableName, partitionClause, false);
// HMS has no batch-drop primitive that matches dropPartition's semantics, so each
// worker still iterates its chunk one partition at a time. The win is fanning
// chunks across pooled clients.
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
List<List<String>> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum);
runBatches("drop", tableName, batches, (poolClient, batch) -> {
for (String dropPartition : batch) {
if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
poolClient.dropPartition(databaseName, tableName, partitionClause, false);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
} catch (TException e) {
});
} catch (Exception e) {
log.error("{}.{} drop partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e);
}
Expand Down Expand Up @@ -291,21 +312,71 @@ public void close() {

private void registerAlterPartitionEvent(String tableName, List<String> alteredPartitions) {
try {
// Read the StorageDescriptor once on the session client; each worker deep-copies
// it per partition (alter_partitions semantics today).
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
List<Partition> partitionList = alteredPartitions.stream().map(partition -> {
Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSd = sd.deepCopy();
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
client.alter_partitions(databaseName, tableName, partitionList, null);
} catch (TException e) {
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
List<List<String>> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum);
runBatches("alter", tableName, batches, (poolClient, batch) -> {
List<Partition> partitionList = batch.stream().map(partition -> {
Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSd = sd.deepCopy();
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
poolClient.alter_partitions(databaseName, tableName, partitionList, null);
});
} catch (Exception e) {
log.error("{}.{} update partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e);
}
}

/**
* Dispatches partition batches either in parallel against the client pool (if
* configured) or sequentially against the session client. The sequential path
* preserves the exact behavior we had before the pool existed, including failure
* semantics where batch N+1 never runs if batch N throws.
*/
@FunctionalInterface
private interface BatchAction {
void apply(IMetaStoreClient client, List<String> batch) throws Exception;
}

private void runBatches(String opName, String tableName, List<List<String>> batches, BatchAction action) throws Exception {
if (partitionClientPool == null) {
for (List<String> batch : batches) {
action.apply(client, batch);
}
return;
}
List<Future<Void>> futures = new ArrayList<>(batches.size());
for (List<String> batch : batches) {
futures.add(partitionClientPool.executor().submit(() ->
partitionClientPool.run(poolClient -> {
action.apply(poolClient, batch);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 When a Future fails, f.get() throws ExecutionException wrapping the real cause. firstError then stores the ExecutionException, which gets rethrown and caught by the outer catch (Exception e) in addPartitionsToTable/dropPartitionsToTable/registerAlterPartitionEvent, which wraps it again in HoodieHiveSyncException. The result is HoodieHiveSyncException -> ExecutionException -> RealCause — one more wrapping layer than the sequential path produced. HiveDriverPool.awaitAll already has an unwrap() helper; could we do the same here for consistency?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return null;
})
));
}
Exception firstError = null;
for (Future<Void> f : futures) {
try {
f.get();
} catch (Exception e) {
if (firstError == null) {
firstError = e;
} else {
log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e);
}
}
}
if (firstError != null) {
throw firstError;
}
}
}
Loading
Loading