From 71946a61b79da5825a0c88433bf445304fdcdbfa Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 16:54:48 -0700 Subject: [PATCH] feat(hive-sync): batch and parallelize HMS partition operations Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see #18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per- partition Thrift round-trips for DROP, all sequential. This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4). Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization). Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path). Tests: TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency. TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HiveSyncConfigHolder.java | 17 ++ .../hudi/hive/HoodieHiveSyncClient.java | 37 +++- .../apache/hudi/hive/ddl/HMSDDLExecutor.java | 127 ++++++++--- .../hudi/hive/util/IMetaStoreClientPool.java | 207 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 37 ++++ .../hive/util/TestIMetaStoreClientPool.java | 181 +++++++++++++++ 6 files changed, 577 insertions(+), 29 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..01f6d0224070d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into " + + "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of " + + "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, " + + "writer version) continue to use the single session client. Default off; existing behavior " + + "is unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used " + + "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..08e0f8acdd02b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -66,6 +67,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +89,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this + // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor + // for partition-row work only — see IMetaStoreClientPool javadoc. + private IMetaStoreClientPool partitionClientPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -121,7 +128,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE)); switch (syncMode) { case HMS: - ddlExecutor = new HMSDDLExecutor(config, this.client); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: ddlExecutor = new HiveQueryDDLExecutor(config, this.client); @@ -201,6 +209,22 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. + log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { @@ -597,6 +621,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. + if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 6b7bd3af00963..e80d63730b3c8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -46,13 +47,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor { private final String databaseName; private final IMetaStoreClient client; private final PartitionValueExtractor partitionValueExtractor; + // Optional. When non-null, partition-row operations fan out across this pool; + // table-row operations always use the session `client` field above. See + // IMetaStoreClientPool javadoc for the usage contract. + private final IMetaStoreClientPool partitionClientPool; public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException { + this(syncConfig, metaStoreClient, null); + } + + public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient, + IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException { this.syncConfig = syncConfig; this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME); this.client = metaStoreClient; + this.partitionClientPool = partitionClientPool; try { this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance(); @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); try { + // Fetch the table StorageDescriptor once on the session client. Each worker + // copies fields off it; the Thrift POJO is not mutated after this read. StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); - for (List batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) { - List partitionList = new ArrayList<>(); - batch.forEach(x -> { + List> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum); + runBatches("add", tableName, batches, (poolClient, batch) -> { + List partitionList = new ArrayList<>(batch.size()); + for (String x : batch) { StorageDescriptor partitionSd = new StorageDescriptor(); partitionSd.setCols(sd.getCols()); partitionSd.setInputFormat(sd.getInputFormat()); @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); - }); - client.add_partitions(partitionList, true, false); + } + poolClient.add_partitions(partitionList, true, false); log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size()); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} add partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e); } @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); - client.dropPartition(databaseName, tableName, partitionClause, false); + // HMS has no batch-drop primitive that matches dropPartition's semantics, so each + // worker still iterates its chunk one partition at a time. The win is fanning + // chunks across pooled clients. + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runBatches("drop", tableName, batches, (poolClient, batch) -> { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); } - log.info("Drop partition {} on {}", dropPartition, tableName); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} drop partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e); } @@ -291,21 +312,71 @@ public void close() { private void registerAlterPartitionEvent(String tableName, List alteredPartitions) { try { + // Read the StorageDescriptor once on the session client; each worker deep-copies + // it per partition (alter_partitions semantics today). StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); - List partitionList = alteredPartitions.stream().map(partition -> { - Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - StorageDescriptor partitionSd = sd.deepCopy(); - partitionSd.setLocation(fullPartitionPath); - return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); - }).collect(Collectors.toList()); - client.alter_partitions(databaseName, tableName, partitionList, null); - } catch (TException e) { + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum); + runBatches("alter", tableName, batches, (poolClient, batch) -> { + List partitionList = batch.stream().map(partition -> { + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + StorageDescriptor partitionSd = sd.deepCopy(); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + poolClient.alter_partitions(databaseName, tableName, partitionList, null); + }); + } catch (Exception e) { log.error("{}.{} update partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e); } } + + /** + * Dispatches partition batches either in parallel against the client pool (if + * configured) or sequentially against the session client. The sequential path + * preserves the exact behavior we had before the pool existed, including failure + * semantics where batch N+1 never runs if batch N throws. + */ + @FunctionalInterface + private interface BatchAction { + void apply(IMetaStoreClient client, List batch) throws Exception; + } + + private void runBatches(String opName, String tableName, List> batches, BatchAction action) throws Exception { + if (partitionClientPool == null) { + for (List batch : batches) { + action.apply(client, batch); + } + return; + } + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(partitionClientPool.executor().submit(() -> + partitionClientPool.run(poolClient -> { + action.apply(poolClient, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java new file mode 100644 index 0000000000000..930fb705cbbda --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pool of {@link IMetaStoreClient} instances for parallel partition sync. + * + *

Each pooled client wraps an independent Thrift connection to the Hive Metastore. + * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client + * is available, executes the action, and returns the client to the pool. A worker + * thread pool of the same size is exposed via {@link #executor()} so callers can fan + * out their batches to match the number of available clients. + * + *

Usage contract: pool clients must be used only for partition-row + * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition}, + * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table}, + * {@code getTable} used as the read half of a read-modify-write of table parameters) + * must continue to go through the session client held by + * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would + * risk lost updates on table parameters such as the last-commit-time-synced marker. + * + *

The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * is constructed only for sync mode HMS. + */ +public class IMetaStoreClientPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public IMetaStoreClientPool(HiveSyncConfig config, int size) { + this(buildClients(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of clients so we can + // exercise borrow/return/close semantics without a live metastore. + IMetaStoreClientPool(List clients, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (clients.size() != size) { + throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(clients); + this.available.addAll(clients); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized IMetaStoreClient pool with {} clients", size); + } + + private static List buildClients(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + HiveConf hiveConf = config.getHiveConf(); + List clients = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + clients.add(newClient(hiveConf)); + } + return clients; + } catch (Exception e) { + // Construction failed mid-way; close any clients we already built before + // surfacing the error so we don't leak Thrift sockets. + for (IMetaStoreClient c : clients) { + try { + c.close(); + } catch (Exception ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e); + } + } + + private static IMetaStoreClient newClient(HiveConf hiveConf) { + try { + // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient + // (one Thrift socket per call), bypassing the Hive thread-local cache that + // Hive.get(conf) would use. This is what gives us N truly independent clients. + return RetryingMetaStoreClient.getProxy(hiveConf, true); + } catch (Exception e) { + throw new HoodieException("Failed to create IMetaStoreClient for pool", e); + } + } + + /** + * Borrows a client, runs the action, and returns the client to the pool. + * Blocks if all clients are in use until one becomes available. + */ + public T run(ClientAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool"); + } + IMetaStoreClient client = available.take(); + try { + return action.apply(client); + } finally { + // Always return the client to the pool, even on failure. Thrift clients + // recover transparently from transactional errors at the HMS layer; + // RetryingMetaStoreClient handles transient socket failures internally. + if (!closed) { + available.offer(client); + } + } + } + + /** + * Worker thread pool sized to match the client pool. Use this to fan out + * batches so the number of in-flight Thrift calls cannot exceed the + * number of pooled clients. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (IMetaStoreClient client : all) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled IMetaStoreClient", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ClientAction { + T apply(IMetaStoreClient client) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 6d971bcc52263..5e8392c1850d0 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -96,9 +96,12 @@ import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY; @@ -329,6 +332,40 @@ public void testDropUpperCasePartitionWithHMS() throws Exception { "Table partitions should match the number of partitions we wrote"); } + /** + * Exercises HMS sync with parallel partition batching enabled. Forces multiple + * batches with a small batch_num against a partition set large enough to fan out + * across multiple pool clients, and asserts the resulting partition set matches. + */ + @Test + public void testHMSSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + // Small batch_num so we get multiple batches dispatched in parallel. + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel batching should sync the new partitions"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java new file mode 100644 index 0000000000000..3c9d4fe899cfd --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link IMetaStoreClientPool} borrow/return/close semantics. These tests + * never hit a real Hive Metastore — clients are mocked and injected via the package- + * private constructor. + */ +class TestIMetaStoreClientPool { + + private static List mockClients(int n) { + List clients = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + clients.add(mock(IMetaStoreClient.class)); + } + return clients; + } + + @Test + void runReturnsClientToPoolOnSuccess() throws Exception { + List clients = mockClients(2); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 2); + try { + IMetaStoreClient borrowed = pool.run(c -> c); + // Both clients should be available again + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both clients should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsClientToPoolOnFailure() { + List clients = mockClients(1); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + // Pool should still be usable; the borrowed client was returned + try { + IMetaStoreClient again = pool.run(c -> c); + assertSame(clients.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllClients() throws Exception { + int poolSize = 3; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + // Subsequent borrow should fail + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + pool.close(); + // verify still passes — close() called only once per client + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + IMetaStoreClientPool pool = new IMetaStoreClientPool(mockClients(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +}