From 71946a61b79da5825a0c88433bf445304fdcdbfa Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 16:54:48 -0700 Subject: [PATCH 1/3] feat(hive-sync): batch and parallelize HMS partition operations Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see #18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per- partition Thrift round-trips for DROP, all sequential. This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4). Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization). Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path). Tests: TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency. TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HiveSyncConfigHolder.java | 17 ++ .../hudi/hive/HoodieHiveSyncClient.java | 37 +++- .../apache/hudi/hive/ddl/HMSDDLExecutor.java | 127 ++++++++--- .../hudi/hive/util/IMetaStoreClientPool.java | 207 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 37 ++++ .../hive/util/TestIMetaStoreClientPool.java | 181 +++++++++++++++ 6 files changed, 577 insertions(+), 29 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..01f6d0224070d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into " + + "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of " + + "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, " + + "writer version) continue to use the single session client. Default off; existing behavior " + + "is unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used " + + "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..08e0f8acdd02b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -66,6 +67,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +89,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this + // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor + // for partition-row work only — see IMetaStoreClientPool javadoc. + private IMetaStoreClientPool partitionClientPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -121,7 +128,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE)); switch (syncMode) { case HMS: - ddlExecutor = new HMSDDLExecutor(config, this.client); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: ddlExecutor = new HiveQueryDDLExecutor(config, this.client); @@ -201,6 +209,22 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. + log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { @@ -597,6 +621,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. + if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 6b7bd3af00963..e80d63730b3c8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -46,13 +47,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor { private final String databaseName; private final IMetaStoreClient client; private final PartitionValueExtractor partitionValueExtractor; + // Optional. When non-null, partition-row operations fan out across this pool; + // table-row operations always use the session `client` field above. See + // IMetaStoreClientPool javadoc for the usage contract. + private final IMetaStoreClientPool partitionClientPool; public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException { + this(syncConfig, metaStoreClient, null); + } + + public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient, + IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException { this.syncConfig = syncConfig; this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME); this.client = metaStoreClient; + this.partitionClientPool = partitionClientPool; try { this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance(); @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); try { + // Fetch the table StorageDescriptor once on the session client. Each worker + // copies fields off it; the Thrift POJO is not mutated after this read. StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); - for (List batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) { - List partitionList = new ArrayList<>(); - batch.forEach(x -> { + List> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum); + runBatches("add", tableName, batches, (poolClient, batch) -> { + List partitionList = new ArrayList<>(batch.size()); + for (String x : batch) { StorageDescriptor partitionSd = new StorageDescriptor(); partitionSd.setCols(sd.getCols()); partitionSd.setInputFormat(sd.getInputFormat()); @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); - }); - client.add_partitions(partitionList, true, false); + } + poolClient.add_partitions(partitionList, true, false); log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size()); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} add partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e); } @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); - client.dropPartition(databaseName, tableName, partitionClause, false); + // HMS has no batch-drop primitive that matches dropPartition's semantics, so each + // worker still iterates its chunk one partition at a time. The win is fanning + // chunks across pooled clients. + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runBatches("drop", tableName, batches, (poolClient, batch) -> { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); } - log.info("Drop partition {} on {}", dropPartition, tableName); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} drop partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e); } @@ -291,21 +312,71 @@ public void close() { private void registerAlterPartitionEvent(String tableName, List alteredPartitions) { try { + // Read the StorageDescriptor once on the session client; each worker deep-copies + // it per partition (alter_partitions semantics today). StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); - List partitionList = alteredPartitions.stream().map(partition -> { - Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - StorageDescriptor partitionSd = sd.deepCopy(); - partitionSd.setLocation(fullPartitionPath); - return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); - }).collect(Collectors.toList()); - client.alter_partitions(databaseName, tableName, partitionList, null); - } catch (TException e) { + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum); + runBatches("alter", tableName, batches, (poolClient, batch) -> { + List partitionList = batch.stream().map(partition -> { + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + StorageDescriptor partitionSd = sd.deepCopy(); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + poolClient.alter_partitions(databaseName, tableName, partitionList, null); + }); + } catch (Exception e) { log.error("{}.{} update partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e); } } + + /** + * Dispatches partition batches either in parallel against the client pool (if + * configured) or sequentially against the session client. The sequential path + * preserves the exact behavior we had before the pool existed, including failure + * semantics where batch N+1 never runs if batch N throws. + */ + @FunctionalInterface + private interface BatchAction { + void apply(IMetaStoreClient client, List batch) throws Exception; + } + + private void runBatches(String opName, String tableName, List> batches, BatchAction action) throws Exception { + if (partitionClientPool == null) { + for (List batch : batches) { + action.apply(client, batch); + } + return; + } + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(partitionClientPool.executor().submit(() -> + partitionClientPool.run(poolClient -> { + action.apply(poolClient, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java new file mode 100644 index 0000000000000..930fb705cbbda --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pool of {@link IMetaStoreClient} instances for parallel partition sync. + * + *

Each pooled client wraps an independent Thrift connection to the Hive Metastore. + * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client + * is available, executes the action, and returns the client to the pool. A worker + * thread pool of the same size is exposed via {@link #executor()} so callers can fan + * out their batches to match the number of available clients. + * + *

Usage contract: pool clients must be used only for partition-row + * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition}, + * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table}, + * {@code getTable} used as the read half of a read-modify-write of table parameters) + * must continue to go through the session client held by + * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would + * risk lost updates on table parameters such as the last-commit-time-synced marker. + * + *

The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * is constructed only for sync mode HMS. + */ +public class IMetaStoreClientPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public IMetaStoreClientPool(HiveSyncConfig config, int size) { + this(buildClients(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of clients so we can + // exercise borrow/return/close semantics without a live metastore. + IMetaStoreClientPool(List clients, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (clients.size() != size) { + throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(clients); + this.available.addAll(clients); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized IMetaStoreClient pool with {} clients", size); + } + + private static List buildClients(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + HiveConf hiveConf = config.getHiveConf(); + List clients = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + clients.add(newClient(hiveConf)); + } + return clients; + } catch (Exception e) { + // Construction failed mid-way; close any clients we already built before + // surfacing the error so we don't leak Thrift sockets. + for (IMetaStoreClient c : clients) { + try { + c.close(); + } catch (Exception ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e); + } + } + + private static IMetaStoreClient newClient(HiveConf hiveConf) { + try { + // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient + // (one Thrift socket per call), bypassing the Hive thread-local cache that + // Hive.get(conf) would use. This is what gives us N truly independent clients. + return RetryingMetaStoreClient.getProxy(hiveConf, true); + } catch (Exception e) { + throw new HoodieException("Failed to create IMetaStoreClient for pool", e); + } + } + + /** + * Borrows a client, runs the action, and returns the client to the pool. + * Blocks if all clients are in use until one becomes available. + */ + public T run(ClientAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool"); + } + IMetaStoreClient client = available.take(); + try { + return action.apply(client); + } finally { + // Always return the client to the pool, even on failure. Thrift clients + // recover transparently from transactional errors at the HMS layer; + // RetryingMetaStoreClient handles transient socket failures internally. + if (!closed) { + available.offer(client); + } + } + } + + /** + * Worker thread pool sized to match the client pool. Use this to fan out + * batches so the number of in-flight Thrift calls cannot exceed the + * number of pooled clients. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (IMetaStoreClient client : all) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled IMetaStoreClient", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ClientAction { + T apply(IMetaStoreClient client) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 6d971bcc52263..5e8392c1850d0 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -96,9 +96,12 @@ import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY; @@ -329,6 +332,40 @@ public void testDropUpperCasePartitionWithHMS() throws Exception { "Table partitions should match the number of partitions we wrote"); } + /** + * Exercises HMS sync with parallel partition batching enabled. Forces multiple + * batches with a small batch_num against a partition set large enough to fan out + * across multiple pool clients, and asserts the resulting partition set matches. + */ + @Test + public void testHMSSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + // Small batch_num so we get multiple batches dispatched in parallel. + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel batching should sync the new partitions"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java new file mode 100644 index 0000000000000..3c9d4fe899cfd --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link IMetaStoreClientPool} borrow/return/close semantics. These tests + * never hit a real Hive Metastore — clients are mocked and injected via the package- + * private constructor. + */ +class TestIMetaStoreClientPool { + + private static List mockClients(int n) { + List clients = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + clients.add(mock(IMetaStoreClient.class)); + } + return clients; + } + + @Test + void runReturnsClientToPoolOnSuccess() throws Exception { + List clients = mockClients(2); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 2); + try { + IMetaStoreClient borrowed = pool.run(c -> c); + // Both clients should be available again + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both clients should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsClientToPoolOnFailure() { + List clients = mockClients(1); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + // Pool should still be usable; the borrowed client was returned + try { + IMetaStoreClient again = pool.run(c -> c); + assertSame(clients.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllClients() throws Exception { + int poolSize = 3; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + // Subsequent borrow should fail + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + pool.close(); + // verify still passes — close() called only once per client + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + IMetaStoreClientPool pool = new IMetaStoreClientPool(mockClients(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +} From 49a70050c5ffad67f00b62a7edc30b0d5cfa7c20 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 19:14:34 -0700 Subject: [PATCH 2/3] feat(hive-sync): batch and parallelize HiveQL partition operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #18983 (HMS parallelism). Applies the equivalent treatment to the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql). HiveQL had two issues that this change addresses: 1. Batching gaps in QueryBasedDDLExecutor.constructPartitionAlterStatements: TOUCH concatenated every partition into one giant ALTER TABLE ... TOUCH PARTITION (...) PARTITION (...) ... statement; SET_LOCATION (UPDATE) emitted one statement per partition. ADD was already batched. 2. Sequential SQL execution in HiveQueryDDLExecutor.updateHiveSQLs: even when batches existed, they ran in a single for-loop on one Hive Driver. This change introduces HiveDriverPool, an eager pool of single-thread executors each owning a Hive Driver bound to a shared SessionState. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default off) and sized by hoodie.datasource.hive_sync.batching.threads (default 4) — no new configs. Design notes: - Hive's Driver and SessionState are thread-bound. SessionState.start() attaches to the calling thread's ThreadLocal. The pool gives each slot its own dedicated worker thread so the Driver stays valid for that thread's lifetime. Bootstrap, dispatch, and close all run on the bound thread. - SessionState is shared across workers (lazily constructed once), because each worker calls SessionState.start(sharedState) on its own thread to attach. Constructing one SessionState per worker triggered race conditions in Hive's resource-directory machinery on macOS. - TOUCH is now batched by HIVE_BATCH_SYNC_PARTITION_NUM. SET_LOCATION remains one statement per partition (Hive SQL doesn't support multi-partition SET LOCATION) but is now fanned out across workers. - Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database, so the leading USE database statement is load-bearing. The pool peels it off and runs it on every worker via runOnEachWorker() before fanning the rest out. Tests: - TestHiveDriverPool: bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency. - TestHiveSyncTool.testHiveQLSyncWithBatchingEnabled: end-to-end with batching.enabled=true, threads=3, batch_num=3 against embedded HMS. - TestHiveSyncTool.testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path specifically. - Full hudi-hive-sync suite: 305 passed, 0 failures, 0 errors. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 19 +- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 57 ++++ .../hudi/hive/ddl/QueryBasedDDLExecutor.java | 52 ++- .../apache/hudi/hive/util/HiveDriverPool.java | 301 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 61 ++++ .../hudi/hive/util/TestHiveDriverPool.java | 190 +++++++++++ 6 files changed, 664 insertions(+), 16 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 08e0f8acdd02b..94b61f63568c9 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; @@ -93,6 +94,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor // for partition-row work only — see IMetaStoreClientPool javadoc. private IMetaStoreClientPool partitionClientPool; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for + // reference only — close() is delegated through ddlExecutor.close(). + private HiveDriverPool partitionDriverPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -132,7 +137,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -150,7 +156,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); } else { - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); } } } catch (Exception e) { @@ -225,6 +232,14 @@ private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config return new IMetaStoreClientPool(config, size); } + private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new HiveDriverPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 25434d29eb3ff..b64a6a5d54c5b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.sync.common.util.TableUtils.tableId; @@ -52,10 +54,20 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; + // Optional. When non-null, partition-phase SQL lists fan out across this pool; + // table-level SQL (createTable, schema evolution, single-statement runSQL callers) + // always uses the session `hiveDriver` above. See HiveDriverPool javadoc. + private final HiveDriverPool driverPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { + this(config, metaStoreClient, null); + } + + public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, + HiveDriverPool driverPool) { super(config); this.metaStoreClient = metaStoreClient; + this.driverPool = driverPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -82,6 +94,41 @@ public void runSQL(String sql) { updateHiveSQLs(Collections.singletonList(sql)); } + /** + * Partition-phase SQL fan-out. When the driver pool is present, any leading + * {@code USE database} statements are run on every worker (Hive 2.x's + * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each worker needs to USE the right db + * before any partition ALTER). The remaining statements are then dispatched + * round-robin across the pool. Falls through to the sequential path on the + * session Driver when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (driverPool == null || sqls.isEmpty()) { + updateHiveSQLs(sqls); + return; + } + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + driverPool.runOnEachWorker(setupStatements); + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = driverPool.runAll(partitionStatements); + driverPool.awaitAll(futures); + } + + private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); try { @@ -149,6 +196,16 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro @Override public void close() { + // Close the pool first so the worker threads stop dispatching against their + // Drivers before we tear down anything else. The pool's close() runs + // Driver/SessionState cleanup on each worker's own thread. + if (driverPool != null) { + try { + driverPool.close(); + } catch (Exception e) { + log.warn("Error closing HiveDriverPool", e); + } + } if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 7f776f2f7a04d..a68644ae86f05 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -75,6 +76,20 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) { */ public abstract void runSQL(String sql); + /** + * Runs a list of SQL statements. The default implementation executes them + * sequentially via {@link #runSQL(String)}. Subclasses that can parallelize + * (e.g. {@link HiveQueryDDLExecutor} with a driver pool) override this hook + * to fan the list out across workers. The contract requires that the list + * has no positional dependencies — callers must fully qualify table names + * with {@code `db`.`tbl`} so any statement can run on any worker. + */ + protected void runSQLs(List sqls) { + for (String sql : sqls) { + runSQL(sql); + } + } + @Override public void createDatabase(String databaseName) { runSQL("create database if not exists " + databaseName); @@ -120,7 +135,7 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> runSQL(sql)); + runSQLs(sqls); } @Override @@ -131,9 +146,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } log.info("Changing partitions {} on {}", changedPartitions.size(), tableName); List sqls = constructPartitionAlterStatements(tableName, changedPartitions, PartitionAlterType.SET_LOCATION); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } @Override @@ -210,29 +223,40 @@ public void touchPartitionsToTable(String tableName, List touchPartition } log.info("Touching partitions " + touchPartitions.size() + " on " + tableName); List sqls = constructPartitionAlterStatements(tableName, touchPartitions, PartitionAlterType.TOUCH); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } /** * Builds SQL statements to either touch partitions or set their location. - * TOUCH: one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... - * SET_LOCATION: one ALTER TABLE ... PARTITION (p) SET LOCATION '...' per partition. + * + *

The first element of the returned list is always a {@code USE database} + * statement. Hive 2.x's ALTER PARTITION ... SET LOCATION does not respect the + * {@code db.table} qualifier (silently routes to the connection's current + * database), so the {@code USE} is load-bearing. Parallel execution paths must + * run this statement on every worker before fanning out the rest. + * + *

TOUCH: one {@code ALTER TABLE ... TOUCH PARTITION (p1) ...} per batch of + * {@code HIVE_BATCH_SYNC_PARTITION_NUM} partitions. + * + *

SET_LOCATION: one {@code ALTER TABLE ... PARTITION (p) SET LOCATION '...'} + * per partition (Hive SQL does not support multi-partition SET LOCATION in one + * statement). */ private List constructPartitionAlterStatements(String tableName, List partitions, PartitionAlterType alterType) { List result = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + databaseName + HIVE_ESCAPE_CHARACTER; result.add(useDatabase); String alterTablePrefix = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); switch (alterType) { case TOUCH: - String alterTable = alterTablePrefix + " TOUCH"; - for (String partition : partitions) { - alterTable += " PARTITION (" + getPartitionClause(partition) + ")"; + for (List batch : CollectionUtils.batches(partitions, batchSyncPartitionNum)) { + StringBuilder alterTable = new StringBuilder(alterTablePrefix).append(" TOUCH"); + for (String partition : batch) { + alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")"); + } + result.add(alterTable.toString()); } - result.add(alterTable); break; case SET_LOCATION: for (String partition : partitions) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java new file mode 100644 index 0000000000000..64bca898d1828 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; + +/** + * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL. + * + *

Hive's {@code SessionState.start(state)} binds state to the calling thread's + * thread-local, and {@code Driver} reads from that thread-local during {@code run()}. + * A Driver constructed on one thread cannot be safely used from another. This pool + * solves that by giving each slot its own dedicated worker thread (a single-thread + * executor) — the Driver and SessionState are built on that thread by a bootstrap + * task, and all subsequent SQL for that slot runs on the same thread. + * + *

Usage contract: use this pool only for partition-row DDL statements that + * are independent of each other and freely shuffleable across workers. Table-level + * statements (createTable, schema evolution, USE database) must continue to run on + * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver + * thread. The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} + * and is constructed only for HiveQL sync mode. + */ +public class HiveDriverPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class); + + private final List workers; + private final int size; + private volatile boolean closed; + + public HiveDriverPool(HiveSyncConfig config, int size) { + this(config, size, new DefaultDriverFactory(config)); + } + + // Package-private for tests: accepts a DriverFactory so unit tests can inject + // mock Driver instances without standing up a real Hive instance. + HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + this.size = size; + this.workers = new ArrayList<>(size); + String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME); + PoolThreadFactory threadFactory = new PoolThreadFactory(); + List> bootstrapFutures = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + Worker worker = new Worker(threadFactory); + workers.add(worker); + bootstrapFutures.add(worker.executor.submit(() -> { + worker.driver = factory.newDriver(databaseName); + return null; + })); + } + // Block until all bootstraps complete so we surface construction errors + // before any caller hands us SQL. + for (Future f : bootstrapFutures) { + f.get(); + } + } catch (Exception e) { + tearDown(); + throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e); + } + LOG.info("Initialized HiveDriverPool with {} workers", size); + } + + /** + * Runs each given SQL on every worker, in order. Used for setup statements + * (e.g. {@code USE database}) that must establish per-thread session context + * before any partition statement runs. Blocks until all workers have completed + * the setup. Throws on first error. + */ + public void runOnEachWorker(List setupSqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(workers.size()); + for (Worker worker : workers) { + futures.add(worker.executor.submit(() -> { + for (String sql : setupSqls) { + worker.driver.run(sql); + } + return null; + })); + } + awaitAll(futures); + } + + /** + * Dispatches each SQL string to a worker (round-robin) and returns the list of + * futures. The caller is responsible for awaiting and collecting errors. + */ + public List> runAll(List sqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + List> futures = new ArrayList<>(sqls.size()); + for (int i = 0; i < sqls.size(); i++) { + String sql = sqls.get(i); + Worker worker = workers.get(i % workers.size()); + futures.add(worker.executor.submit(() -> { + long start = System.currentTimeMillis(); + worker.driver.run(sql); + LOG.info("Time taken to execute [{}]: {} ms", sql, System.currentTimeMillis() - start); + return null; + })); + } + return futures; + } + + /** + * Awaits all futures, throws the first exception encountered (logging the rest at + * WARN), and returns the list of CommandProcessorResponse objects (currently + * unused but matches the existing single-threaded contract that returned them). + */ + public List awaitAll(List> futures) { + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + } + } catch (ExecutionException ee) { + Exception cause = unwrap(ee); + if (firstError == null) { + firstError = cause; + } else { + LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw new HoodieHiveSyncException("Failed in executing SQL", firstError); + } + return new ArrayList<>(); + } + + private static Exception unwrap(ExecutionException ee) { + Throwable cause = ee.getCause(); + return (cause instanceof Exception) ? (Exception) cause : ee; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + tearDown(); + } + + private void tearDown() { + // Close each worker's Driver/SessionState on its own thread, then shut the + // executor down. Running close() on the bound thread keeps SessionState's + // thread-local cleanup correct. + for (Worker worker : workers) { + try { + worker.executor.submit(() -> { + if (worker.driver != null) { + try { + worker.driver.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled Driver", e); + } + } + SessionState ss = SessionState.get(); + if (ss != null) { + try { + ss.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled SessionState", e); + } + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error during pool worker shutdown", e); + } + worker.executor.shutdown(); + try { + if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) { + worker.executor.shutdownNow(); + } + } catch (InterruptedException ie) { + worker.executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + workers.clear(); + } + + /** + * Per-slot state: a single-thread executor and the Driver bound to its thread. + * Driver is volatile because it is written by the bootstrap task and read by + * subsequent dispatch tasks on the same executor. + */ + private static final class Worker { + final ExecutorService executor; + volatile Driver driver; + + Worker(ThreadFactory threadFactory) { + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + } + + @FunctionalInterface + interface DriverFactory { + Driver newDriver(String databaseName) throws Exception; + } + + /** + * Builds a real Hive {@link Driver} on the calling thread. The SessionState is + * constructed lazily (once, on the first worker thread that builds a Driver) and + * shared across all worker threads — Hive uses ThreadLocal attachment, not + * exclusive ownership, so multiple workers calling + * {@code SessionState.start(sharedState)} all see the same config and scratch dir + * without each spending the cost of building their own SessionState (and risking + * resource-dir creation races during the constructor). + */ + private static final class DefaultDriverFactory implements DriverFactory { + private final HiveConf hiveConf; + private volatile SessionState sharedSessionState; + + DefaultDriverFactory(HiveSyncConfig config) { + this.hiveConf = config.getHiveConf(); + } + + @Override + public synchronized Driver newDriver(String databaseName) throws Exception { + if (sharedSessionState == null) { + sharedSessionState = new SessionState(hiveConf, + UserGroupInformation.getCurrentUser().getShortUserName()); + } + SessionState.start(sharedSessionState); + sharedSessionState.setCurrentDatabase(databaseName); + return new Driver(hiveConf); + } + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-driver-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 5e8392c1850d0..5afc9af484ac2 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -366,6 +366,67 @@ public void testHMSSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel batching should sync the new partitions"); } + /** + * Exercises HiveQL sync with parallel partition batching enabled. Mirrors the + * HMS test but routes through the HiveDriverPool — each worker thread owns a + * Driver+SessionState pair, and the SQL list (qualified with `db`.`tbl`) is + * fanned out across them. + */ + @Test + public void testHiveQLSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel HiveQL batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel HiveQL batching should sync the new partitions"); + } + + /** + * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that + * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller + * statements does not break partition visibility downstream. + */ + @Test + public void testHiveQLTouchPartitionsWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "2"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + hiveSyncProps.setProperty(META_SYNC_TOUCH_PARTITIONS_ENABLED.key(), "true"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Touch existing partitions (no new data) — should hit the batched TOUCH path + // without changing the partition count. + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "TOUCH batching must not change the partition set"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java new file mode 100644 index 0000000000000..daa98e5ebaec2 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link HiveDriverPool} that exercise bootstrap, dispatch, error + * propagation, and close semantics without standing up a real Hive instance. + */ +class TestHiveDriverPool { + + private static HiveSyncConfig configWithEmptyHiveConf() { + HiveSyncConfig config = mock(HiveSyncConfig.class); + doAnswer(inv -> new HiveConf()).when(config).getHiveConf(); + doAnswer(inv -> "default").when(config).getStringOrDefault( + org.mockito.ArgumentMatchers.any()); + return config; + } + + @Test + void bootstrapBuildsOneDriverPerSlot() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger built = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + built.incrementAndGet(); + return mock(Driver.class); + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + assertEquals(3, pool.size()); + assertEquals(3, built.get(), "One Driver per slot should be constructed eagerly"); + } + } + + @Test + void bootstrapFailurePropagatesAndTearsDown() { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger calls = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + int n = calls.incrementAndGet(); + if (n == 2) { + throw new RuntimeException("simulated driver build failure"); + } + return mock(Driver.class); + }; + HoodieException ex = assertThrows(HoodieException.class, + () -> new HiveDriverPool(config, 3, factory)); + assertTrue(ex.getMessage().contains("Failed to construct HiveDriverPool")); + } + + @Test + void runAllDispatchesEachSqlAcrossWorkers() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Each worker counts how many SQLs it received and remembers the thread. + ConcurrentHashMap> seenThreadsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + seenThreadsByDriver.put(d, ConcurrentHashMap.newKeySet()); + doAnswer((InvocationOnMock inv) -> { + seenThreadsByDriver.get(d).add(Thread.currentThread().getName()); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List sqls = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4"); + List> futures = pool.runAll(sqls); + pool.awaitAll(futures); + assertEquals(2, seenThreadsByDriver.size(), "Expected exactly 2 worker Drivers"); + int totalCalls = seenThreadsByDriver.values().stream().mapToInt(Set::size).sum(); + assertTrue(totalCalls >= 1, "At least one worker should have logged a thread"); + // Each Driver should have been invoked exactly twice (round-robin with 4 sqls, 2 workers). + for (Driver d : seenThreadsByDriver.keySet()) { + verify(d, times(2)).run(anyString()); + } + } + } + + @Test + void awaitAllThrowsFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + throw new RuntimeException("boom: " + sql); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List> futures = pool.runAll(Arrays.asList("OK", "FAIL", "OK")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + } + } + + @Test + void concurrentDispatchBoundedByPoolSize() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + CountDownLatch hold = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + int now = inFlight.incrementAndGet(); + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + hold.await(2, TimeUnit.SECONDS); + inFlight.decrementAndGet(); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + // 5 SQLs against pool of size 2 → max in-flight should be 2. + List> futures = pool.runAll(Arrays.asList("a", "b", "c", "d", "e")); + // Release after a short wait so all SQLs progress. + Thread.sleep(150); + hold.countDown(); + pool.awaitAll(futures); + assertTrue(maxInFlight.get() <= 2, + "Max concurrent dispatches must not exceed pool size, observed " + maxInFlight.get()); + assertTrue(maxInFlight.get() >= 1, "Sanity: at least one dispatch ran"); + } + } + + @Test + void closeIsIdempotentAndPreventsFurtherDispatch() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + HiveDriverPool pool = new HiveDriverPool(config, 2, factory); + pool.close(); + pool.close(); + assertThrows(IllegalStateException.class, + () -> pool.runAll(Arrays.asList("anything"))); + } + + @Test + void invalidSizeRejected() { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + assertThrows(IllegalArgumentException.class, + () -> new HiveDriverPool(config, 0, factory)); + } +} From f2b079109192e6b181858c964fdc8ab4909f4aae Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 11 Jun 2026 21:01:24 -0700 Subject: [PATCH 3/3] feat(hive-sync): batch and parallelize JDBC partition operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #18983 (HMS pool) and #18984 (HiveQL Driver pool). Applies the equivalent treatment to JDBC sync mode. The JDBC executor's partition phase ran every SQL statement sequentially on a single shared java.sql.Connection. ADD and DROP were already batched in the parent QueryBasedDDLExecutor (and in JDBCExecutor.constructDropPartitions); TOUCH became batched in #18984. The only batching gap that remains is SET_LOCATION (UPDATE), which Hive SQL fundamentally cannot batch (no multi-partition SET LOCATION syntax). So the win for JDBC is pure parallel execution. This change introduces JDBCConnectionPool, a simple borrow/return pool of N java.sql.Connection instances. JDBC Connection is not thread-safe but is cheap to construct (one TCP socket to HiveServer2), so the design mirrors IMetaStoreClientPool more than HiveDriverPool — no thread-binding needed. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default false) and sized by hoodie.datasource.hive_sync.batching.threads (default 4) — no new configs. Design notes: - JDBCExecutor keeps its session Connection for table-row work (createDatabase, createTable, schema evolution, updateTableComments). JDBCBasedMetadataOperator continues to use that session Connection, unchanged. - The pool is purely additive for partition fan-out. Pool clients only run partition-row statements. - Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database. The pool's runOnEachConnection() broadcasts the leading USE database statement to every connection before partition fan-out, same pattern as HiveDriverPool.runOnEachWorker() from #18984. This is run lazily on first dispatch, not at pool construction (the database may not exist yet at construction time). Tests: - TestJDBCConnectionPool: borrow/return, concurrent-borrow bounding, close idempotency, exhaustion blocking, size validation. - TestHiveSyncTool.testJDBCSyncWithBatchingEnabled: end-to-end JDBC sync with batching.enabled=true, threads=3, batch_num=3 against the embedded HiveServer2. - Full hudi-hive-sync suite: 314 passed, 0 failures. Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 19 +- .../apache/hudi/hive/ddl/JDBCExecutor.java | 97 ++++++- .../hudi/hive/util/JDBCConnectionPool.java | 258 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 33 +++ .../hive/util/TestJDBCConnectionPool.java | 177 ++++++++++++ 5 files changed, 581 insertions(+), 3 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 94b61f63568c9..14406c0beacd6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -40,6 +40,7 @@ import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; +import org.apache.hudi.hive.util.JDBCConnectionPool; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; import org.apache.hudi.sync.common.model.FieldSchema; @@ -98,6 +99,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for // reference only — close() is delegated through ddlExecutor.close(). private HiveDriverPool partitionDriverPool; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is JDBC (explicit + // or legacy default with HIVE_USE_JDBC=true). Owned by JDBCExecutor; this field + // is kept for reference only — close() is delegated through ddlExecutor.close(). + private JDBCConnectionPool partitionJdbcPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -141,7 +146,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); break; case JDBC: - JDBCExecutor jdbcExecutor = new JDBCExecutor(config); + this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config); + JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool); ddlExecutor = jdbcExecutor; jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); @@ -151,7 +157,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli } } else { if (config.getBoolean(HIVE_USE_JDBC)) { - JDBCExecutor jdbcExecutor = new JDBCExecutor(config); + this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config); + JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool); ddlExecutor = jdbcExecutor; jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); @@ -240,6 +247,14 @@ private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) { return new HiveDriverPool(config, size); } + private JDBCConnectionPool maybeBuildJDBCConnectionPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new JDBCConnectionPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 5d3801c4d5a2f..0ade1c591719d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.JDBCConnectionPool; import lombok.extern.slf4j.Slf4j; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Future; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; @@ -48,6 +50,12 @@ public class JDBCExecutor extends QueryBasedDDLExecutor { private Connection connection; + // Optional. When non-null, partition-phase SQL lists fan out across this pool; + // table-level SQL (createTable, schema evolution, single-statement runSQL callers) + // always uses the session `connection` above. The connection passed to + // JDBCBasedMetadataOperator (via getConnection()) is also the session connection. + // See JDBCConnectionPool javadoc. + private final JDBCConnectionPool connectionPool; /** * Returns the underlying JDBC connection for use by @@ -59,18 +67,27 @@ public Connection getConnection() { } public JDBCExecutor(HiveSyncConfig config) { + this(config, null); + } + + public JDBCExecutor(HiveSyncConfig config, JDBCConnectionPool connectionPool) { super(config); Objects.requireNonNull(config.getStringOrDefault(HIVE_URL), "--jdbc-url option is required for jdbc sync mode"); Objects.requireNonNull(config.getStringOrDefault(HIVE_USER), "--user option is required for jdbc sync mode"); Objects.requireNonNull(config.getStringOrDefault(HIVE_PASS), "--pass option is required for jdbc sync mode"); createHiveConnection(config.getStringOrDefault(HIVE_URL), config.getStringOrDefault(HIVE_USER), config.getStringOrDefault(HIVE_PASS)); + this.connectionPool = connectionPool; } @Override public void runSQL(String s) { + runOnConnection(connection, s); + } + + private void runOnConnection(Connection conn, String s) { Statement stmt = null; try { - stmt = connection.createStatement(); + stmt = conn.createStatement(); log.info("Executing SQL {}", s); stmt.execute(s); } catch (SQLException e) { @@ -80,6 +97,75 @@ public void runSQL(String s) { } } + /** + * Partition-phase SQL fan-out. When the connection pool is present, any leading + * {@code USE database} statements are broadcast to every pooled connection (Hive + * 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each pooled connection needs to USE the + * right db before any partition ALTER). The remaining statements are then + * dispatched round-robin across the pool. Falls through to the sequential path + * on the session connection when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (connectionPool == null || sqls.isEmpty()) { + super.runSQLs(sqls); + return; + } + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + try { + connectionPool.runOnEachConnection(setupStatements); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed running per-connection setup SQL", e); + } + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = new ArrayList<>(partitionStatements.size()); + for (String sql : partitionStatements) { + futures.add(connectionPool.executor().submit(() -> + connectionPool.run(conn -> { + runOnConnection(conn, sql); + return null; + }) + )); + } + HoodieHiveSyncException firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = new HoodieHiveSyncException("Interrupted while running partition SQL", ie); + } + } catch (java.util.concurrent.ExecutionException ee) { + Throwable cause = ee.getCause(); + if (firstError == null) { + firstError = (cause instanceof HoodieHiveSyncException) + ? (HoodieHiveSyncException) cause + : new HoodieHiveSyncException("Failed in executing SQL", cause); + } else { + log.warn("Additional JDBC partition SQL failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private void closeQuietly(ResultSet resultSet, Statement stmt) { try { if (stmt != null) { @@ -202,6 +288,15 @@ public StringBuilder getAlterTableDropPrefix(String tableName) { @Override public void close() { + // Close the pool first so worker threads stop dispatching against their + // connections before we tear down anything else. + if (connectionPool != null) { + try { + connectionPool.close(); + } catch (Exception e) { + log.warn("Error closing JDBCConnectionPool", e); + } + } try { if (connection != null) { connection.close(); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java new file mode 100644 index 0000000000000..5874e00f00a56 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER; + +/** + * Pool of JDBC {@link Connection} instances for parallel HiveQL DDL via JDBC mode. + * + *

JDBC {@code Connection} is not thread-safe but is cheap to construct (one TCP + * socket to HiveServer2). Connections are opened eagerly at pool construction. The + * pool exposes {@link #runOnEachConnection(java.util.List)} for setup statements + * that must run on every connection before any partition fan-out (e.g. + * {@code USE database}), which {@code JDBCExecutor} uses to handle the Hive 2.x + * quirk where {@code ALTER PARTITION ... SET LOCATION} ignores {@code db.tbl} + * qualifiers and silently uses the connection's current database. + * + *

Usage contract: use this pool only for partition-row DDL statements + * that are independent of each other and freely shuffleable across workers. + * Table-level statements (createTable, schema evolution, schema reads) must + * continue to run on the session connection held by {@link + * org.apache.hudi.hive.ddl.JDBCExecutor} on the sync driver thread, and the + * shared session connection is also what {@link + * org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator} uses for its read/write of + * table parameters. + * + *

Gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * constructed only for JDBC sync mode. + */ +public class JDBCConnectionPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCConnectionPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public JDBCConnectionPool(HiveSyncConfig config, int size) { + this(buildConnections(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of connections so we can + // exercise borrow/return/close semantics without a live HiveServer2. + JDBCConnectionPool(List connections, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (connections.size() != size) { + throw new IllegalArgumentException("Expected " + size + " connections, got " + connections.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(connections); + this.available.addAll(connections); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized JDBCConnectionPool with {} connections", size); + } + + private static List buildConnections(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + String jdbcUrl = config.getStringOrDefault(HIVE_URL); + String user = config.getStringOrDefault(HIVE_USER); + String pass = config.getStringOrDefault(HIVE_PASS); + try { + // Defensive: the Hive JDBC driver is normally already loaded by JDBCExecutor; + // load it here too so the pool can be constructed in isolation. + Class.forName("org.apache.hive.jdbc.HiveDriver"); + } catch (ClassNotFoundException e) { + throw new HoodieException("Hive JDBC driver class not found on classpath", e); + } + List connections = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + connections.add(DriverManager.getConnection(jdbcUrl, user, pass)); + } + return connections; + } catch (SQLException e) { + // Construction failed mid-way; close any connections we already built before + // surfacing the error so we don't leak sockets. + for (Connection c : connections) { + try { + c.close(); + } catch (SQLException ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct JDBCConnectionPool of size " + size, e); + } + } + + /** + * Runs each given SQL on every pooled connection, in order. Used for + * setup statements (e.g. {@code USE database}) that must establish per- + * connection session context before any partition fan-out. Blocks until all + * connections complete. Throws on first error. + */ + public void runOnEachConnection(List setupSqls) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed JDBCConnectionPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(all.size()); + for (Connection conn : all) { + futures.add(executor.submit(() -> { + for (String sql : setupSqls) { + try (Statement stmt = conn.createStatement()) { + stmt.execute(sql); + } + } + return null; + })); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + } + } catch (java.util.concurrent.ExecutionException ee) { + Throwable cause = ee.getCause(); + if (firstError == null) { + firstError = (cause instanceof Exception) ? (Exception) cause : ee; + } else { + LOG.warn("Additional setup SQL failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + /** + * Borrows a connection, runs the action, and returns the connection to the pool. + * Blocks if all connections are in use until one becomes available. + */ + public T run(ConnectionAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed JDBCConnectionPool"); + } + Connection conn = available.take(); + try { + return action.apply(conn); + } finally { + if (!closed) { + available.offer(conn); + } + } + } + + /** + * Worker thread pool sized to match the connection pool. Use this to fan out + * batches so the number of in-flight statements cannot exceed the number of + * pooled connections. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (Connection conn : all) { + try { + conn.close(); + } catch (SQLException e) { + LOG.warn("Error closing pooled JDBC Connection", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ConnectionAction { + T apply(Connection conn) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-jdbc-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 5afc9af484ac2..51cb638c2ca48 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -400,6 +400,39 @@ public void testHiveQLSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel HiveQL batching should sync the new partitions"); } + /** + * Exercises JDBC sync with parallel partition batching enabled. JDBC's pool + * holds N pre-opened HiveServer2 connections, each with `USE database` pinned + * at bootstrap. The SQL list's leading `USE` is stripped at dispatch time. + */ + @Test + public void testJDBCSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.JDBC.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel JDBC batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel JDBC batching should sync the new partitions"); + } + /** * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java new file mode 100644 index 0000000000000..9178923057b34 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link JDBCConnectionPool} borrow/return/close semantics. These + * tests never hit a real HiveServer2 — connections are mocked and injected via + * the package-private constructor. + */ +class TestJDBCConnectionPool { + + private static List mockConnections(int n) { + List conns = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + conns.add(mock(Connection.class)); + } + return conns; + } + + @Test + void runReturnsConnectionToPoolOnSuccess() throws Exception { + List conns = mockConnections(2); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, 2); + try { + Connection borrowed = pool.run(c -> c); + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both connections should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsConnectionToPoolOnFailure() { + List conns = mockConnections(1); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + try { + Connection again = pool.run(c -> c); + assertSame(conns.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllConnections() throws Exception { + int poolSize = 3; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + pool.close(); + for (Connection c : conns) { + verify(c).close(); + } + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + pool.close(); + pool.close(); + for (Connection c : conns) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new JDBCConnectionPool(mockConnections(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new JDBCConnectionPool(mockConnections(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + JDBCConnectionPool pool = new JDBCConnectionPool(mockConnections(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +}