Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder {
.defaultValue(1000)
.markAdvanced()
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
public static final ConfigProperty<Boolean> HIVE_SYNC_BATCHING_ENABLED = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.enabled")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.1.0")
.withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into "
+ "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of "
+ "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, "
+ "writer version) continue to use the single session client. Default off; existing behavior "
+ "is unchanged unless explicitly opted in.");
public static final ConfigProperty<Integer> HIVE_SYNC_BATCHING_THREADS = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.threads")
.defaultValue(4)
.markAdvanced()
.sinceVersion("1.1.0")
.withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used "
+ "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true.");
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator;
import org.apache.hudi.hive.ddl.JDBCExecutor;
import org.apache.hudi.hive.util.HiveDriverPool;
import org.apache.hudi.hive.util.IMetaStoreClientPool;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.hive.util.JDBCConnectionPool;
import org.apache.hudi.hive.util.PartitionFilterGenerator;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.FieldSchema;
Expand Down Expand Up @@ -66,6 +69,8 @@
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
Expand All @@ -86,6 +91,18 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
private final Map<String, Table> initialTableByName = new HashMap<>();
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
// Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this
// class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor
// for partition-row work only — see IMetaStoreClientPool javadoc.
private IMetaStoreClientPool partitionClientPool;
// Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
// or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
// reference only — close() is delegated through ddlExecutor.close().
private HiveDriverPool partitionDriverPool;
// Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is JDBC (explicit
// or legacy default with HIVE_USE_JDBC=true). Owned by JDBCExecutor; this field
// is kept for reference only — close() is delegated through ddlExecutor.close().
private JDBCConnectionPool partitionJdbcPool;

/**
* JDBC-based metadata operator, lazily initialized on first Thrift
Expand Down Expand Up @@ -121,13 +138,16 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config, this.client);
this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool);
break;
case JDBC:
JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config);
JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool);
ddlExecutor = jdbcExecutor;
jdbcMetadataOperator = new JDBCBasedMetadataOperator(
jdbcExecutor.getConnection(), databaseName);
Expand All @@ -137,12 +157,14 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
}
} else {
if (config.getBoolean(HIVE_USE_JDBC)) {
JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config);
JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool);
ddlExecutor = jdbcExecutor;
jdbcMetadataOperator = new JDBCBasedMetadataOperator(
jdbcExecutor.getConnection(), databaseName);
} else {
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -201,6 +223,38 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) {
}
}

private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return null;
}
if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) {
// The Spark catalog client is constructed via reflection against a Spark-side
// class and isn't compatible with the direct RetryingMetaStoreClient pool path.
// Fall back to single-client sequential behavior rather than failing the sync.
log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; "
+ "falling back to sequential partition sync.");
return null;
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return new IMetaStoreClientPool(config, size);
}

private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return null;
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return new HiveDriverPool(config, size);
}

private JDBCConnectionPool maybeBuildJDBCConnectionPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return null;
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return new JDBCConnectionPool(config, size);
}

private Table getInitialTable(String table) {
return initialTableByName.computeIfAbsent(table, t -> {
try {
Expand Down Expand Up @@ -597,6 +651,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
public void close() {
try {
ddlExecutor.close();
// Close the partition client pool before Hive.closeCurrent() so the
// RetryingMetaStoreClient instances held by the pool release their Thrift
// sockets without racing the ThreadLocal Hive cleanup.
if (partitionClientPool != null) {
try {
partitionClientPool.close();
} catch (Exception e) {
log.warn("Error closing IMetaStoreClient pool", e);
}
partitionClientPool = null;
}
if (client != null) {
Hive.closeCurrent();
client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.util.HivePartitionUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.IMetaStoreClientPool;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

Expand All @@ -46,13 +47,13 @@
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
Expand All @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor {
private final String databaseName;
private final IMetaStoreClient client;
private final PartitionValueExtractor partitionValueExtractor;
// Optional. When non-null, partition-row operations fan out across this pool;
// table-row operations always use the session `client` field above. See
// IMetaStoreClientPool javadoc for the usage contract.
private final IMetaStoreClientPool partitionClientPool;

public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException {
this(syncConfig, metaStoreClient, null);
}

public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient,
IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException {
this.syncConfig = syncConfig;
this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.client = metaStoreClient;
this.partitionClientPool = partitionClientPool;
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
Expand Down Expand Up @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
}
log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName);
try {
// Fetch the table StorageDescriptor once on the session client. Each worker
// copies fields off it; the Thrift POJO is not mutated after this read.
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
List<Partition> partitionList = new ArrayList<>();
batch.forEach(x -> {
List<List<String>> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum);
runBatches("add", tableName, batches, (poolClient, batch) -> {
List<Partition> partitionList = new ArrayList<>(batch.size());
for (String x : batch) {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
Expand All @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
partitionSd.setLocation(fullPartitionPath);
partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
});
client.add_partitions(partitionList, true, false);
}
poolClient.add_partitions(partitionList, true, false);
log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size());
}
} catch (TException e) {
});
} catch (Exception e) {
log.error("{}.{} add partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e);
}
Expand Down Expand Up @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro

log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
for (String dropPartition : partitionsToDrop) {
if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
client.dropPartition(databaseName, tableName, partitionClause, false);
// HMS has no batch-drop primitive that matches dropPartition's semantics, so each
// worker still iterates its chunk one partition at a time. The win is fanning
// chunks across pooled clients.
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
List<List<String>> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum);
runBatches("drop", tableName, batches, (poolClient, batch) -> {
for (String dropPartition : batch) {
if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
poolClient.dropPartition(databaseName, tableName, partitionClause, false);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
} catch (TException e) {
});
} catch (Exception e) {
log.error("{}.{} drop partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e);
}
Expand Down Expand Up @@ -291,21 +312,71 @@ public void close() {

private void registerAlterPartitionEvent(String tableName, List<String> alteredPartitions) {
try {
// Read the StorageDescriptor once on the session client; each worker deep-copies
// it per partition (alter_partitions semantics today).
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
List<Partition> partitionList = alteredPartitions.stream().map(partition -> {
Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSd = sd.deepCopy();
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
client.alter_partitions(databaseName, tableName, partitionList, null);
} catch (TException e) {
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
List<List<String>> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum);
runBatches("alter", tableName, batches, (poolClient, batch) -> {
List<Partition> partitionList = batch.stream().map(partition -> {
Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSd = sd.deepCopy();
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
poolClient.alter_partitions(databaseName, tableName, partitionList, null);
});
} catch (Exception e) {
log.error("{}.{} update partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e);
}
}

/**
* Dispatches partition batches either in parallel against the client pool (if
* configured) or sequentially against the session client. The sequential path
* preserves the exact behavior we had before the pool existed, including failure
* semantics where batch N+1 never runs if batch N throws.
*/
@FunctionalInterface
private interface BatchAction {
void apply(IMetaStoreClient client, List<String> batch) throws Exception;
}

private void runBatches(String opName, String tableName, List<List<String>> batches, BatchAction action) throws Exception {
if (partitionClientPool == null) {
for (List<String> batch : batches) {
action.apply(client, batch);
}
return;
}
List<Future<Void>> futures = new ArrayList<>(batches.size());
for (List<String> batch : batches) {
futures.add(partitionClientPool.executor().submit(() ->
partitionClientPool.run(poolClient -> {
action.apply(poolClient, batch);
return null;
})
));
}
Exception firstError = null;
for (Future<Void> f : futures) {
try {
f.get();
} catch (Exception e) {
if (firstError == null) {
firstError = e;
} else {
log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e);
}
}
}
if (firstError != null) {
throw firstError;
}
}
}
Loading
Loading