From 71946a61b79da5825a0c88433bf445304fdcdbfa Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 16:54:48 -0700 Subject: [PATCH 1/5] feat(hive-sync): batch and parallelize HMS partition operations Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see #18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per- partition Thrift round-trips for DROP, all sequential. This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4). Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization). Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path). Tests: TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency. TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HiveSyncConfigHolder.java | 17 ++ .../hudi/hive/HoodieHiveSyncClient.java | 37 +++- .../apache/hudi/hive/ddl/HMSDDLExecutor.java | 127 ++++++++--- .../hudi/hive/util/IMetaStoreClientPool.java | 207 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 37 ++++ .../hive/util/TestIMetaStoreClientPool.java | 181 +++++++++++++++ 6 files changed, 577 insertions(+), 29 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..01f6d0224070d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into " + + "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of " + + "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, " + + "writer version) continue to use the single session client. Default off; existing behavior " + + "is unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used " + + "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..08e0f8acdd02b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -66,6 +67,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +89,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this + // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor + // for partition-row work only — see IMetaStoreClientPool javadoc. + private IMetaStoreClientPool partitionClientPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -121,7 +128,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE)); switch (syncMode) { case HMS: - ddlExecutor = new HMSDDLExecutor(config, this.client); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: ddlExecutor = new HiveQueryDDLExecutor(config, this.client); @@ -201,6 +209,22 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. + log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { @@ -597,6 +621,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. + if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 6b7bd3af00963..e80d63730b3c8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -46,13 +47,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor { private final String databaseName; private final IMetaStoreClient client; private final PartitionValueExtractor partitionValueExtractor; + // Optional. When non-null, partition-row operations fan out across this pool; + // table-row operations always use the session `client` field above. See + // IMetaStoreClientPool javadoc for the usage contract. + private final IMetaStoreClientPool partitionClientPool; public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException { + this(syncConfig, metaStoreClient, null); + } + + public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient, + IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException { this.syncConfig = syncConfig; this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME); this.client = metaStoreClient; + this.partitionClientPool = partitionClientPool; try { this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance(); @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); try { + // Fetch the table StorageDescriptor once on the session client. Each worker + // copies fields off it; the Thrift POJO is not mutated after this read. StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); - for (List batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) { - List partitionList = new ArrayList<>(); - batch.forEach(x -> { + List> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum); + runBatches("add", tableName, batches, (poolClient, batch) -> { + List partitionList = new ArrayList<>(batch.size()); + for (String x : batch) { StorageDescriptor partitionSd = new StorageDescriptor(); partitionSd.setCols(sd.getCols()); partitionSd.setInputFormat(sd.getInputFormat()); @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); - }); - client.add_partitions(partitionList, true, false); + } + poolClient.add_partitions(partitionList, true, false); log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size()); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} add partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e); } @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); - client.dropPartition(databaseName, tableName, partitionClause, false); + // HMS has no batch-drop primitive that matches dropPartition's semantics, so each + // worker still iterates its chunk one partition at a time. The win is fanning + // chunks across pooled clients. + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runBatches("drop", tableName, batches, (poolClient, batch) -> { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); } - log.info("Drop partition {} on {}", dropPartition, tableName); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} drop partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e); } @@ -291,21 +312,71 @@ public void close() { private void registerAlterPartitionEvent(String tableName, List alteredPartitions) { try { + // Read the StorageDescriptor once on the session client; each worker deep-copies + // it per partition (alter_partitions semantics today). StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); - List partitionList = alteredPartitions.stream().map(partition -> { - Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - StorageDescriptor partitionSd = sd.deepCopy(); - partitionSd.setLocation(fullPartitionPath); - return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); - }).collect(Collectors.toList()); - client.alter_partitions(databaseName, tableName, partitionList, null); - } catch (TException e) { + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum); + runBatches("alter", tableName, batches, (poolClient, batch) -> { + List partitionList = batch.stream().map(partition -> { + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + StorageDescriptor partitionSd = sd.deepCopy(); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + poolClient.alter_partitions(databaseName, tableName, partitionList, null); + }); + } catch (Exception e) { log.error("{}.{} update partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e); } } + + /** + * Dispatches partition batches either in parallel against the client pool (if + * configured) or sequentially against the session client. The sequential path + * preserves the exact behavior we had before the pool existed, including failure + * semantics where batch N+1 never runs if batch N throws. + */ + @FunctionalInterface + private interface BatchAction { + void apply(IMetaStoreClient client, List batch) throws Exception; + } + + private void runBatches(String opName, String tableName, List> batches, BatchAction action) throws Exception { + if (partitionClientPool == null) { + for (List batch : batches) { + action.apply(client, batch); + } + return; + } + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(partitionClientPool.executor().submit(() -> + partitionClientPool.run(poolClient -> { + action.apply(poolClient, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java new file mode 100644 index 0000000000000..930fb705cbbda --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pool of {@link IMetaStoreClient} instances for parallel partition sync. + * + *

Each pooled client wraps an independent Thrift connection to the Hive Metastore. + * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client + * is available, executes the action, and returns the client to the pool. A worker + * thread pool of the same size is exposed via {@link #executor()} so callers can fan + * out their batches to match the number of available clients. + * + *

Usage contract: pool clients must be used only for partition-row + * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition}, + * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table}, + * {@code getTable} used as the read half of a read-modify-write of table parameters) + * must continue to go through the session client held by + * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would + * risk lost updates on table parameters such as the last-commit-time-synced marker. + * + *

The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * is constructed only for sync mode HMS. + */ +public class IMetaStoreClientPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public IMetaStoreClientPool(HiveSyncConfig config, int size) { + this(buildClients(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of clients so we can + // exercise borrow/return/close semantics without a live metastore. + IMetaStoreClientPool(List clients, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (clients.size() != size) { + throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(clients); + this.available.addAll(clients); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized IMetaStoreClient pool with {} clients", size); + } + + private static List buildClients(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + HiveConf hiveConf = config.getHiveConf(); + List clients = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + clients.add(newClient(hiveConf)); + } + return clients; + } catch (Exception e) { + // Construction failed mid-way; close any clients we already built before + // surfacing the error so we don't leak Thrift sockets. + for (IMetaStoreClient c : clients) { + try { + c.close(); + } catch (Exception ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e); + } + } + + private static IMetaStoreClient newClient(HiveConf hiveConf) { + try { + // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient + // (one Thrift socket per call), bypassing the Hive thread-local cache that + // Hive.get(conf) would use. This is what gives us N truly independent clients. + return RetryingMetaStoreClient.getProxy(hiveConf, true); + } catch (Exception e) { + throw new HoodieException("Failed to create IMetaStoreClient for pool", e); + } + } + + /** + * Borrows a client, runs the action, and returns the client to the pool. + * Blocks if all clients are in use until one becomes available. + */ + public T run(ClientAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool"); + } + IMetaStoreClient client = available.take(); + try { + return action.apply(client); + } finally { + // Always return the client to the pool, even on failure. Thrift clients + // recover transparently from transactional errors at the HMS layer; + // RetryingMetaStoreClient handles transient socket failures internally. + if (!closed) { + available.offer(client); + } + } + } + + /** + * Worker thread pool sized to match the client pool. Use this to fan out + * batches so the number of in-flight Thrift calls cannot exceed the + * number of pooled clients. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (IMetaStoreClient client : all) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled IMetaStoreClient", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ClientAction { + T apply(IMetaStoreClient client) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 6d971bcc52263..5e8392c1850d0 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -96,9 +96,12 @@ import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY; @@ -329,6 +332,40 @@ public void testDropUpperCasePartitionWithHMS() throws Exception { "Table partitions should match the number of partitions we wrote"); } + /** + * Exercises HMS sync with parallel partition batching enabled. Forces multiple + * batches with a small batch_num against a partition set large enough to fan out + * across multiple pool clients, and asserts the resulting partition set matches. + */ + @Test + public void testHMSSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + // Small batch_num so we get multiple batches dispatched in parallel. + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel batching should sync the new partitions"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java new file mode 100644 index 0000000000000..3c9d4fe899cfd --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link IMetaStoreClientPool} borrow/return/close semantics. These tests + * never hit a real Hive Metastore — clients are mocked and injected via the package- + * private constructor. + */ +class TestIMetaStoreClientPool { + + private static List mockClients(int n) { + List clients = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + clients.add(mock(IMetaStoreClient.class)); + } + return clients; + } + + @Test + void runReturnsClientToPoolOnSuccess() throws Exception { + List clients = mockClients(2); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 2); + try { + IMetaStoreClient borrowed = pool.run(c -> c); + // Both clients should be available again + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both clients should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsClientToPoolOnFailure() { + List clients = mockClients(1); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + // Pool should still be usable; the borrowed client was returned + try { + IMetaStoreClient again = pool.run(c -> c); + assertSame(clients.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllClients() throws Exception { + int poolSize = 3; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + // Subsequent borrow should fail + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + pool.close(); + // verify still passes — close() called only once per client + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + IMetaStoreClientPool pool = new IMetaStoreClientPool(mockClients(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +} From 49a70050c5ffad67f00b62a7edc30b0d5cfa7c20 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 19:14:34 -0700 Subject: [PATCH 2/5] feat(hive-sync): batch and parallelize HiveQL partition operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #18983 (HMS parallelism). Applies the equivalent treatment to the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql). HiveQL had two issues that this change addresses: 1. Batching gaps in QueryBasedDDLExecutor.constructPartitionAlterStatements: TOUCH concatenated every partition into one giant ALTER TABLE ... TOUCH PARTITION (...) PARTITION (...) ... statement; SET_LOCATION (UPDATE) emitted one statement per partition. ADD was already batched. 2. Sequential SQL execution in HiveQueryDDLExecutor.updateHiveSQLs: even when batches existed, they ran in a single for-loop on one Hive Driver. This change introduces HiveDriverPool, an eager pool of single-thread executors each owning a Hive Driver bound to a shared SessionState. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default off) and sized by hoodie.datasource.hive_sync.batching.threads (default 4) — no new configs. Design notes: - Hive's Driver and SessionState are thread-bound. SessionState.start() attaches to the calling thread's ThreadLocal. The pool gives each slot its own dedicated worker thread so the Driver stays valid for that thread's lifetime. Bootstrap, dispatch, and close all run on the bound thread. - SessionState is shared across workers (lazily constructed once), because each worker calls SessionState.start(sharedState) on its own thread to attach. Constructing one SessionState per worker triggered race conditions in Hive's resource-directory machinery on macOS. - TOUCH is now batched by HIVE_BATCH_SYNC_PARTITION_NUM. SET_LOCATION remains one statement per partition (Hive SQL doesn't support multi-partition SET LOCATION) but is now fanned out across workers. - Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database, so the leading USE database statement is load-bearing. The pool peels it off and runs it on every worker via runOnEachWorker() before fanning the rest out. Tests: - TestHiveDriverPool: bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency. - TestHiveSyncTool.testHiveQLSyncWithBatchingEnabled: end-to-end with batching.enabled=true, threads=3, batch_num=3 against embedded HMS. - TestHiveSyncTool.testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path specifically. - Full hudi-hive-sync suite: 305 passed, 0 failures, 0 errors. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 19 +- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 57 ++++ .../hudi/hive/ddl/QueryBasedDDLExecutor.java | 52 ++- .../apache/hudi/hive/util/HiveDriverPool.java | 301 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 61 ++++ .../hudi/hive/util/TestHiveDriverPool.java | 190 +++++++++++ 6 files changed, 664 insertions(+), 16 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 08e0f8acdd02b..94b61f63568c9 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; @@ -93,6 +94,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor // for partition-row work only — see IMetaStoreClientPool javadoc. private IMetaStoreClientPool partitionClientPool; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for + // reference only — close() is delegated through ddlExecutor.close(). + private HiveDriverPool partitionDriverPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -132,7 +137,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -150,7 +156,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); } else { - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); } } } catch (Exception e) { @@ -225,6 +232,14 @@ private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config return new IMetaStoreClientPool(config, size); } + private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new HiveDriverPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 25434d29eb3ff..b64a6a5d54c5b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.sync.common.util.TableUtils.tableId; @@ -52,10 +54,20 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; + // Optional. When non-null, partition-phase SQL lists fan out across this pool; + // table-level SQL (createTable, schema evolution, single-statement runSQL callers) + // always uses the session `hiveDriver` above. See HiveDriverPool javadoc. + private final HiveDriverPool driverPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { + this(config, metaStoreClient, null); + } + + public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, + HiveDriverPool driverPool) { super(config); this.metaStoreClient = metaStoreClient; + this.driverPool = driverPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -82,6 +94,41 @@ public void runSQL(String sql) { updateHiveSQLs(Collections.singletonList(sql)); } + /** + * Partition-phase SQL fan-out. When the driver pool is present, any leading + * {@code USE database} statements are run on every worker (Hive 2.x's + * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each worker needs to USE the right db + * before any partition ALTER). The remaining statements are then dispatched + * round-robin across the pool. Falls through to the sequential path on the + * session Driver when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (driverPool == null || sqls.isEmpty()) { + updateHiveSQLs(sqls); + return; + } + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + driverPool.runOnEachWorker(setupStatements); + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = driverPool.runAll(partitionStatements); + driverPool.awaitAll(futures); + } + + private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); try { @@ -149,6 +196,16 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro @Override public void close() { + // Close the pool first so the worker threads stop dispatching against their + // Drivers before we tear down anything else. The pool's close() runs + // Driver/SessionState cleanup on each worker's own thread. + if (driverPool != null) { + try { + driverPool.close(); + } catch (Exception e) { + log.warn("Error closing HiveDriverPool", e); + } + } if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 7f776f2f7a04d..a68644ae86f05 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -75,6 +76,20 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) { */ public abstract void runSQL(String sql); + /** + * Runs a list of SQL statements. The default implementation executes them + * sequentially via {@link #runSQL(String)}. Subclasses that can parallelize + * (e.g. {@link HiveQueryDDLExecutor} with a driver pool) override this hook + * to fan the list out across workers. The contract requires that the list + * has no positional dependencies — callers must fully qualify table names + * with {@code `db`.`tbl`} so any statement can run on any worker. + */ + protected void runSQLs(List sqls) { + for (String sql : sqls) { + runSQL(sql); + } + } + @Override public void createDatabase(String databaseName) { runSQL("create database if not exists " + databaseName); @@ -120,7 +135,7 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> runSQL(sql)); + runSQLs(sqls); } @Override @@ -131,9 +146,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } log.info("Changing partitions {} on {}", changedPartitions.size(), tableName); List sqls = constructPartitionAlterStatements(tableName, changedPartitions, PartitionAlterType.SET_LOCATION); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } @Override @@ -210,29 +223,40 @@ public void touchPartitionsToTable(String tableName, List touchPartition } log.info("Touching partitions " + touchPartitions.size() + " on " + tableName); List sqls = constructPartitionAlterStatements(tableName, touchPartitions, PartitionAlterType.TOUCH); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } /** * Builds SQL statements to either touch partitions or set their location. - * TOUCH: one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... - * SET_LOCATION: one ALTER TABLE ... PARTITION (p) SET LOCATION '...' per partition. + * + *

The first element of the returned list is always a {@code USE database} + * statement. Hive 2.x's ALTER PARTITION ... SET LOCATION does not respect the + * {@code db.table} qualifier (silently routes to the connection's current + * database), so the {@code USE} is load-bearing. Parallel execution paths must + * run this statement on every worker before fanning out the rest. + * + *

TOUCH: one {@code ALTER TABLE ... TOUCH PARTITION (p1) ...} per batch of + * {@code HIVE_BATCH_SYNC_PARTITION_NUM} partitions. + * + *

SET_LOCATION: one {@code ALTER TABLE ... PARTITION (p) SET LOCATION '...'} + * per partition (Hive SQL does not support multi-partition SET LOCATION in one + * statement). */ private List constructPartitionAlterStatements(String tableName, List partitions, PartitionAlterType alterType) { List result = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + databaseName + HIVE_ESCAPE_CHARACTER; result.add(useDatabase); String alterTablePrefix = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); switch (alterType) { case TOUCH: - String alterTable = alterTablePrefix + " TOUCH"; - for (String partition : partitions) { - alterTable += " PARTITION (" + getPartitionClause(partition) + ")"; + for (List batch : CollectionUtils.batches(partitions, batchSyncPartitionNum)) { + StringBuilder alterTable = new StringBuilder(alterTablePrefix).append(" TOUCH"); + for (String partition : batch) { + alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")"); + } + result.add(alterTable.toString()); } - result.add(alterTable); break; case SET_LOCATION: for (String partition : partitions) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java new file mode 100644 index 0000000000000..64bca898d1828 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; + +/** + * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL. + * + *

Hive's {@code SessionState.start(state)} binds state to the calling thread's + * thread-local, and {@code Driver} reads from that thread-local during {@code run()}. + * A Driver constructed on one thread cannot be safely used from another. This pool + * solves that by giving each slot its own dedicated worker thread (a single-thread + * executor) — the Driver and SessionState are built on that thread by a bootstrap + * task, and all subsequent SQL for that slot runs on the same thread. + * + *

Usage contract: use this pool only for partition-row DDL statements that + * are independent of each other and freely shuffleable across workers. Table-level + * statements (createTable, schema evolution, USE database) must continue to run on + * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver + * thread. The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} + * and is constructed only for HiveQL sync mode. + */ +public class HiveDriverPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class); + + private final List workers; + private final int size; + private volatile boolean closed; + + public HiveDriverPool(HiveSyncConfig config, int size) { + this(config, size, new DefaultDriverFactory(config)); + } + + // Package-private for tests: accepts a DriverFactory so unit tests can inject + // mock Driver instances without standing up a real Hive instance. + HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + this.size = size; + this.workers = new ArrayList<>(size); + String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME); + PoolThreadFactory threadFactory = new PoolThreadFactory(); + List> bootstrapFutures = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + Worker worker = new Worker(threadFactory); + workers.add(worker); + bootstrapFutures.add(worker.executor.submit(() -> { + worker.driver = factory.newDriver(databaseName); + return null; + })); + } + // Block until all bootstraps complete so we surface construction errors + // before any caller hands us SQL. + for (Future f : bootstrapFutures) { + f.get(); + } + } catch (Exception e) { + tearDown(); + throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e); + } + LOG.info("Initialized HiveDriverPool with {} workers", size); + } + + /** + * Runs each given SQL on every worker, in order. Used for setup statements + * (e.g. {@code USE database}) that must establish per-thread session context + * before any partition statement runs. Blocks until all workers have completed + * the setup. Throws on first error. + */ + public void runOnEachWorker(List setupSqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(workers.size()); + for (Worker worker : workers) { + futures.add(worker.executor.submit(() -> { + for (String sql : setupSqls) { + worker.driver.run(sql); + } + return null; + })); + } + awaitAll(futures); + } + + /** + * Dispatches each SQL string to a worker (round-robin) and returns the list of + * futures. The caller is responsible for awaiting and collecting errors. + */ + public List> runAll(List sqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + List> futures = new ArrayList<>(sqls.size()); + for (int i = 0; i < sqls.size(); i++) { + String sql = sqls.get(i); + Worker worker = workers.get(i % workers.size()); + futures.add(worker.executor.submit(() -> { + long start = System.currentTimeMillis(); + worker.driver.run(sql); + LOG.info("Time taken to execute [{}]: {} ms", sql, System.currentTimeMillis() - start); + return null; + })); + } + return futures; + } + + /** + * Awaits all futures, throws the first exception encountered (logging the rest at + * WARN), and returns the list of CommandProcessorResponse objects (currently + * unused but matches the existing single-threaded contract that returned them). + */ + public List awaitAll(List> futures) { + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + } + } catch (ExecutionException ee) { + Exception cause = unwrap(ee); + if (firstError == null) { + firstError = cause; + } else { + LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw new HoodieHiveSyncException("Failed in executing SQL", firstError); + } + return new ArrayList<>(); + } + + private static Exception unwrap(ExecutionException ee) { + Throwable cause = ee.getCause(); + return (cause instanceof Exception) ? (Exception) cause : ee; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + tearDown(); + } + + private void tearDown() { + // Close each worker's Driver/SessionState on its own thread, then shut the + // executor down. Running close() on the bound thread keeps SessionState's + // thread-local cleanup correct. + for (Worker worker : workers) { + try { + worker.executor.submit(() -> { + if (worker.driver != null) { + try { + worker.driver.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled Driver", e); + } + } + SessionState ss = SessionState.get(); + if (ss != null) { + try { + ss.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled SessionState", e); + } + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error during pool worker shutdown", e); + } + worker.executor.shutdown(); + try { + if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) { + worker.executor.shutdownNow(); + } + } catch (InterruptedException ie) { + worker.executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + workers.clear(); + } + + /** + * Per-slot state: a single-thread executor and the Driver bound to its thread. + * Driver is volatile because it is written by the bootstrap task and read by + * subsequent dispatch tasks on the same executor. + */ + private static final class Worker { + final ExecutorService executor; + volatile Driver driver; + + Worker(ThreadFactory threadFactory) { + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + } + + @FunctionalInterface + interface DriverFactory { + Driver newDriver(String databaseName) throws Exception; + } + + /** + * Builds a real Hive {@link Driver} on the calling thread. The SessionState is + * constructed lazily (once, on the first worker thread that builds a Driver) and + * shared across all worker threads — Hive uses ThreadLocal attachment, not + * exclusive ownership, so multiple workers calling + * {@code SessionState.start(sharedState)} all see the same config and scratch dir + * without each spending the cost of building their own SessionState (and risking + * resource-dir creation races during the constructor). + */ + private static final class DefaultDriverFactory implements DriverFactory { + private final HiveConf hiveConf; + private volatile SessionState sharedSessionState; + + DefaultDriverFactory(HiveSyncConfig config) { + this.hiveConf = config.getHiveConf(); + } + + @Override + public synchronized Driver newDriver(String databaseName) throws Exception { + if (sharedSessionState == null) { + sharedSessionState = new SessionState(hiveConf, + UserGroupInformation.getCurrentUser().getShortUserName()); + } + SessionState.start(sharedSessionState); + sharedSessionState.setCurrentDatabase(databaseName); + return new Driver(hiveConf); + } + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-driver-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 5e8392c1850d0..5afc9af484ac2 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -366,6 +366,67 @@ public void testHMSSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel batching should sync the new partitions"); } + /** + * Exercises HiveQL sync with parallel partition batching enabled. Mirrors the + * HMS test but routes through the HiveDriverPool — each worker thread owns a + * Driver+SessionState pair, and the SQL list (qualified with `db`.`tbl`) is + * fanned out across them. + */ + @Test + public void testHiveQLSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel HiveQL batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel HiveQL batching should sync the new partitions"); + } + + /** + * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that + * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller + * statements does not break partition visibility downstream. + */ + @Test + public void testHiveQLTouchPartitionsWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "2"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + hiveSyncProps.setProperty(META_SYNC_TOUCH_PARTITIONS_ENABLED.key(), "true"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Touch existing partitions (no new data) — should hit the batched TOUCH path + // without changing the partition count. + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "TOUCH batching must not change the partition set"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java new file mode 100644 index 0000000000000..daa98e5ebaec2 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link HiveDriverPool} that exercise bootstrap, dispatch, error + * propagation, and close semantics without standing up a real Hive instance. + */ +class TestHiveDriverPool { + + private static HiveSyncConfig configWithEmptyHiveConf() { + HiveSyncConfig config = mock(HiveSyncConfig.class); + doAnswer(inv -> new HiveConf()).when(config).getHiveConf(); + doAnswer(inv -> "default").when(config).getStringOrDefault( + org.mockito.ArgumentMatchers.any()); + return config; + } + + @Test + void bootstrapBuildsOneDriverPerSlot() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger built = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + built.incrementAndGet(); + return mock(Driver.class); + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + assertEquals(3, pool.size()); + assertEquals(3, built.get(), "One Driver per slot should be constructed eagerly"); + } + } + + @Test + void bootstrapFailurePropagatesAndTearsDown() { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger calls = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + int n = calls.incrementAndGet(); + if (n == 2) { + throw new RuntimeException("simulated driver build failure"); + } + return mock(Driver.class); + }; + HoodieException ex = assertThrows(HoodieException.class, + () -> new HiveDriverPool(config, 3, factory)); + assertTrue(ex.getMessage().contains("Failed to construct HiveDriverPool")); + } + + @Test + void runAllDispatchesEachSqlAcrossWorkers() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Each worker counts how many SQLs it received and remembers the thread. + ConcurrentHashMap> seenThreadsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + seenThreadsByDriver.put(d, ConcurrentHashMap.newKeySet()); + doAnswer((InvocationOnMock inv) -> { + seenThreadsByDriver.get(d).add(Thread.currentThread().getName()); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List sqls = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4"); + List> futures = pool.runAll(sqls); + pool.awaitAll(futures); + assertEquals(2, seenThreadsByDriver.size(), "Expected exactly 2 worker Drivers"); + int totalCalls = seenThreadsByDriver.values().stream().mapToInt(Set::size).sum(); + assertTrue(totalCalls >= 1, "At least one worker should have logged a thread"); + // Each Driver should have been invoked exactly twice (round-robin with 4 sqls, 2 workers). + for (Driver d : seenThreadsByDriver.keySet()) { + verify(d, times(2)).run(anyString()); + } + } + } + + @Test + void awaitAllThrowsFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + throw new RuntimeException("boom: " + sql); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List> futures = pool.runAll(Arrays.asList("OK", "FAIL", "OK")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + } + } + + @Test + void concurrentDispatchBoundedByPoolSize() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + CountDownLatch hold = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + int now = inFlight.incrementAndGet(); + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + hold.await(2, TimeUnit.SECONDS); + inFlight.decrementAndGet(); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + // 5 SQLs against pool of size 2 → max in-flight should be 2. + List> futures = pool.runAll(Arrays.asList("a", "b", "c", "d", "e")); + // Release after a short wait so all SQLs progress. + Thread.sleep(150); + hold.countDown(); + pool.awaitAll(futures); + assertTrue(maxInFlight.get() <= 2, + "Max concurrent dispatches must not exceed pool size, observed " + maxInFlight.get()); + assertTrue(maxInFlight.get() >= 1, "Sanity: at least one dispatch ran"); + } + } + + @Test + void closeIsIdempotentAndPreventsFurtherDispatch() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + HiveDriverPool pool = new HiveDriverPool(config, 2, factory); + pool.close(); + pool.close(); + assertThrows(IllegalStateException.class, + () -> pool.runAll(Arrays.asList("anything"))); + } + + @Test + void invalidSizeRejected() { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + assertThrows(IllegalArgumentException.class, + () -> new HiveDriverPool(config, 0, factory)); + } +} From 4bb34bca169143ea361c5750dc03be7738207e4f Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 16 Jun 2026 20:52:06 -0700 Subject: [PATCH 3/5] refactor(hive-sync): clean up HiveDriverPool awaitAll return; small comments - Change HiveDriverPool.awaitAll(...) to return void. The List it previously returned was always empty and no caller consumed it. Drops the unused CommandProcessorResponse import. - Lift the empty-input short-circuit to the top of HiveQueryDDLExecutor.runSQLs so the no-op case skips both the pool and the session Driver branches cleanly. - Document isUseStatement's strict 4-char prefix expectation so future callers don't feed it externally produced (potentially padded) SQL. No behavior change. Full hudi-hive-sync suite: 305 tests, 0 failures, 0 errors. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 8 +++++++- .../java/org/apache/hudi/hive/util/HiveDriverPool.java | 10 ++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index b64a6a5d54c5b..22835ff3df377 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -105,7 +105,10 @@ public void runSQL(String sql) { */ @Override protected void runSQLs(List sqls) { - if (driverPool == null || sqls.isEmpty()) { + if (sqls.isEmpty()) { + return; + } + if (driverPool == null) { updateHiveSQLs(sqls); return; } @@ -125,6 +128,9 @@ protected void runSQLs(List sqls) { driverPool.awaitAll(futures); } + // Strict 4-char prefix match on "USE ". Internal callers (constructPartitionAlterStatements) + // always emit the USE statement without leading whitespace; do not call with externally + // supplied SQL that might be padded. private static boolean isUseStatement(String sql) { return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java index 64bca898d1828..f4abace11e611 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -151,11 +150,11 @@ public List> runAll(List sqls) { } /** - * Awaits all futures, throws the first exception encountered (logging the rest at - * WARN), and returns the list of CommandProcessorResponse objects (currently - * unused but matches the existing single-threaded contract that returned them). + * Awaits all futures and throws the first exception encountered, logging any + * subsequent failures at WARN. Callers do not need the per-statement results + * (Hive's Driver.run side-effects the metastore), so this method is void. */ - public List awaitAll(List> futures) { + public void awaitAll(List> futures) { Exception firstError = null; for (Future f : futures) { try { @@ -177,7 +176,6 @@ public List awaitAll(List> futures) { if (firstError != null) { throw new HoodieHiveSyncException("Failed in executing SQL", firstError); } - return new ArrayList<>(); } private static Exception unwrap(ExecutionException ee) { From acefd03ef857f220b814d8c0545c6d38168dbed1 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 16 Jun 2026 23:50:03 -0700 Subject: [PATCH 4/5] refactor(hive-sync): address review feedback on HiveDriverPool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR review follow-ups for #18984: - HiveQueryDDLExecutor.driverPool -> Option (PR comment). Constructor arg, instance field, and HoodieHiveSyncClient call sites updated. Eliminates a stale 'Optional. When non-null' inline doc. - DefaultDriverFactory: stop redundantly calling setCurrentDatabase on every newDriver(). Database is a pool-wide property that never changes across workers, so set it once when the shared SessionState is first constructed (PR comment). - HiveDriverPool.awaitAll: on first failure, cancel remaining (not yet started) pending futures so workers don't keep running pointless work after a fatal error. Cancel uses mayInterruptIfRunning=false so any in-flight statement is allowed to run to completion (keeps Driver state consistent). Suppressed errors continue to be logged at WARN. Adds handling for CancellationException so the cancel-walk doesn't itself raise a spurious HoodieHiveSyncException. - HiveDriverPool bootstrap: bound each Future.get() at 60s (BOOTSTRAP_TIMEOUT_SECONDS). Prior code blocked forever if Hive init hung — now we surface a HoodieException with a timeout cause. - Logging: stop logging full SQL text per-statement in runAll/awaitAll (batched TOUCH/ADD can be many KB; N workers multiply log volume). Replaced with a single per-call summary line. Same treatment applied to HiveQueryDDLExecutor.updateHiveSQLs (sequential path). - New unit test: runOnEachWorkerRunsSetupOnEveryWorker — asserts every worker sees the leading USE before any fan-out partition statement. - New unit test: awaitAllCancelsPendingFuturesOnFirstError — uses a size-1 pool to guarantee the 2nd/3rd statements are still pending behind the failing 1st, then asserts they are cancelled. - New end-to-end test: testHiveQLSetLocationWithBatching — drives updatePartitionsToTable through the SET_LOCATION fan-out path with batching on; asserts partition count and per-partition relative paths survive parallel ALTER PARTITION SET LOCATION. Out of scope (documented as follow-up): DROP partition parallelization in HIVEQL mode. DROP goes through IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so it would need IMetaStoreClientPool wired into the HiveQL path — a separable change from the HiveDriverPool work this PR delivers. Tests: full hudi-hive-sync suite passes — 308 tests, 0 failures, 0 errors (was 305 before this commit). New tests: - TestHiveDriverPool: 9 tests (was 7) - TestHiveSyncTool: testHiveQLSetLocationWithBatching added Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 10 +-- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 32 +++++---- .../apache/hudi/hive/util/HiveDriverPool.java | 61 +++++++++++++--- .../apache/hudi/hive/TestHiveSyncTool.java | 39 +++++++++++ .../hudi/hive/util/TestHiveDriverPool.java | 70 +++++++++++++++++++ 5 files changed, 182 insertions(+), 30 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 94b61f63568c9..53beb0bccd8d2 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -94,10 +94,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor // for partition-row work only — see IMetaStoreClientPool javadoc. private IMetaStoreClientPool partitionClientPool; - // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for // reference only — close() is delegated through ddlExecutor.close(). - private HiveDriverPool partitionDriverPool; + private Option partitionDriverPool = Option.empty(); /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -232,12 +232,12 @@ private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config return new IMetaStoreClientPool(config, size); } - private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) { + private Option maybeBuildHiveDriverPool(HiveSyncConfig config) { if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { - return null; + return Option.empty(); } int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); - return new HiveDriverPool(config, size); + return Option.of(new HiveDriverPool(config, size)); } private Table getInitialTable(String table) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 22835ff3df377..8f52c099f1c02 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -19,6 +19,7 @@ package org.apache.hudi.hive.ddl; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveDriverPool; @@ -54,17 +55,17 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; - // Optional. When non-null, partition-phase SQL lists fan out across this pool; - // table-level SQL (createTable, schema evolution, single-statement runSQL callers) - // always uses the session `hiveDriver` above. See HiveDriverPool javadoc. - private final HiveDriverPool driverPool; + // When present, partition-phase SQL lists fan out across this pool; table-level SQL + // (createTable, schema evolution, single-statement runSQL callers) always uses the + // session `hiveDriver` above. See HiveDriverPool javadoc. + private final Option driverPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { - this(config, metaStoreClient, null); + this(config, metaStoreClient, Option.empty()); } public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, - HiveDriverPool driverPool) { + Option driverPool) { super(config); this.metaStoreClient = metaStoreClient; this.driverPool = driverPool; @@ -108,24 +109,25 @@ protected void runSQLs(List sqls) { if (sqls.isEmpty()) { return; } - if (driverPool == null) { + if (!driverPool.isPresent()) { updateHiveSQLs(sqls); return; } + HiveDriverPool pool = driverPool.get(); int firstNonUse = 0; while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { firstNonUse++; } if (firstNonUse > 0) { List setupStatements = sqls.subList(0, firstNonUse); - driverPool.runOnEachWorker(setupStatements); + pool.runOnEachWorker(setupStatements); } List partitionStatements = sqls.subList(firstNonUse, sqls.size()); if (partitionStatements.isEmpty()) { return; } - List> futures = driverPool.runAll(partitionStatements); - driverPool.awaitAll(futures); + List> futures = pool.runAll(partitionStatements); + pool.awaitAll(futures); } // Strict 4-char prefix match on "USE ". Internal callers (constructPartitionAlterStatements) @@ -137,17 +139,17 @@ private static boolean isUseStatement(String sql) { private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); + HoodieTimer timer = HoodieTimer.start(); try { for (String sql : sqls) { if (hiveDriver != null) { - HoodieTimer timer = HoodieTimer.start(); responses.add(hiveDriver.run(sql)); - log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer()); } } } catch (Exception e) { throw new HoodieHiveSyncException("Failed in executing SQL", e); } + log.info("Executed {} SQL statements sequentially in {} ms", sqls.size(), timer.endTimer()); return responses; } @@ -205,13 +207,13 @@ public void close() { // Close the pool first so the worker threads stop dispatching against their // Drivers before we tear down anything else. The pool's close() runs // Driver/SessionState cleanup on each worker's own thread. - if (driverPool != null) { + driverPool.ifPresent(pool -> { try { - driverPool.close(); + pool.close(); } catch (Exception e) { log.warn("Error closing HiveDriverPool", e); } - } + }); if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java index f4abace11e611..2ea2ebc5e6e22 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,6 +63,12 @@ public class HiveDriverPool implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class); + // Per-worker Driver construction has to be fast in practice (a few hundred ms + // for the SessionState + Driver init). A 60s ceiling per worker leaves plenty of + // headroom for a slow JVM warm-up but bounds the failure mode if the metastore + // is unreachable or Hive hangs during init. + private static final long BOOTSTRAP_TIMEOUT_SECONDS = 60; + private final List workers; private final int size; private volatile boolean closed; @@ -91,9 +98,10 @@ public HiveDriverPool(HiveSyncConfig config, int size) { })); } // Block until all bootstraps complete so we surface construction errors - // before any caller hands us SQL. + // before any caller hands us SQL. Bounded by BOOTSTRAP_TIMEOUT_SECONDS so a + // hung Hive init doesn't deadlock the sync driver thread. for (Future f : bootstrapFutures) { - f.get(); + f.get(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); } } catch (Exception e) { tearDown(); @@ -129,7 +137,10 @@ public void runOnEachWorker(List setupSqls) { /** * Dispatches each SQL string to a worker (round-robin) and returns the list of - * futures. The caller is responsible for awaiting and collecting errors. + * futures. The caller is responsible for awaiting and collecting errors. SQL text + * is intentionally not logged per-statement here: batched TOUCH/ADD statements can + * be many kilobytes, and N parallel workers would multiply the log volume. See + * {@link #awaitAll(List)} for the per-call summary log. */ public List> runAll(List sqls) { if (closed) { @@ -140,9 +151,7 @@ public List> runAll(List sqls) { String sql = sqls.get(i); Worker worker = workers.get(i % workers.size()); futures.add(worker.executor.submit(() -> { - long start = System.currentTimeMillis(); worker.driver.run(sql); - LOG.info("Time taken to execute [{}]: {} ms", sql, System.currentTimeMillis() - start); return null; })); } @@ -150,24 +159,37 @@ public List> runAll(List sqls) { } /** - * Awaits all futures and throws the first exception encountered, logging any - * subsequent failures at WARN. Callers do not need the per-statement results - * (Hive's Driver.run side-effects the metastore), so this method is void. + * Awaits all futures and throws the first exception encountered. On first failure, + * cancels the remaining (not yet started) futures so workers don't keep running + * pointless work after a fatal error. Any errors that finished before cancellation + * are logged at WARN. Callers do not need per-statement results (Hive's Driver.run + * side-effects the metastore), so this method is void. */ public void awaitAll(List> futures) { + long start = System.currentTimeMillis(); Exception firstError = null; - for (Future f : futures) { + int completed = 0; + int cancelled = 0; + for (int i = 0; i < futures.size(); i++) { + Future f = futures.get(i); try { f.get(); + completed++; + } catch (CancellationException ce) { + // We cancelled this future ourselves after a prior error. Don't treat it + // as a new failure; just note it for the summary log. + cancelled++; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); if (firstError == null) { firstError = ie; + cancelled += cancelRemaining(futures, i + 1); } } catch (ExecutionException ee) { Exception cause = unwrap(ee); if (firstError == null) { firstError = cause; + cancelled += cancelRemaining(futures, i + 1); } else { LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause); } @@ -176,6 +198,21 @@ public void awaitAll(List> futures) { if (firstError != null) { throw new HoodieHiveSyncException("Failed in executing SQL", firstError); } + LOG.info("Completed {} SQL statements ({} cancelled) in {} ms across {} workers", + completed, cancelled, System.currentTimeMillis() - start, size); + } + + private static int cancelRemaining(List> futures, int fromIndex) { + int cancelled = 0; + for (int j = fromIndex; j < futures.size(); j++) { + // mayInterruptIfRunning=false: the worker thread is bound to a Hive Driver + // whose state we don't want to corrupt mid-statement. Cancel only those that + // haven't started yet; in-flight statements run to completion. + if (futures.get(j).cancel(false)) { + cancelled++; + } + } + return cancelled; } private static Exception unwrap(ExecutionException ee) { @@ -274,12 +311,16 @@ private static final class DefaultDriverFactory implements DriverFactory { @Override public synchronized Driver newDriver(String databaseName) throws Exception { + // SessionState is shared across workers; build it once on the first call (with + // currentDatabase already set) and attach it to each worker's thread-local on + // subsequent calls. The database is a pool-wide property and never changes + // across workers, so setting it once at construction time is sufficient. if (sharedSessionState == null) { sharedSessionState = new SessionState(hiveConf, UserGroupInformation.getCurrentUser().getShortUserName()); + sharedSessionState.setCurrentDatabase(databaseName); } SessionState.start(sharedSessionState); - sharedSessionState.setCurrentDatabase(databaseName); return new Driver(hiveConf); } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 5afc9af484ac2..2c574b802ece1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -400,6 +400,45 @@ public void testHiveQLSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel HiveQL batching should sync the new partitions"); } + /** + * Exercises the SET_LOCATION path in HiveQL mode with batching on. SET_LOCATION + * emits one ALTER PARTITION ... SET LOCATION statement per partition (Hive SQL + * has no multi-partition SET LOCATION), so this is the fan-out path most likely + * to exercise concurrent ALTER PARTITION calls against the same table. + */ + @Test + public void testHiveQLSetLocationWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Drive the SET_LOCATION path by directly calling updatePartitionsToTable with + // existing partition paths. Each partition produces its own ALTER ... SET LOCATION + // statement, fanned out across the 3 workers in the pool. + List existingPartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toList()); + hiveClient.updatePartitionsToTable(HiveTestUtil.TABLE_NAME, existingPartitions); + + List after = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(partitionCount, after.size(), + "Parallel SET_LOCATION must not change the partition set"); + Set relativePaths = after.stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toSet()); + assertEquals(partitionCount, relativePaths.size(), + "Each partition should resolve to a unique relative path after parallel SET_LOCATION"); + assertTrue(relativePaths.containsAll(existingPartitions), + "All original partition paths should still be present after parallel SET_LOCATION"); + } + /** * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java index daa98e5ebaec2..cac89dcf54bf9 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -187,4 +188,73 @@ void invalidSizeRejected() { assertThrows(IllegalArgumentException.class, () -> new HiveDriverPool(config, 0, factory)); } + + /** + * runOnEachWorker must execute the setup SQL on every worker (each on its bound + * thread) before {@code runAll()} fans the partition statements out. Without this, + * Hive 2.x's SET LOCATION would silently route to the wrong database on the workers + * that never saw the leading USE statement. + */ + @Test + void runOnEachWorkerRunsSetupOnEveryWorker() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + ConcurrentHashMap> sqlsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + sqlsByDriver.put(d, java.util.Collections.synchronizedList(new java.util.ArrayList<>())); + doAnswer((InvocationOnMock inv) -> { + sqlsByDriver.get(d).add(inv.getArgument(0)); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + pool.runOnEachWorker(Arrays.asList("USE `db1`")); + List> futures = pool.runAll(Arrays.asList("ALTER 1", "ALTER 2", "ALTER 3")); + pool.awaitAll(futures); + + assertEquals(3, sqlsByDriver.size(), "Expected one Driver per worker"); + for (Map.Entry> e : sqlsByDriver.entrySet()) { + List seen = e.getValue(); + assertTrue(!seen.isEmpty() && seen.get(0).equals("USE `db1`"), + "Each worker must see USE first; saw " + seen); + } + } + } + + /** + * On the first failure, awaitAll must throw the original cause and cancel any + * futures that have not started yet. Futures already in-flight are not interrupted + * (per the cancel-with-mayInterruptIfRunning=false contract). + */ + @Test + void awaitAllCancelsPendingFuturesOnFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Single-worker pool so SQLs run strictly sequentially → the 2nd SQL is + // pending when the 1st errors, and must be cancelled. + CountDownLatch fired = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + fired.countDown(); + throw new RuntimeException("boom"); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 1, factory)) { + List> futures = pool.runAll(Arrays.asList("FAIL", "PENDING_A", "PENDING_B")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + // The first future failed, so it's done (not cancelled). The remaining two + // were pending behind it on the single worker and should now be cancelled. + assertTrue(fired.await(1, TimeUnit.SECONDS), "Failing SQL must have run"); + assertTrue(futures.get(1).isCancelled(), "Pending future after error must be cancelled"); + assertTrue(futures.get(2).isCancelled(), "Pending future after error must be cancelled"); + } + } } From 7022e7d493416d600664235c3056d32ea4a1c179 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 17 Jun 2026 00:31:24 -0700 Subject: [PATCH 5/5] feat(hive-sync): parallelize DROP partitions in HiveQL sync mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #18984 (HiveQL partition batching). The HiveQL executor's dropPartitionsToTable goes through IMetaStoreClient.dropPartition (Thrift), not Hive Driver — so it can't reuse HiveDriverPool. This change wires the existing IMetaStoreClientPool (from #18983) into HiveQueryDDLExecutor and uses it to fan drop batches across the pool's worker threads. Behavior: - batching.enabled=false (default): unchanged. dropPartitionsToTable iterates the partition list sequentially on the session metastore client, exactly as before. - batching.enabled=true: partitions are split into batches of HIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's workers (one IMetaStoreClient per worker), and first-error semantics match the existing HMS-mode implementation (first failure thrown, subsequent failures logged at WARN). HoodieHiveSyncClient now builds partitionClientPool in both HMS and HIVEQL branches (and the legacy default-mode branch). The pool is closed in HoodieHiveSyncClient.close() before Hive.closeCurrent(), unchanged from #18983. Tests: - New end-to-end test: testHiveQLDropPartitionsWithBatching — creates 8 partitions, drops 4 through the parallel pool path with threads=3 and batch_num=2 (so we get multiple drop batches racing), asserts the remaining partition set matches. - Full hudi-hive-sync suite: 309 tests, 0 failures, 0 errors (was 308). Configs: no new configs. Reuses hoodie.datasource.hive_sync.batching.* from #18983 / #18984. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 16 ++-- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 79 ++++++++++++++++--- .../apache/hudi/hive/TestHiveSyncTool.java | 40 ++++++++++ 3 files changed, 119 insertions(+), 16 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 53beb0bccd8d2..7a821b0f17bc3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -90,9 +90,11 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; - // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this - // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor - // for partition-row work only — see IMetaStoreClientPool javadoc. + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS or HIVEQL. + // Owned by this class; closed in close() before Hive.closeCurrent(). Hands clients + // to the active DDL executor for partition-row work only — see IMetaStoreClientPool + // javadoc. HMSDDLExecutor uses it for add/alter/drop; HiveQueryDDLExecutor uses it + // only for DROP (Hive Thrift, not Hive Driver). private IMetaStoreClientPool partitionClientPool; // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for @@ -138,7 +140,9 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli break; case HIVEQL: this.partitionDriverPool = maybeBuildHiveDriverPool(config); - ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -157,7 +161,9 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcExecutor.getConnection(), databaseName); } else { this.partitionDriverPool = maybeBuildHiveDriverPool(config); - ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); } } } catch (Exception e) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 8f52c099f1c02..13f6550b1c72f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -18,12 +18,14 @@ package org.apache.hudi.hive.ddl; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -44,6 +46,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.sync.common.util.TableUtils.tableId; /** @@ -59,16 +62,22 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { // (createTable, schema evolution, single-statement runSQL callers) always uses the // session `hiveDriver` above. See HiveDriverPool javadoc. private final Option driverPool; + // When present, dropPartitionsToTable fans batches across this Thrift client pool. + // Owned by HoodieHiveSyncClient; close() is delegated through there. See + // IMetaStoreClientPool javadoc for the usage contract (partition-row ops only). + private final Option metaStoreClientPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { - this(config, metaStoreClient, Option.empty()); + this(config, metaStoreClient, Option.empty(), Option.empty()); } public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, - Option driverPool) { + Option driverPool, + Option metaStoreClientPool) { super(config); this.metaStoreClient = metaStoreClient; this.driverPool = driverPool; + this.metaStoreClientPool = metaStoreClientPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -187,21 +196,69 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor, - config)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); - metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false); - } - log.info("Drop partition {} on {}", dropPartition, tableName); - } + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runDropBatches(tableName, batches); } catch (Exception e) { log.error("{} drop partition failed", tableId(databaseName, tableName), e); throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e); } } + /** + * Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present, + * batches fan out across the pool's worker threads (each borrowing an independent + * IMetaStoreClient); otherwise batches are dispatched sequentially against the + * session client. Hive has no batch-drop primitive that matches dropPartition's + * semantics, so each worker still iterates its chunk one partition at a time — the + * win is fanning chunks across independent Thrift clients. + */ + private void runDropBatches(String tableName, List> batches) throws Exception { + if (!metaStoreClientPool.isPresent()) { + for (List batch : batches) { + applyDropBatch(metaStoreClient, tableName, batch); + } + return; + } + IMetaStoreClientPool pool = metaStoreClientPool.get(); + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(pool.executor().submit(() -> + pool.run(poolClient -> { + applyDropBatch(poolClient, tableName, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional drop batch failed on {} (suppressed in favor of first error)", tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + private void applyDropBatch(IMetaStoreClient poolClient, String tableName, List batch) throws Exception { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, + partitionValueExtractor, config)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); + } + } + @Override public void close() { // Close the pool first so the worker threads stop dispatching against their diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 2c574b802ece1..1b0e52c83b761 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -400,6 +400,46 @@ public void testHiveQLSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel HiveQL batching should sync the new partitions"); } + /** + * Exercises the DROP path in HiveQL mode with batching on. DROP routes through + * IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so when batching is + * enabled it fans out across IMetaStoreClientPool. Verifies the partition set + * shrinks as expected when batches drop in parallel. + */ + @Test + public void testHiveQLDropPartitionsWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + // Small batch_num so we get multiple drop batches dispatched in parallel. + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + + int partitionCount = 8; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added before drop test"); + + // Drop half the partitions through the parallel pool path. + List existing = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toList()); + List toDrop = existing.subList(0, partitionCount / 2); + hiveClient.dropPartitions(HiveTestUtil.TABLE_NAME, toDrop); + + List remaining = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(partitionCount - toDrop.size(), remaining.size(), + "Parallel DROP should remove exactly the requested partitions"); + Set remainingPaths = remaining.stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toSet()); + for (String dropped : toDrop) { + assertFalse(remainingPaths.contains(dropped), + "Dropped partition " + dropped + " must not appear in remaining set"); + } + } + /** * Exercises the SET_LOCATION path in HiveQL mode with batching on. SET_LOCATION * emits one ALTER PARTITION ... SET LOCATION statement per partition (Hive SQL