diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..01f6d0224070d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into " + + "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of " + + "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, " + + "writer version) continue to use the single session client. 
Default off; existing behavior " + + "is unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used " + + "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..7a821b0f17bc3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,8 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.HiveDriverPool; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -66,6 +68,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import 
static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +90,16 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS or HIVEQL. + // Owned by this class; closed in close() before Hive.closeCurrent(). Hands clients + // to the active DDL executor for partition-row work only — see IMetaStoreClientPool + // javadoc. HMSDDLExecutor uses it for add/alter/drop; HiveQueryDDLExecutor uses it + // only for DROP (Hive Thrift, not Hive Driver). + private IMetaStoreClientPool partitionClientPool; + // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for + // reference only — close() is delegated through ddlExecutor.close(). 
+ private Option partitionDriverPool = Option.empty(); /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -121,10 +135,14 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE)); switch (syncMode) { case HMS: - ddlExecutor = new HMSDDLExecutor(config, this.client); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -142,7 +160,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); } else { - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); } } } catch (Exception e) { @@ -201,6 +222,30 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct 
RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. + log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + + private Option maybeBuildHiveDriverPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return Option.empty(); + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return Option.of(new HiveDriverPool(config, size)); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { @@ -597,6 +642,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. 
+ if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 6b7bd3af00963..e80d63730b3c8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -46,13 +47,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor { private final String databaseName; private final IMetaStoreClient client; private final PartitionValueExtractor partitionValueExtractor; + // Optional. When non-null, partition-row operations fan out across this pool; + // table-row operations always use the session `client` field above. See + // IMetaStoreClientPool javadoc for the usage contract. 
+ private final IMetaStoreClientPool partitionClientPool; public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException { + this(syncConfig, metaStoreClient, null); + } + + public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient, + IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException { this.syncConfig = syncConfig; this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME); this.client = metaStoreClient; + this.partitionClientPool = partitionClientPool; try { this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance(); @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); try { + // Fetch the table StorageDescriptor once on the session client. Each worker + // copies fields off it; the Thrift POJO is not mutated after this read. 
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); - for (List batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) { - List partitionList = new ArrayList<>(); - batch.forEach(x -> { + List> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum); + runBatches("add", tableName, batches, (poolClient, batch) -> { + List partitionList = new ArrayList<>(batch.size()); + for (String x : batch) { StorageDescriptor partitionSd = new StorageDescriptor(); partitionSd.setCols(sd.getCols()); partitionSd.setInputFormat(sd.getInputFormat()); @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); - }); - client.add_partitions(partitionList, true, false); + } + poolClient.add_partitions(partitionList, true, false); log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size()); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} add partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." 
+ tableName + " add partition failed", e); } @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); - client.dropPartition(databaseName, tableName, partitionClause, false); + // HMS has no batch-drop primitive that matches dropPartition's semantics, so each + // worker still iterates its chunk one partition at a time. The win is fanning + // chunks across pooled clients. + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runBatches("drop", tableName, batches, (poolClient, batch) -> { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); } - log.info("Drop partition {} on {}", dropPartition, tableName); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} drop partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." 
+ tableName + " drop partition failed", e); } @@ -291,21 +312,71 @@ public void close() { private void registerAlterPartitionEvent(String tableName, List alteredPartitions) { try { + // Read the StorageDescriptor once on the session client; each worker deep-copies + // it per partition (alter_partitions semantics today). StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); - List partitionList = alteredPartitions.stream().map(partition -> { - Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - StorageDescriptor partitionSd = sd.deepCopy(); - partitionSd.setLocation(fullPartitionPath); - return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); - }).collect(Collectors.toList()); - client.alter_partitions(databaseName, tableName, partitionList, null); - } catch (TException e) { + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum); + runBatches("alter", tableName, batches, (poolClient, batch) -> { + List partitionList = batch.stream().map(partition -> { + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? 
HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + StorageDescriptor partitionSd = sd.deepCopy(); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + poolClient.alter_partitions(databaseName, tableName, partitionList, null); + }); + } catch (Exception e) { log.error("{}.{} update partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e); } } + + /** + * Dispatches partition batches either in parallel against the client pool (if + * configured) or sequentially against the session client. The sequential path + * preserves the exact behavior we had before the pool existed, including failure + * semantics where batch N+1 never runs if batch N throws. 
+ */ + @FunctionalInterface + private interface BatchAction { + void apply(IMetaStoreClient client, List batch) throws Exception; + } + + private void runBatches(String opName, String tableName, List> batches, BatchAction action) throws Exception { + if (partitionClientPool == null) { + for (List batch : batches) { + action.apply(client, batch); + } + return; + } + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(partitionClientPool.executor().submit(() -> + partitionClientPool.run(poolClient -> { + action.apply(poolClient, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 25434d29eb3ff..13f6550b1c72f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -18,10 +18,14 @@ package org.apache.hudi.hive.ddl; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -39,8 +43,10 @@ import java.util.HashMap; import java.util.List; 
import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.sync.common.util.TableUtils.tableId; /** @@ -52,10 +58,26 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; + // When present, partition-phase SQL lists fan out across this pool; table-level SQL + // (createTable, schema evolution, single-statement runSQL callers) always uses the + // session `hiveDriver` above. See HiveDriverPool javadoc. + private final Option driverPool; + // When present, dropPartitionsToTable fans batches across this Thrift client pool. + // Owned by HoodieHiveSyncClient; close() is delegated through there. See + // IMetaStoreClientPool javadoc for the usage contract (partition-row ops only). + private final Option metaStoreClientPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { + this(config, metaStoreClient, Option.empty(), Option.empty()); + } + + public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, + Option driverPool, + Option metaStoreClientPool) { super(config); this.metaStoreClient = metaStoreClient; + this.driverPool = driverPool; + this.metaStoreClientPool = metaStoreClientPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -82,19 +104,61 @@ public void runSQL(String sql) { updateHiveSQLs(Collections.singletonList(sql)); } + /** + * Partition-phase SQL fan-out. 
When the driver pool is present, any leading + * {@code USE database} statements are run on every worker (Hive 2.x's + * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each worker needs to USE the right db + * before any partition ALTER). The remaining statements are then dispatched + * round-robin across the pool. Falls through to the sequential path on the + * session Driver when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (sqls.isEmpty()) { + return; + } + if (!driverPool.isPresent()) { + updateHiveSQLs(sqls); + return; + } + HiveDriverPool pool = driverPool.get(); + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + pool.runOnEachWorker(setupStatements); + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = pool.runAll(partitionStatements); + pool.awaitAll(futures); + } + + // Strict 4-char prefix match on "USE ". Internal callers (constructPartitionAlterStatements) + // always emit the USE statement without leading whitespace; do not call with externally + // supplied SQL that might be padded. 
+ private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); + HoodieTimer timer = HoodieTimer.start(); try { for (String sql : sqls) { if (hiveDriver != null) { - HoodieTimer timer = HoodieTimer.start(); responses.add(hiveDriver.run(sql)); - log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer()); } } } catch (Exception e) { throw new HoodieHiveSyncException("Failed in executing SQL", e); } + log.info("Executed {} SQL statements sequentially in {} ms", sqls.size(), timer.endTimer()); return responses; } @@ -132,23 +196,81 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor, - config)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); - metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false); - } - log.info("Drop partition {} on {}", dropPartition, tableName); - } + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runDropBatches(tableName, batches); } catch (Exception e) { log.error("{} drop partition failed", tableId(databaseName, tableName), e); throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e); } } + /** + * Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present, + * batches fan out across the pool's worker threads (each borrowing an independent + * IMetaStoreClient); otherwise batches are dispatched sequentially against the + * session client. 
Hive has no batch-drop primitive that matches dropPartition's + * semantics, so each worker still iterates its chunk one partition at a time — the + * win is fanning chunks across independent Thrift clients. + */ + private void runDropBatches(String tableName, List> batches) throws Exception { + if (!metaStoreClientPool.isPresent()) { + for (List batch : batches) { + applyDropBatch(metaStoreClient, tableName, batch); + } + return; + } + IMetaStoreClientPool pool = metaStoreClientPool.get(); + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(pool.executor().submit(() -> + pool.run(poolClient -> { + applyDropBatch(poolClient, tableName, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional drop batch failed on {} (suppressed in favor of first error)", tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + private void applyDropBatch(IMetaStoreClient poolClient, String tableName, List batch) throws Exception { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, + partitionValueExtractor, config)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); + } + } + @Override public void close() { + // Close the pool first so the worker threads stop dispatching against their + // Drivers before we tear down anything else. The pool's close() runs + // Driver/SessionState cleanup on each worker's own thread. 
+ driverPool.ifPresent(pool -> { + try { + pool.close(); + } catch (Exception e) { + log.warn("Error closing HiveDriverPool", e); + } + }); if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 7f776f2f7a04d..a68644ae86f05 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -75,6 +76,20 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) { */ public abstract void runSQL(String sql); + /** + * Runs a list of SQL statements. The default implementation executes them + * sequentially via {@link #runSQL(String)}. Subclasses that can parallelize + * (e.g. {@link HiveQueryDDLExecutor} with a driver pool) override this hook + * to fan the list out across workers. Apart from leading {@code USE database} + * statements (which an override must run on every worker), the list must have no + * positional dependencies, so that any other statement can run on any worker. 
+ */ + protected void runSQLs(List sqls) { + for (String sql : sqls) { + runSQL(sql); + } + } + @Override public void createDatabase(String databaseName) { runSQL("create database if not exists " + databaseName); @@ -120,7 +135,7 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> runSQL(sql)); + runSQLs(sqls); } @Override @@ -131,9 +146,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } log.info("Changing partitions {} on {}", changedPartitions.size(), tableName); List sqls = constructPartitionAlterStatements(tableName, changedPartitions, PartitionAlterType.SET_LOCATION); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } @Override @@ -210,29 +223,40 @@ public void touchPartitionsToTable(String tableName, List touchPartition } log.info("Touching partitions " + touchPartitions.size() + " on " + tableName); List sqls = constructPartitionAlterStatements(tableName, touchPartitions, PartitionAlterType.TOUCH); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } /** * Builds SQL statements to either touch partitions or set their location. - * TOUCH: one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... - * SET_LOCATION: one ALTER TABLE ... PARTITION (p) SET LOCATION '...' per partition. + * + *

The first element of the returned list is always a {@code USE database} + * statement. Hive 2.x's ALTER PARTITION ... SET LOCATION does not respect the + * {@code db.table} qualifier (silently routes to the connection's current + * database), so the {@code USE} is load-bearing. Parallel execution paths must + * run this statement on every worker before fanning out the rest. + * + *

TOUCH: one {@code ALTER TABLE ... TOUCH PARTITION (p1) ...} per batch of + * {@code HIVE_BATCH_SYNC_PARTITION_NUM} partitions. + * + *

SET_LOCATION: one {@code ALTER TABLE ... PARTITION (p) SET LOCATION '...'} + * per partition (Hive SQL does not support multi-partition SET LOCATION in one + * statement). */ private List constructPartitionAlterStatements(String tableName, List partitions, PartitionAlterType alterType) { List result = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + databaseName + HIVE_ESCAPE_CHARACTER; result.add(useDatabase); String alterTablePrefix = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); switch (alterType) { case TOUCH: - String alterTable = alterTablePrefix + " TOUCH"; - for (String partition : partitions) { - alterTable += " PARTITION (" + getPartitionClause(partition) + ")"; + for (List batch : CollectionUtils.batches(partitions, batchSyncPartitionNum)) { + StringBuilder alterTable = new StringBuilder(alterTablePrefix).append(" TOUCH"); + for (String partition : batch) { + alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")"); + } + result.add(alterTable.toString()); } - result.add(alterTable); break; case SET_LOCATION: for (String partition : partitions) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java new file mode 100644 index 0000000000000..2ea2ebc5e6e22 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+
+/**
+ * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL.
+ *
+ * <p>Hive's {@code SessionState.start(state)} binds state to the calling thread's
+ * thread-local, and {@code Driver} reads from that thread-local during {@code run()}.
+ * A Driver constructed on one thread cannot be safely used from another. This pool
+ * solves that by giving each slot its own dedicated worker thread (a single-thread
+ * executor) — the Driver and SessionState are built on that thread by a bootstrap
+ * task, and all subsequent SQL for that slot runs on the same thread.
+ *
+ * <p><b>Usage contract:</b> use this pool only for partition-row DDL statements that
+ * are independent of each other and freely shuffleable across workers. Table-level
+ * statements (createTable, schema evolution, USE database) must continue to run on
+ * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver
+ * thread. The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled}
+ * and is constructed only for HiveQL sync mode.
+ */
+public class HiveDriverPool implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class);
+
+  // Per-worker Driver construction has to be fast in practice (a few hundred ms
+  // for the SessionState + Driver init). A 60s ceiling per worker leaves plenty of
+  // headroom for a slow JVM warm-up but bounds the failure mode if the metastore
+  // is unreachable or Hive hangs during init.
+  private static final long BOOTSTRAP_TIMEOUT_SECONDS = 60;
+
+  private final List<Worker> workers;
+  private final int size;
+  private volatile boolean closed;
+
+  public HiveDriverPool(HiveSyncConfig config, int size) {
+    this(config, size, new DefaultDriverFactory(config));
+  }
+
+  // Package-private for tests: accepts a DriverFactory so unit tests can inject
+  // mock Driver instances without standing up a real Hive instance.
+  HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Pool size must be >= 1, got " + size);
+    }
+    this.size = size;
+    this.workers = new ArrayList<>(size);
+    String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
+    PoolThreadFactory threadFactory = new PoolThreadFactory();
+    List<Future<Void>> bootstrapFutures = new ArrayList<>(size);
+    try {
+      for (int i = 0; i < size; i++) {
+        Worker worker = new Worker(threadFactory);
+        workers.add(worker);
+        bootstrapFutures.add(worker.executor.submit(() -> {
+          worker.driver = factory.newDriver(databaseName);
+          return null;
+        }));
+      }
+      // Block until all bootstraps complete so we surface construction errors
+      // before any caller hands us SQL. Bounded by BOOTSTRAP_TIMEOUT_SECONDS so a
+      // hung Hive init doesn't deadlock the sync driver thread.
+      for (Future<Void> f : bootstrapFutures) {
+        f.get(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      }
+    } catch (Exception e) {
+      tearDown();
+      throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e);
+    }
+    LOG.info("Initialized HiveDriverPool with {} workers", size);
+  }
+
+  /**
+   * Runs each given SQL on every worker, in order. Used for setup statements
+   * (e.g. {@code USE database}) that must establish per-thread session context
+   * before any partition statement runs. Blocks until all workers have completed
+   * the setup. Throws on first error.
+   */
+  public void runOnEachWorker(List<String> setupSqls) {
+    if (closed) {
+      throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool");
+    }
+    if (setupSqls.isEmpty()) {
+      return;
+    }
+    List<Future<Void>> futures = new ArrayList<>(workers.size());
+    for (Worker worker : workers) {
+      futures.add(worker.executor.submit(() -> {
+        for (String sql : setupSqls) {
+          worker.driver.run(sql);
+        }
+        return null;
+      }));
+    }
+    awaitAll(futures);
+  }
+
+  /**
+   * Dispatches each SQL string to a worker (round-robin) and returns the list of
+   * futures. The caller is responsible for awaiting and collecting errors. SQL text
+   * is intentionally not logged per-statement here: batched TOUCH/ADD statements can
+   * be many kilobytes, and N parallel workers would multiply the log volume. See
+   * {@link #awaitAll(List)} for the per-call summary log.
+   */
+  public List<Future<Void>> runAll(List<String> sqls) {
+    if (closed) {
+      throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool");
+    }
+    List<Future<Void>> futures = new ArrayList<>(sqls.size());
+    for (int i = 0; i < sqls.size(); i++) {
+      String sql = sqls.get(i);
+      Worker worker = workers.get(i % workers.size());
+      futures.add(worker.executor.submit(() -> {
+        worker.driver.run(sql);
+        return null;
+      }));
+    }
+    return futures;
+  }
+
+  /**
+   * Awaits all futures and throws the first exception encountered. On first failure,
+   * cancels the remaining (not yet started) futures so workers don't keep running
+   * pointless work after a fatal error. Any errors that finished before cancellation
+   * are logged at WARN. Callers do not need per-statement results (Hive's Driver.run
+   * side-effects the metastore), so this method is void.
+   */
+  public void awaitAll(List<? extends Future<?>> futures) {
+    long start = System.currentTimeMillis();
+    Exception firstError = null;
+    int completed = 0;
+    int cancelled = 0;
+    for (int i = 0; i < futures.size(); i++) {
+      Future<?> f = futures.get(i);
+      try {
+        f.get();
+        completed++;
+      } catch (CancellationException ce) {
+        // We cancelled this future ourselves after a prior error. Don't treat it
+        // as a new failure; just note it for the summary log.
+        cancelled++;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        if (firstError == null) {
+          firstError = ie;
+          cancelled += cancelRemaining(futures, i + 1);
+        }
+      } catch (ExecutionException ee) {
+        Exception cause = unwrap(ee);
+        if (firstError == null) {
+          firstError = cause;
+          cancelled += cancelRemaining(futures, i + 1);
+        } else {
+          LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause);
+        }
+      }
+    }
+    if (firstError != null) {
+      throw new HoodieHiveSyncException("Failed in executing SQL", firstError);
+    }
+    LOG.info("Completed {} SQL statements ({} cancelled) in {} ms across {} workers",
+        completed, cancelled, System.currentTimeMillis() - start, size);
+  }
+
+  private static int cancelRemaining(List<? extends Future<?>> futures, int fromIndex) {
+    int cancelled = 0;
+    for (int j = fromIndex; j < futures.size(); j++) {
+      // mayInterruptIfRunning=false: never interrupt a worker mid-statement, which could
+      // corrupt its Driver. Queued tasks are skipped; an in-flight statement still runs to
+      // completion, but its Future reports cancelled, so its outcome is not observed here.
+      if (futures.get(j).cancel(false)) {
+        cancelled++;
+      }
+    }
+    return cancelled;
+  }
+
+  private static Exception unwrap(ExecutionException ee) {
+    Throwable cause = ee.getCause();
+    return (cause instanceof Exception) ? (Exception) cause : ee;
+  }
+
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    tearDown();
+  }
+
+  private void tearDown() {
+    // Close each worker's Driver/SessionState on its own thread, then shut the
+    // executor down. Running close() on the bound thread keeps SessionState's
+    // thread-local cleanup correct.
+    for (Worker worker : workers) {
+      try {
+        worker.executor.submit(() -> {
+          if (worker.driver != null) {
+            try {
+              worker.driver.close();
+            } catch (Exception e) {
+              LOG.warn("Error closing pooled Driver", e);
+            }
+          }
+          SessionState ss = SessionState.get();
+          if (ss != null) {
+            try {
+              ss.close();
+            } catch (Exception e) {
+              LOG.warn("Error closing pooled SessionState", e);
+            }
+          }
+          return null;
+        }).get(30, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.warn("Error during pool worker shutdown", e);
+      }
+      worker.executor.shutdown();
+      try {
+        if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) {
+          worker.executor.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        worker.executor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    workers.clear();
+  }
+
+  /**
+   * Per-slot state: a single-thread executor and the Driver bound to its thread.
+   * Driver is volatile because it is written by the bootstrap task and read by
+   * subsequent dispatch tasks on the same executor.
+   */
+  private static final class Worker {
+    final ExecutorService executor;
+    volatile Driver driver;
+
+    Worker(ThreadFactory threadFactory) {
+      this.executor = Executors.newSingleThreadExecutor(threadFactory);
+    }
+  }
+
+  @FunctionalInterface
+  interface DriverFactory {
+    Driver newDriver(String databaseName) throws Exception;
+  }
+
+  /**
+   * Builds a real Hive {@link Driver} on the calling thread. The SessionState is
+   * constructed lazily (once, on the first worker thread that builds a Driver) and
+   * shared across all worker threads — Hive uses ThreadLocal attachment, not
+   * exclusive ownership, so multiple workers calling
+   * {@code SessionState.start(sharedState)} all see the same config and scratch dir
+   * without each spending the cost of building their own SessionState (and risking
+   * resource-dir creation races during the constructor).
+   */
+  private static final class DefaultDriverFactory implements DriverFactory {
+    private final HiveConf hiveConf;
+    private volatile SessionState sharedSessionState;
+
+    DefaultDriverFactory(HiveSyncConfig config) {
+      this.hiveConf = config.getHiveConf();
+    }
+
+    @Override
+    public synchronized Driver newDriver(String databaseName) throws Exception {
+      // SessionState is shared across workers; build it once on the first call (with
+      // currentDatabase already set) and attach it to each worker's thread-local on
+      // subsequent calls. The database is a pool-wide property and never changes
+      // across workers, so setting it once at construction time is sufficient.
+      if (sharedSessionState == null) {
+        sharedSessionState = new SessionState(hiveConf,
+            UserGroupInformation.getCurrentUser().getShortUserName());
+        sharedSessionState.setCurrentDatabase(databaseName);
+      }
+      SessionState.start(sharedSessionState);
+      return new Driver(hiveConf);
+    }
+  }
+
+  private static final class PoolThreadFactory implements ThreadFactory {
+    private static final AtomicInteger POOL_ID = new AtomicInteger(0);
+    private final AtomicInteger threadId = new AtomicInteger(0);
+    private final String namePrefix = "hudi-hive-driver-pool-" + POOL_ID.incrementAndGet() + "-";
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r, namePrefix + threadId.incrementAndGet());
+      t.setDaemon(true);
+      return t;
+    }
+  }
+}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java
new file mode 100644
index 0000000000000..930fb705cbbda
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Pool of {@link IMetaStoreClient} instances for parallel partition sync.
+ *
+ * <p>Each pooled client wraps an independent Thrift connection to the Hive Metastore.
+ * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client
+ * is available, executes the action, and returns the client to the pool. A worker
+ * thread pool of the same size is exposed via {@link #executor()} so callers can fan
+ * out their batches to match the number of available clients.
+ *
+ * <p><b>Usage contract:</b> pool clients must be used only for partition-row
+ * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition},
+ * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table},
+ * {@code getTable} used as the read half of a read-modify-write of table parameters)
+ * must continue to go through the session client held by
+ * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would
+ * risk lost updates on table parameters such as the last-commit-time-synced marker.
+ *
+ * <p>The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and
+ * is constructed only for sync mode HMS.
+ */
+public class IMetaStoreClientPool implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class);
+
+  private final ArrayBlockingQueue<IMetaStoreClient> available;
+  private final List<IMetaStoreClient> all;
+  private final ExecutorService executor;
+  private final int size;
+  private volatile boolean closed;
+
+  public IMetaStoreClientPool(HiveSyncConfig config, int size) {
+    this(buildClients(config, size), size);
+  }
+
+  // Package-private for tests: accepts a pre-built list of clients so we can
+  // exercise borrow/return/close semantics without a live metastore.
+  IMetaStoreClientPool(List<IMetaStoreClient> clients, int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Pool size must be >= 1, got " + size);
+    }
+    if (clients.size() != size) {
+      throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size());
+    }
+    this.size = size;
+    this.available = new ArrayBlockingQueue<>(size);
+    this.all = new ArrayList<>(clients);
+    this.available.addAll(clients);
+    this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory());
+    LOG.info("Initialized IMetaStoreClient pool with {} clients", size);
+  }
+
+  private static List<IMetaStoreClient> buildClients(HiveSyncConfig config, int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Pool size must be >= 1, got " + size);
+    }
+    HiveConf hiveConf = config.getHiveConf();
+    List<IMetaStoreClient> clients = new ArrayList<>(size);
+    try {
+      for (int i = 0; i < size; i++) {
+        clients.add(newClient(hiveConf));
+      }
+      return clients;
+    } catch (Exception e) {
+      // Construction failed mid-way; close any clients we already built before
+      // surfacing the error so we don't leak Thrift sockets.
+      for (IMetaStoreClient c : clients) {
+        try {
+          c.close();
+        } catch (Exception ignore) {
+          // intentional: best-effort cleanup during failure
+        }
+      }
+      throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e);
+    }
+  }
+
+  private static IMetaStoreClient newClient(HiveConf hiveConf) {
+    try {
+      // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient
+      // (one Thrift socket per call), bypassing the Hive thread-local cache that
+      // Hive.get(conf) would use. This is what gives us N truly independent clients.
+      return RetryingMetaStoreClient.getProxy(hiveConf, true);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to create IMetaStoreClient for pool", e);
+    }
+  }
+
+  /**
+   * Borrows a client, runs the action, and returns the client to the pool.
+   * Blocks if all clients are in use until one becomes available.
+   */
+  public <T> T run(ClientAction<T> action) throws Exception {
+    if (closed) {
+      throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool");
+    }
+    IMetaStoreClient client = available.take();
+    try {
+      return action.apply(client);
+    } finally {
+      // Always return the client to the pool, even on failure. Thrift clients
+      // recover transparently from transactional errors at the HMS layer;
+      // RetryingMetaStoreClient handles transient socket failures internally.
+      if (!closed) {
+        available.offer(client);
+      }
+    }
+  }
+
+  /**
+   * Worker thread pool sized to match the client pool. Use this to fan out
+   * batches so the number of in-flight Thrift calls cannot exceed the
+   * number of pooled clients.
+   */
+  public ExecutorService executor() {
+    return executor;
+  }
+
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    closeQuietly();
+  }
+
+  private void closeQuietly() {
+    for (IMetaStoreClient client : all) {
+      try {
+        client.close();
+      } catch (Exception e) {
+        LOG.warn("Error closing pooled IMetaStoreClient", e);
+      }
+    }
+    available.clear();
+    all.clear();
+  }
+
+  @FunctionalInterface
+  public interface ClientAction<T> {
+    T apply(IMetaStoreClient client) throws Exception;
+  }
+
+  private static final class PoolThreadFactory implements ThreadFactory {
+    private static final AtomicInteger POOL_ID = new AtomicInteger(0);
+    private final AtomicInteger threadId = new AtomicInteger(0);
+    private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-";
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r, namePrefix + threadId.incrementAndGet());
+      t.setDaemon(true);
+      return t;
+    }
+  }
+}
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 6d971bcc52263..1b0e52c83b761 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -96,9 +96,12 @@
 import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
 import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
+import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY;
@@ -329,6 +332,180 @@ public void testDropUpperCasePartitionWithHMS() throws Exception {
         "Table partitions should match the number of partitions we wrote");
   }
 
+  /**
+   * Exercises HMS sync with parallel partition batching enabled. Forces multiple
+   * batches with a small batch_num against a partition set large enough to fan out
+   * across multiple pool clients, and asserts the resulting partition set matches.
+   */
+  @Test
+  public void testHMSSyncWithBatchingEnabled() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3");
+    // Small batch_num so we get multiple batches dispatched in parallel.
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
+
+    int partitionCount = 10;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table should not exist before initial sync");
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "All partitions should be added under parallel batching");
+
+    // Add more partitions, then sync again to exercise the parallel update path.
+    HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101");
+    HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102");
+    HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103");
+    HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104");
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "Incremental add via parallel batching should sync the new partitions");
+  }
+
+  /**
+   * Exercises HiveQL sync with parallel partition batching enabled. Mirrors the
+   * HMS test but routes through the HiveDriverPool — each worker thread owns a
+   * Driver+SessionState pair, and the SQL list (qualified with `db`.`tbl`) is
+   * fanned out across them.
+   */
+  @Test
+  public void testHiveQLSyncWithBatchingEnabled() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3");
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
+
+    int partitionCount = 10;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table should not exist before initial sync");
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "All partitions should be added under parallel HiveQL batching");
+
+    // Add more partitions, then sync again to exercise the parallel update path.
+    HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101");
+    HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102");
+    HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103");
+    HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104");
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "Incremental add via parallel HiveQL batching should sync the new partitions");
+  }
+
+  /**
+   * Exercises the DROP path in HiveQL mode with batching on. DROP routes through
+   * IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so when batching is
+   * enabled it fans out across IMetaStoreClientPool. Verifies the partition set
+   * shrinks as expected when batches drop in parallel.
+   */
+  @Test
+  public void testHiveQLDropPartitionsWithBatching() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3");
+    // Small batch_num so we get multiple drop batches dispatched in parallel.
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2");
+
+    int partitionCount = 8;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "All partitions should be added before drop test");
+
+    // Drop half the partitions through the parallel pool path.
+    List<String> existing = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toList());
+    List<String> toDrop = existing.subList(0, partitionCount / 2);
+    hiveClient.dropPartitions(HiveTestUtil.TABLE_NAME, toDrop);
+
+    List<Partition> remaining = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(partitionCount - toDrop.size(), remaining.size(),
+        "Parallel DROP should remove exactly the requested partitions");
+    Set<String> remainingPaths = remaining.stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toSet());
+    for (String dropped : toDrop) {
+      assertFalse(remainingPaths.contains(dropped),
+          "Dropped partition " + dropped + " must not appear in remaining set");
+    }
+  }
+
+  /**
+   * Exercises the SET_LOCATION path in HiveQL mode with batching on. SET_LOCATION
+   * emits one ALTER PARTITION ... SET LOCATION statement per partition (Hive SQL
+   * has no multi-partition SET LOCATION), so this is the fan-out path most likely
+   * to exercise concurrent ALTER PARTITION calls against the same table.
+   */
+  @Test
+  public void testHiveQLSetLocationWithBatching() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3");
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2");
+
+    int partitionCount = 6;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size());
+
+    // Drive the SET_LOCATION path by directly calling updatePartitionsToTable with
+    // existing partition paths. Each partition produces its own ALTER ... SET LOCATION
+    // statement, fanned out across the 3 workers in the pool.
+    List<String> existingPartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toList());
+    hiveClient.updatePartitionsToTable(HiveTestUtil.TABLE_NAME, existingPartitions);
+
+    List<Partition> after = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(partitionCount, after.size(),
+        "Parallel SET_LOCATION must not change the partition set");
+    Set<String> relativePaths = after.stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toSet());
+    assertEquals(partitionCount, relativePaths.size(),
+        "Each partition should resolve to a unique relative path after parallel SET_LOCATION");
+    assertTrue(relativePaths.containsAll(existingPartitions),
+        "All original partition paths should still be present after parallel SET_LOCATION");
+  }
+
+  /**
+   * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that
+   * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller
+   * statements does not break partition visibility downstream.
+   */
+  @Test
+  public void testHiveQLTouchPartitionsWithBatching() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "2");
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2");
+    hiveSyncProps.setProperty(META_SYNC_TOUCH_PARTITIONS_ENABLED.key(), "true");
+
+    int partitionCount = 6;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size());
+
+    // Touch existing partitions (no new data) — should hit the batched TOUCH path
+    // without changing the partition count.
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "TOUCH batching must not change the partition set");
+  }
+
   @ParameterizedTest
   @MethodSource({"syncModeAndSchemaFromCommitMetadata"})
   public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java
new file mode 100644
index 0000000000000..cac89dcf54bf9
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link HiveDriverPool} that exercise bootstrap, dispatch, error + * propagation, and close semantics without standing up a real Hive instance. 
+ */ +class TestHiveDriverPool { + + private static HiveSyncConfig configWithEmptyHiveConf() { + HiveSyncConfig config = mock(HiveSyncConfig.class); + doAnswer(inv -> new HiveConf()).when(config).getHiveConf(); + doAnswer(inv -> "default").when(config).getStringOrDefault( + org.mockito.ArgumentMatchers.any()); + return config; + } + + @Test + void bootstrapBuildsOneDriverPerSlot() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger built = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + built.incrementAndGet(); + return mock(Driver.class); + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + assertEquals(3, pool.size()); + assertEquals(3, built.get(), "One Driver per slot should be constructed eagerly"); + } + } + + @Test + void bootstrapFailurePropagatesAndTearsDown() { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger calls = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + int n = calls.incrementAndGet(); + if (n == 2) { + throw new RuntimeException("simulated driver build failure"); + } + return mock(Driver.class); + }; + HoodieException ex = assertThrows(HoodieException.class, + () -> new HiveDriverPool(config, 3, factory)); + assertTrue(ex.getMessage().contains("Failed to construct HiveDriverPool")); + } + + @Test + void runAllDispatchesEachSqlAcrossWorkers() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Each worker counts how many SQLs it received and remembers the thread. 
+ ConcurrentHashMap> seenThreadsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + seenThreadsByDriver.put(d, ConcurrentHashMap.newKeySet()); + doAnswer((InvocationOnMock inv) -> { + seenThreadsByDriver.get(d).add(Thread.currentThread().getName()); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List sqls = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4"); + List> futures = pool.runAll(sqls); + pool.awaitAll(futures); + assertEquals(2, seenThreadsByDriver.size(), "Expected exactly 2 worker Drivers"); + int totalCalls = seenThreadsByDriver.values().stream().mapToInt(Set::size).sum(); + assertTrue(totalCalls >= 1, "At least one worker should have logged a thread"); + // Each Driver should have been invoked exactly twice (round-robin with 4 sqls, 2 workers). + for (Driver d : seenThreadsByDriver.keySet()) { + verify(d, times(2)).run(anyString()); + } + } + } + + @Test + void awaitAllThrowsFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + throw new RuntimeException("boom: " + sql); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List> futures = pool.runAll(Arrays.asList("OK", "FAIL", "OK")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + } + } + + @Test + void concurrentDispatchBoundedByPoolSize() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new 
AtomicInteger(); + CountDownLatch hold = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + int now = inFlight.incrementAndGet(); + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + hold.await(2, TimeUnit.SECONDS); + inFlight.decrementAndGet(); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + // 5 SQLs against pool of size 2 → max in-flight should be 2. + List> futures = pool.runAll(Arrays.asList("a", "b", "c", "d", "e")); + // Release after a short wait so all SQLs progress. + Thread.sleep(150); + hold.countDown(); + pool.awaitAll(futures); + assertTrue(maxInFlight.get() <= 2, + "Max concurrent dispatches must not exceed pool size, observed " + maxInFlight.get()); + assertTrue(maxInFlight.get() >= 1, "Sanity: at least one dispatch ran"); + } + } + + @Test + void closeIsIdempotentAndPreventsFurtherDispatch() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + HiveDriverPool pool = new HiveDriverPool(config, 2, factory); + pool.close(); + pool.close(); + assertThrows(IllegalStateException.class, + () -> pool.runAll(Arrays.asList("anything"))); + } + + @Test + void invalidSizeRejected() { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + assertThrows(IllegalArgumentException.class, + () -> new HiveDriverPool(config, 0, factory)); + } + + /** + * runOnEachWorker must execute the setup SQL on every worker (each on its bound + * thread) before {@code runAll()} fans the partition statements out. Without this, + * Hive 2.x's SET LOCATION would silently route to the wrong database on the workers + * that never saw the leading USE statement. 
+ */ + @Test + void runOnEachWorkerRunsSetupOnEveryWorker() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + ConcurrentHashMap> sqlsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + sqlsByDriver.put(d, java.util.Collections.synchronizedList(new java.util.ArrayList<>())); + doAnswer((InvocationOnMock inv) -> { + sqlsByDriver.get(d).add(inv.getArgument(0)); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + pool.runOnEachWorker(Arrays.asList("USE `db1`")); + List> futures = pool.runAll(Arrays.asList("ALTER 1", "ALTER 2", "ALTER 3")); + pool.awaitAll(futures); + + assertEquals(3, sqlsByDriver.size(), "Expected one Driver per worker"); + for (Map.Entry> e : sqlsByDriver.entrySet()) { + List seen = e.getValue(); + assertTrue(!seen.isEmpty() && seen.get(0).equals("USE `db1`"), + "Each worker must see USE first; saw " + seen); + } + } + } + + /** + * On the first failure, awaitAll must throw the original cause and cancel any + * futures that have not started yet. Futures already in-flight are not interrupted + * (per the cancel-with-mayInterruptIfRunning=false contract). + */ + @Test + void awaitAllCancelsPendingFuturesOnFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Single-worker pool so SQLs run strictly sequentially → the 2nd SQL is + // pending when the 1st errors, and must be cancelled. 
+ CountDownLatch fired = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + fired.countDown(); + throw new RuntimeException("boom"); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 1, factory)) { + List> futures = pool.runAll(Arrays.asList("FAIL", "PENDING_A", "PENDING_B")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + // The first future failed, so it's done (not cancelled). The remaining two + // were pending behind it on the single worker and should now be cancelled. + assertTrue(fired.await(1, TimeUnit.SECONDS), "Failing SQL must have run"); + assertTrue(futures.get(1).isCancelled(), "Pending future after error must be cancelled"); + assertTrue(futures.get(2).isCancelled(), "Pending future after error must be cancelled"); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java new file mode 100644 index 0000000000000..3c9d4fe899cfd --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link IMetaStoreClientPool} borrow/return/close semantics. These tests + * never hit a real Hive Metastore — clients are mocked and injected via the package- + * private constructor. 
+ */ +class TestIMetaStoreClientPool { + + private static List mockClients(int n) { + List clients = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + clients.add(mock(IMetaStoreClient.class)); + } + return clients; + } + + @Test + void runReturnsClientToPoolOnSuccess() throws Exception { + List clients = mockClients(2); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 2); + try { + IMetaStoreClient borrowed = pool.run(c -> c); + // Both clients should be available again + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both clients should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsClientToPoolOnFailure() { + List clients = mockClients(1); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + // Pool should still be usable; the borrowed client was returned + try { + IMetaStoreClient again = pool.run(c -> c); + assertSame(clients.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + 
maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllClients() throws Exception { + int poolSize = 3; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + // Subsequent borrow should fail + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + pool.close(); + // verify still passes — close() called only once per client + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + IMetaStoreClientPool pool = new IMetaStoreClientPool(mockClients(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +}