diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..01f6d0224070d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, HMS-mode partition operations (add/update/touch/drop) are split into " + + "batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel to a pool of " + + "IMetaStoreClient instances. Table-level operations (create/alter table, last commit time, " + + "writer version) continue to use the single session client. Default off; existing behavior " + + "is unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of IMetaStoreClient instances) and worker thread count used " + + "for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..14406c0beacd6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,7 +37,10 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.HiveDriverPool; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; +import org.apache.hudi.hive.util.JDBCConnectionPool; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; import org.apache.hudi.sync.common.model.FieldSchema; @@ -66,6 +69,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +91,18 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS. Owned by this + // class; closed in close() before Hive.closeCurrent(). Hands clients to HMSDDLExecutor + // for partition-row work only — see IMetaStoreClientPool javadoc. + private IMetaStoreClientPool partitionClientPool; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for + // reference only — close() is delegated through ddlExecutor.close(). + private HiveDriverPool partitionDriverPool; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is JDBC (explicit + // or legacy default with HIVE_USE_JDBC=true). Owned by JDBCExecutor; this field + // is kept for reference only — close() is delegated through ddlExecutor.close(). + private JDBCConnectionPool partitionJdbcPool; /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -121,13 +138,16 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE)); switch (syncMode) { case HMS: - ddlExecutor = new HMSDDLExecutor(config, this.client); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HMSDDLExecutor(config, this.client, this.partitionClientPool); break; case HIVEQL: - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); break; case JDBC: - JDBCExecutor jdbcExecutor = new JDBCExecutor(config); + this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config); + JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool); ddlExecutor = jdbcExecutor; jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); @@ -137,12 +157,14 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli } } else { if (config.getBoolean(HIVE_USE_JDBC)) { - JDBCExecutor jdbcExecutor = new JDBCExecutor(config); + this.partitionJdbcPool = maybeBuildJDBCConnectionPool(config); + JDBCExecutor jdbcExecutor = new JDBCExecutor(config, this.partitionJdbcPool); ddlExecutor = jdbcExecutor; jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); } else { - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); } } } catch (Exception e) { @@ -201,6 +223,38 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. + log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + + private HiveDriverPool maybeBuildHiveDriverPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new HiveDriverPool(config, size); + } + + private JDBCConnectionPool maybeBuildJDBCConnectionPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new JDBCConnectionPool(config, size); + } + private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { @@ -597,6 +651,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. + if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 6b7bd3af00963..e80d63730b3c8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -46,13 +47,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; @@ -73,11 +74,21 @@ public class HMSDDLExecutor implements DDLExecutor { private final String databaseName; private final IMetaStoreClient client; private final PartitionValueExtractor partitionValueExtractor; + // Optional. When non-null, partition-row operations fan out across this pool; + // table-row operations always use the session `client` field above. See + // IMetaStoreClientPool javadoc for the usage contract. + private final IMetaStoreClientPool partitionClientPool; public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException { + this(syncConfig, metaStoreClient, null); + } + + public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient, + IMetaStoreClientPool partitionClientPool) throws HiveException, MetaException { this.syncConfig = syncConfig; this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME); this.client = metaStoreClient; + this.partitionClientPool = partitionClientPool; try { this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance(); @@ -193,11 +204,14 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); try { + // Fetch the table StorageDescriptor once on the session client. Each worker + // copies fields off it; the Thrift POJO is not mutated after this read. StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); - for (List batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) { - List partitionList = new ArrayList<>(); - batch.forEach(x -> { + List> batches = CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum); + runBatches("add", tableName, batches, (poolClient, batch) -> { + List partitionList = new ArrayList<>(batch.size()); + for (String x : batch) { StorageDescriptor partitionSd = new StorageDescriptor(); partitionSd.setCols(sd.getCols()); partitionSd.setInputFormat(sd.getInputFormat()); @@ -208,11 +222,11 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); - }); - client.add_partitions(partitionList, true, false); + } + poolClient.add_partitions(partitionList, true, false); log.info("HMSDDLExecutor add a batch partitions done: {}", partitionList.size()); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} add partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e); } @@ -247,15 +261,22 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); - client.dropPartition(databaseName, tableName, partitionClause, false); + // HMS has no batch-drop primitive that matches dropPartition's semantics, so each + // worker still iterates its chunk one partition at a time. The win is fanning + // chunks across pooled clients. + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runBatches("drop", tableName, batches, (poolClient, batch) -> { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, partitionValueExtractor, syncConfig)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); } - log.info("Drop partition {} on {}", dropPartition, tableName); - } - } catch (TException e) { + }); + } catch (Exception e) { log.error("{}.{} drop partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e); } @@ -291,21 +312,71 @@ public void close() { private void registerAlterPartitionEvent(String tableName, List alteredPartitions) { try { + // Read the StorageDescriptor once on the session client; each worker deep-copies + // it per partition (alter_partitions semantics today). StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); - List partitionList = alteredPartitions.stream().map(partition -> { - Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - StorageDescriptor partitionSd = sd.deepCopy(); - partitionSd.setLocation(fullPartitionPath); - return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); - }).collect(Collectors.toList()); - client.alter_partitions(databaseName, tableName, partitionList, null); - } catch (TException e) { + int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum); + runBatches("alter", tableName, batches, (poolClient, batch) -> { + List partitionList = batch.stream().map(partition -> { + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + StorageDescriptor partitionSd = sd.deepCopy(); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + poolClient.alter_partitions(databaseName, tableName, partitionList, null); + }); + } catch (Exception e) { log.error("{}.{} update partition failed", databaseName, tableName, e); throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e); } } + + /** + * Dispatches partition batches either in parallel against the client pool (if + * configured) or sequentially against the session client. The sequential path + * preserves the exact behavior we had before the pool existed, including failure + * semantics where batch N+1 never runs if batch N throws. + */ + @FunctionalInterface + private interface BatchAction { + void apply(IMetaStoreClient client, List batch) throws Exception; + } + + private void runBatches(String opName, String tableName, List> batches, BatchAction action) throws Exception { + if (partitionClientPool == null) { + for (List batch : batches) { + action.apply(client, batch); + } + return; + } + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(partitionClientPool.executor().submit(() -> + partitionClientPool.run(poolClient -> { + action.apply(poolClient, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional {} batch failed on {} (suppressed in favor of first error)", opName, tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 25434d29eb3ff..b64a6a5d54c5b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.sync.common.util.TableUtils.tableId; @@ -52,10 +54,20 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; + // Optional. When non-null, partition-phase SQL lists fan out across this pool; + // table-level SQL (createTable, schema evolution, single-statement runSQL callers) + // always uses the session `hiveDriver` above. See HiveDriverPool javadoc. + private final HiveDriverPool driverPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { + this(config, metaStoreClient, null); + } + + public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, + HiveDriverPool driverPool) { super(config); this.metaStoreClient = metaStoreClient; + this.driverPool = driverPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -82,6 +94,41 @@ public void runSQL(String sql) { updateHiveSQLs(Collections.singletonList(sql)); } + /** + * Partition-phase SQL fan-out. When the driver pool is present, any leading + * {@code USE database} statements are run on every worker (Hive 2.x's + * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each worker needs to USE the right db + * before any partition ALTER). The remaining statements are then dispatched + * round-robin across the pool. Falls through to the sequential path on the + * session Driver when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (driverPool == null || sqls.isEmpty()) { + updateHiveSQLs(sqls); + return; + } + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + driverPool.runOnEachWorker(setupStatements); + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = driverPool.runAll(partitionStatements); + driverPool.awaitAll(futures); + } + + private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); try { @@ -149,6 +196,16 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro @Override public void close() { + // Close the pool first so the worker threads stop dispatching against their + // Drivers before we tear down anything else. The pool's close() runs + // Driver/SessionState cleanup on each worker's own thread. + if (driverPool != null) { + try { + driverPool.close(); + } catch (Exception e) { + log.warn("Error closing HiveDriverPool", e); + } + } if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 5d3801c4d5a2f..0ade1c591719d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.JDBCConnectionPool; import lombok.extern.slf4j.Slf4j; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Future; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; @@ -48,6 +50,12 @@ public class JDBCExecutor extends QueryBasedDDLExecutor { private Connection connection; + // Optional. When non-null, partition-phase SQL lists fan out across this pool; + // table-level SQL (createTable, schema evolution, single-statement runSQL callers) + // always uses the session `connection` above. The connection passed to + // JDBCBasedMetadataOperator (via getConnection()) is also the session connection. + // See JDBCConnectionPool javadoc. + private final JDBCConnectionPool connectionPool; /** * Returns the underlying JDBC connection for use by @@ -59,18 +67,27 @@ public Connection getConnection() { } public JDBCExecutor(HiveSyncConfig config) { + this(config, null); + } + + public JDBCExecutor(HiveSyncConfig config, JDBCConnectionPool connectionPool) { super(config); Objects.requireNonNull(config.getStringOrDefault(HIVE_URL), "--jdbc-url option is required for jdbc sync mode"); Objects.requireNonNull(config.getStringOrDefault(HIVE_USER), "--user option is required for jdbc sync mode"); Objects.requireNonNull(config.getStringOrDefault(HIVE_PASS), "--pass option is required for jdbc sync mode"); createHiveConnection(config.getStringOrDefault(HIVE_URL), config.getStringOrDefault(HIVE_USER), config.getStringOrDefault(HIVE_PASS)); + this.connectionPool = connectionPool; } @Override public void runSQL(String s) { + runOnConnection(connection, s); + } + + private void runOnConnection(Connection conn, String s) { Statement stmt = null; try { - stmt = connection.createStatement(); + stmt = conn.createStatement(); log.info("Executing SQL {}", s); stmt.execute(s); } catch (SQLException e) { @@ -80,6 +97,75 @@ public void runSQL(String s) { } } + /** + * Partition-phase SQL fan-out. When the connection pool is present, any leading + * {@code USE database} statements are broadcast to every pooled connection (Hive + * 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each pooled connection needs to USE the + * right db before any partition ALTER). The remaining statements are then + * dispatched round-robin across the pool. Falls through to the sequential path + * on the session connection when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (connectionPool == null || sqls.isEmpty()) { + super.runSQLs(sqls); + return; + } + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + try { + connectionPool.runOnEachConnection(setupStatements); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed running per-connection setup SQL", e); + } + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = new ArrayList<>(partitionStatements.size()); + for (String sql : partitionStatements) { + futures.add(connectionPool.executor().submit(() -> + connectionPool.run(conn -> { + runOnConnection(conn, sql); + return null; + }) + )); + } + HoodieHiveSyncException firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = new HoodieHiveSyncException("Interrupted while running partition SQL", ie); + } + } catch (java.util.concurrent.ExecutionException ee) { + Throwable cause = ee.getCause(); + if (firstError == null) { + firstError = (cause instanceof HoodieHiveSyncException) + ? (HoodieHiveSyncException) cause + : new HoodieHiveSyncException("Failed in executing SQL", cause); + } else { + log.warn("Additional JDBC partition SQL failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private void closeQuietly(ResultSet resultSet, Statement stmt) { try { if (stmt != null) { @@ -202,6 +288,15 @@ public StringBuilder getAlterTableDropPrefix(String tableName) { @Override public void close() { + // Close the pool first so worker threads stop dispatching against their + // connections before we tear down anything else. + if (connectionPool != null) { + try { + connectionPool.close(); + } catch (Exception e) { + log.warn("Error closing JDBCConnectionPool", e); + } + } try { if (connection != null) { connection.close(); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 7f776f2f7a04d..a68644ae86f05 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -75,6 +76,20 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) { */ public abstract void runSQL(String sql); + /** + * Runs a list of SQL statements. The default implementation executes them + * sequentially via {@link #runSQL(String)}. Subclasses that can parallelize + * (e.g. {@link HiveQueryDDLExecutor} with a driver pool) override this hook + * to fan the list out across workers. The contract requires that the list + * has no positional dependencies — callers must fully qualify table names + * with {@code `db`.`tbl`} so any statement can run on any worker. + */ + protected void runSQLs(List sqls) { + for (String sql : sqls) { + runSQL(sql); + } + } + @Override public void createDatabase(String databaseName) { runSQL("create database if not exists " + databaseName); @@ -120,7 +135,7 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> runSQL(sql)); + runSQLs(sqls); } @Override @@ -131,9 +146,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } log.info("Changing partitions {} on {}", changedPartitions.size(), tableName); List sqls = constructPartitionAlterStatements(tableName, changedPartitions, PartitionAlterType.SET_LOCATION); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } @Override @@ -210,29 +223,40 @@ public void touchPartitionsToTable(String tableName, List touchPartition } log.info("Touching partitions " + touchPartitions.size() + " on " + tableName); List sqls = constructPartitionAlterStatements(tableName, touchPartitions, PartitionAlterType.TOUCH); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } /** * Builds SQL statements to either touch partitions or set their location. - * TOUCH: one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... - * SET_LOCATION: one ALTER TABLE ... PARTITION (p) SET LOCATION '...' per partition. + * + *

The first element of the returned list is always a {@code USE database} + * statement. Hive 2.x's ALTER PARTITION ... SET LOCATION does not respect the + * {@code db.table} qualifier (silently routes to the connection's current + * database), so the {@code USE} is load-bearing. Parallel execution paths must + * run this statement on every worker before fanning out the rest. + * + *

TOUCH: one {@code ALTER TABLE ... TOUCH PARTITION (p1) ...} per batch of + * {@code HIVE_BATCH_SYNC_PARTITION_NUM} partitions. + * + *

SET_LOCATION: one {@code ALTER TABLE ... PARTITION (p) SET LOCATION '...'} + * per partition (Hive SQL does not support multi-partition SET LOCATION in one + * statement). */ private List constructPartitionAlterStatements(String tableName, List partitions, PartitionAlterType alterType) { List result = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + databaseName + HIVE_ESCAPE_CHARACTER; result.add(useDatabase); String alterTablePrefix = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); switch (alterType) { case TOUCH: - String alterTable = alterTablePrefix + " TOUCH"; - for (String partition : partitions) { - alterTable += " PARTITION (" + getPartitionClause(partition) + ")"; + for (List batch : CollectionUtils.batches(partitions, batchSyncPartitionNum)) { + StringBuilder alterTable = new StringBuilder(alterTablePrefix).append(" TOUCH"); + for (String partition : batch) { + alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")"); + } + result.add(alterTable.toString()); } - result.add(alterTable); break; case SET_LOCATION: for (String partition : partitions) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java new file mode 100644 index 0000000000000..64bca898d1828 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; + +/** + * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL. + * + *

Hive's {@code SessionState.start(state)} binds state to the calling thread's + * thread-local, and {@code Driver} reads from that thread-local during {@code run()}. + * A Driver constructed on one thread cannot be safely used from another. This pool + * solves that by giving each slot its own dedicated worker thread (a single-thread + * executor) — the Driver and SessionState are built on that thread by a bootstrap + * task, and all subsequent SQL for that slot runs on the same thread. + * + *

Usage contract: use this pool only for partition-row DDL statements that + * are independent of each other and freely shuffleable across workers. Table-level + * statements (createTable, schema evolution, USE database) must continue to run on + * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver + * thread. The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} + * and is constructed only for HiveQL sync mode. + */ +public class HiveDriverPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class); + + private final List workers; + private final int size; + private volatile boolean closed; + + public HiveDriverPool(HiveSyncConfig config, int size) { + this(config, size, new DefaultDriverFactory(config)); + } + + // Package-private for tests: accepts a DriverFactory so unit tests can inject + // mock Driver instances without standing up a real Hive instance. + HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + this.size = size; + this.workers = new ArrayList<>(size); + String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME); + PoolThreadFactory threadFactory = new PoolThreadFactory(); + List> bootstrapFutures = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + Worker worker = new Worker(threadFactory); + workers.add(worker); + bootstrapFutures.add(worker.executor.submit(() -> { + worker.driver = factory.newDriver(databaseName); + return null; + })); + } + // Block until all bootstraps complete so we surface construction errors + // before any caller hands us SQL. + for (Future f : bootstrapFutures) { + f.get(); + } + } catch (Exception e) { + tearDown(); + throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e); + } + LOG.info("Initialized HiveDriverPool with {} workers", size); + } + + /** + * Runs each given SQL on every worker, in order. Used for setup statements + * (e.g. {@code USE database}) that must establish per-thread session context + * before any partition statement runs. Blocks until all workers have completed + * the setup. Throws on first error. + */ + public void runOnEachWorker(List setupSqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(workers.size()); + for (Worker worker : workers) { + futures.add(worker.executor.submit(() -> { + for (String sql : setupSqls) { + worker.driver.run(sql); + } + return null; + })); + } + awaitAll(futures); + } + + /** + * Dispatches each SQL string to a worker (round-robin) and returns the list of + * futures. The caller is responsible for awaiting and collecting errors. + */ + public List> runAll(List sqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + List> futures = new ArrayList<>(sqls.size()); + for (int i = 0; i < sqls.size(); i++) { + String sql = sqls.get(i); + Worker worker = workers.get(i % workers.size()); + futures.add(worker.executor.submit(() -> { + long start = System.currentTimeMillis(); + worker.driver.run(sql); + LOG.info("Time taken to execute [{}]: {} ms", sql, System.currentTimeMillis() - start); + return null; + })); + } + return futures; + } + + /** + * Awaits all futures, throws the first exception encountered (logging the rest at + * WARN), and returns the list of CommandProcessorResponse objects (currently + * unused but matches the existing single-threaded contract that returned them). + */ + public List awaitAll(List> futures) { + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + } + } catch (ExecutionException ee) { + Exception cause = unwrap(ee); + if (firstError == null) { + firstError = cause; + } else { + LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw new HoodieHiveSyncException("Failed in executing SQL", firstError); + } + return new ArrayList<>(); + } + + private static Exception unwrap(ExecutionException ee) { + Throwable cause = ee.getCause(); + return (cause instanceof Exception) ? (Exception) cause : ee; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + tearDown(); + } + + private void tearDown() { + // Close each worker's Driver/SessionState on its own thread, then shut the + // executor down. Running close() on the bound thread keeps SessionState's + // thread-local cleanup correct. + for (Worker worker : workers) { + try { + worker.executor.submit(() -> { + if (worker.driver != null) { + try { + worker.driver.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled Driver", e); + } + } + SessionState ss = SessionState.get(); + if (ss != null) { + try { + ss.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled SessionState", e); + } + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error during pool worker shutdown", e); + } + worker.executor.shutdown(); + try { + if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) { + worker.executor.shutdownNow(); + } + } catch (InterruptedException ie) { + worker.executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + workers.clear(); + } + + /** + * Per-slot state: a single-thread executor and the Driver bound to its thread. + * Driver is volatile because it is written by the bootstrap task and read by + * subsequent dispatch tasks on the same executor. + */ + private static final class Worker { + final ExecutorService executor; + volatile Driver driver; + + Worker(ThreadFactory threadFactory) { + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + } + + @FunctionalInterface + interface DriverFactory { + Driver newDriver(String databaseName) throws Exception; + } + + /** + * Builds a real Hive {@link Driver} on the calling thread. The SessionState is + * constructed lazily (once, on the first worker thread that builds a Driver) and + * shared across all worker threads — Hive uses ThreadLocal attachment, not + * exclusive ownership, so multiple workers calling + * {@code SessionState.start(sharedState)} all see the same config and scratch dir + * without each spending the cost of building their own SessionState (and risking + * resource-dir creation races during the constructor). + */ + private static final class DefaultDriverFactory implements DriverFactory { + private final HiveConf hiveConf; + private volatile SessionState sharedSessionState; + + DefaultDriverFactory(HiveSyncConfig config) { + this.hiveConf = config.getHiveConf(); + } + + @Override + public synchronized Driver newDriver(String databaseName) throws Exception { + if (sharedSessionState == null) { + sharedSessionState = new SessionState(hiveConf, + UserGroupInformation.getCurrentUser().getShortUserName()); + } + SessionState.start(sharedSessionState); + sharedSessionState.setCurrentDatabase(databaseName); + return new Driver(hiveConf); + } + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-driver-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java new file mode 100644 index 0000000000000..930fb705cbbda --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pool of {@link IMetaStoreClient} instances for parallel partition sync. + * + *

Each pooled client wraps an independent Thrift connection to the Hive Metastore. + * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client + * is available, executes the action, and returns the client to the pool. A worker + * thread pool of the same size is exposed via {@link #executor()} so callers can fan + * out their batches to match the number of available clients. + * + *

Usage contract: pool clients must be used only for partition-row + * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition}, + * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table}, + * {@code getTable} used as the read half of a read-modify-write of table parameters) + * must continue to go through the session client held by + * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would + * risk lost updates on table parameters such as the last-commit-time-synced marker. + * + *

The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * is constructed only for sync mode HMS. + */ +public class IMetaStoreClientPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public IMetaStoreClientPool(HiveSyncConfig config, int size) { + this(buildClients(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of clients so we can + // exercise borrow/return/close semantics without a live metastore. + IMetaStoreClientPool(List clients, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (clients.size() != size) { + throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(clients); + this.available.addAll(clients); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized IMetaStoreClient pool with {} clients", size); + } + + private static List buildClients(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + HiveConf hiveConf = config.getHiveConf(); + List clients = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + clients.add(newClient(hiveConf)); + } + return clients; + } catch (Exception e) { + // Construction failed mid-way; close any clients we already built before + // surfacing the error so we don't leak Thrift sockets. + for (IMetaStoreClient c : clients) { + try { + c.close(); + } catch (Exception ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e); + } + } + + private static IMetaStoreClient newClient(HiveConf hiveConf) { + try { + // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient + // (one Thrift socket per call), bypassing the Hive thread-local cache that + // Hive.get(conf) would use. This is what gives us N truly independent clients. + return RetryingMetaStoreClient.getProxy(hiveConf, true); + } catch (Exception e) { + throw new HoodieException("Failed to create IMetaStoreClient for pool", e); + } + } + + /** + * Borrows a client, runs the action, and returns the client to the pool. + * Blocks if all clients are in use until one becomes available. + */ + public T run(ClientAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool"); + } + IMetaStoreClient client = available.take(); + try { + return action.apply(client); + } finally { + // Always return the client to the pool, even on failure. Thrift clients + // recover transparently from transactional errors at the HMS layer; + // RetryingMetaStoreClient handles transient socket failures internally. + if (!closed) { + available.offer(client); + } + } + } + + /** + * Worker thread pool sized to match the client pool. Use this to fan out + * batches so the number of in-flight Thrift calls cannot exceed the + * number of pooled clients. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (IMetaStoreClient client : all) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled IMetaStoreClient", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ClientAction { + T apply(IMetaStoreClient client) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java new file mode 100644 index 0000000000000..5874e00f00a56 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/JDBCConnectionPool.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER; + +/** + * Pool of JDBC {@link Connection} instances for parallel HiveQL DDL via JDBC mode. + * + *

JDBC {@code Connection} is not thread-safe but is cheap to construct (one TCP + * socket to HiveServer2). Connections are opened eagerly at pool construction. The + * pool exposes {@link #runOnEachConnection(java.util.List)} for setup statements + * that must run on every connection before any partition fan-out (e.g. + * {@code USE database}), which {@code JDBCExecutor} uses to handle the Hive 2.x + * quirk where {@code ALTER PARTITION ... SET LOCATION} ignores {@code db.tbl} + * qualifiers and silently uses the connection's current database. + * + *

Usage contract: use this pool only for partition-row DDL statements + * that are independent of each other and freely shuffleable across workers. + * Table-level statements (createTable, schema evolution, schema reads) must + * continue to run on the session connection held by {@link + * org.apache.hudi.hive.ddl.JDBCExecutor} on the sync driver thread, and the + * shared session connection is also what {@link + * org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator} uses for its read/write of + * table parameters. + * + *

Gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * constructed only for JDBC sync mode. + */ +public class JDBCConnectionPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCConnectionPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public JDBCConnectionPool(HiveSyncConfig config, int size) { + this(buildConnections(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of connections so we can + // exercise borrow/return/close semantics without a live HiveServer2. + JDBCConnectionPool(List connections, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (connections.size() != size) { + throw new IllegalArgumentException("Expected " + size + " connections, got " + connections.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(connections); + this.available.addAll(connections); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized JDBCConnectionPool with {} connections", size); + } + + private static List buildConnections(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + String jdbcUrl = config.getStringOrDefault(HIVE_URL); + String user = config.getStringOrDefault(HIVE_USER); + String pass = config.getStringOrDefault(HIVE_PASS); + try { + // Defensive: the Hive JDBC driver is normally already loaded by JDBCExecutor; + // load it here too so the pool can be constructed in isolation. + Class.forName("org.apache.hive.jdbc.HiveDriver"); + } catch (ClassNotFoundException e) { + throw new HoodieException("Hive JDBC driver class not found on classpath", e); + } + List connections = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + connections.add(DriverManager.getConnection(jdbcUrl, user, pass)); + } + return connections; + } catch (SQLException e) { + // Construction failed mid-way; close any connections we already built before + // surfacing the error so we don't leak sockets. + for (Connection c : connections) { + try { + c.close(); + } catch (SQLException ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct JDBCConnectionPool of size " + size, e); + } + } + + /** + * Runs each given SQL on every pooled connection, in order. Used for + * setup statements (e.g. {@code USE database}) that must establish per- + * connection session context before any partition fan-out. Blocks until all + * connections complete. Throws on first error. + */ + public void runOnEachConnection(List setupSqls) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed JDBCConnectionPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(all.size()); + for (Connection conn : all) { + futures.add(executor.submit(() -> { + for (String sql : setupSqls) { + try (Statement stmt = conn.createStatement()) { + stmt.execute(sql); + } + } + return null; + })); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + } + } catch (java.util.concurrent.ExecutionException ee) { + Throwable cause = ee.getCause(); + if (firstError == null) { + firstError = (cause instanceof Exception) ? (Exception) cause : ee; + } else { + LOG.warn("Additional setup SQL failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + /** + * Borrows a connection, runs the action, and returns the connection to the pool. + * Blocks if all connections are in use until one becomes available. + */ + public T run(ConnectionAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed JDBCConnectionPool"); + } + Connection conn = available.take(); + try { + return action.apply(conn); + } finally { + if (!closed) { + available.offer(conn); + } + } + } + + /** + * Worker thread pool sized to match the connection pool. Use this to fan out + * batches so the number of in-flight statements cannot exceed the number of + * pooled connections. + */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (Connection conn : all) { + try { + conn.close(); + } catch (SQLException e) { + LOG.warn("Error closing pooled JDBC Connection", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ConnectionAction { + T apply(Connection conn) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-jdbc-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 6d971bcc52263..51cb638c2ca48 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -96,9 +96,12 @@ import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY; @@ -329,6 +332,134 @@ public void testDropUpperCasePartitionWithHMS() throws Exception { "Table partitions should match the number of partitions we wrote"); } + /** + * Exercises HMS sync with parallel partition batching enabled. Forces multiple + * batches with a small batch_num against a partition set large enough to fan out + * across multiple pool clients, and asserts the resulting partition set matches. + */ + @Test + public void testHMSSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + // Small batch_num so we get multiple batches dispatched in parallel. + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel batching should sync the new partitions"); + } + + /** + * Exercises HiveQL sync with parallel partition batching enabled. Mirrors the + * HMS test but routes through the HiveDriverPool — each worker thread owns a + * Driver+SessionState pair, and the SQL list (qualified with `db`.`tbl`) is + * fanned out across them. + */ + @Test + public void testHiveQLSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel HiveQL batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel HiveQL batching should sync the new partitions"); + } + + /** + * Exercises JDBC sync with parallel partition batching enabled. JDBC's pool + * holds N pre-opened HiveServer2 connections, each with `USE database` pinned + * at bootstrap. The SQL list's leading `USE` is stripped at dispatch time. + */ + @Test + public void testJDBCSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.JDBC.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel JDBC batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel JDBC batching should sync the new partitions"); + } + + /** + * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that + * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller + * statements does not break partition visibility downstream. + */ + @Test + public void testHiveQLTouchPartitionsWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "2"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + hiveSyncProps.setProperty(META_SYNC_TOUCH_PARTITIONS_ENABLED.key(), "true"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Touch existing partitions (no new data) — should hit the batched TOUCH path + // without changing the partition count. + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "TOUCH batching must not change the partition set"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java new file mode 100644 index 0000000000000..daa98e5ebaec2 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link HiveDriverPool} that exercise bootstrap, dispatch, error + * propagation, and close semantics without standing up a real Hive instance. + */ +class TestHiveDriverPool { + + private static HiveSyncConfig configWithEmptyHiveConf() { + HiveSyncConfig config = mock(HiveSyncConfig.class); + doAnswer(inv -> new HiveConf()).when(config).getHiveConf(); + doAnswer(inv -> "default").when(config).getStringOrDefault( + org.mockito.ArgumentMatchers.any()); + return config; + } + + @Test + void bootstrapBuildsOneDriverPerSlot() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger built = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + built.incrementAndGet(); + return mock(Driver.class); + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + assertEquals(3, pool.size()); + assertEquals(3, built.get(), "One Driver per slot should be constructed eagerly"); + } + } + + @Test + void bootstrapFailurePropagatesAndTearsDown() { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger calls = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + int n = calls.incrementAndGet(); + if (n == 2) { + throw new RuntimeException("simulated driver build failure"); + } + return mock(Driver.class); + }; + HoodieException ex = assertThrows(HoodieException.class, + () -> new HiveDriverPool(config, 3, factory)); + assertTrue(ex.getMessage().contains("Failed to construct HiveDriverPool")); + } + + @Test + void runAllDispatchesEachSqlAcrossWorkers() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Each worker counts how many SQLs it received and remembers the thread. + ConcurrentHashMap> seenThreadsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + seenThreadsByDriver.put(d, ConcurrentHashMap.newKeySet()); + doAnswer((InvocationOnMock inv) -> { + seenThreadsByDriver.get(d).add(Thread.currentThread().getName()); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List sqls = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4"); + List> futures = pool.runAll(sqls); + pool.awaitAll(futures); + assertEquals(2, seenThreadsByDriver.size(), "Expected exactly 2 worker Drivers"); + int totalCalls = seenThreadsByDriver.values().stream().mapToInt(Set::size).sum(); + assertTrue(totalCalls >= 1, "At least one worker should have logged a thread"); + // Each Driver should have been invoked exactly twice (round-robin with 4 sqls, 2 workers). + for (Driver d : seenThreadsByDriver.keySet()) { + verify(d, times(2)).run(anyString()); + } + } + } + + @Test + void awaitAllThrowsFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + throw new RuntimeException("boom: " + sql); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List> futures = pool.runAll(Arrays.asList("OK", "FAIL", "OK")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + } + } + + @Test + void concurrentDispatchBoundedByPoolSize() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + CountDownLatch hold = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + int now = inFlight.incrementAndGet(); + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + hold.await(2, TimeUnit.SECONDS); + inFlight.decrementAndGet(); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + // 5 SQLs against pool of size 2 → max in-flight should be 2. + List> futures = pool.runAll(Arrays.asList("a", "b", "c", "d", "e")); + // Release after a short wait so all SQLs progress. + Thread.sleep(150); + hold.countDown(); + pool.awaitAll(futures); + assertTrue(maxInFlight.get() <= 2, + "Max concurrent dispatches must not exceed pool size, observed " + maxInFlight.get()); + assertTrue(maxInFlight.get() >= 1, "Sanity: at least one dispatch ran"); + } + } + + @Test + void closeIsIdempotentAndPreventsFurtherDispatch() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + HiveDriverPool pool = new HiveDriverPool(config, 2, factory); + pool.close(); + pool.close(); + assertThrows(IllegalStateException.class, + () -> pool.runAll(Arrays.asList("anything"))); + } + + @Test + void invalidSizeRejected() { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + assertThrows(IllegalArgumentException.class, + () -> new HiveDriverPool(config, 0, factory)); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java new file mode 100644 index 0000000000000..3c9d4fe899cfd --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestIMetaStoreClientPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link IMetaStoreClientPool} borrow/return/close semantics. These tests + * never hit a real Hive Metastore — clients are mocked and injected via the package- + * private constructor. + */ +class TestIMetaStoreClientPool { + + private static List mockClients(int n) { + List clients = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + clients.add(mock(IMetaStoreClient.class)); + } + return clients; + } + + @Test + void runReturnsClientToPoolOnSuccess() throws Exception { + List clients = mockClients(2); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 2); + try { + IMetaStoreClient borrowed = pool.run(c -> c); + // Both clients should be available again + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both clients should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsClientToPoolOnFailure() { + List clients = mockClients(1); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + // Pool should still be usable; the borrowed client was returned + try { + IMetaStoreClient again = pool.run(c -> c); + assertSame(clients.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllClients() throws Exception { + int poolSize = 3; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + // Subsequent borrow should fail + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List clients = mockClients(poolSize); + IMetaStoreClientPool pool = new IMetaStoreClientPool(clients, poolSize); + pool.close(); + pool.close(); + // verify still passes — close() called only once per client + for (IMetaStoreClient c : clients) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new IMetaStoreClientPool(mockClients(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + IMetaStoreClientPool pool = new IMetaStoreClientPool(mockClients(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java new file mode 100644 index 0000000000000..9178923057b34 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestJDBCConnectionPool.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link JDBCConnectionPool} borrow/return/close semantics. These + * tests never hit a real HiveServer2 — connections are mocked and injected via + * the package-private constructor. + */ +class TestJDBCConnectionPool { + + private static List mockConnections(int n) { + List conns = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + conns.add(mock(Connection.class)); + } + return conns; + } + + @Test + void runReturnsConnectionToPoolOnSuccess() throws Exception { + List conns = mockConnections(2); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, 2); + try { + Connection borrowed = pool.run(c -> c); + Set seen = new HashSet<>(); + seen.add(pool.run(c -> c)); + seen.add(pool.run(c -> c)); + assertEquals(2, seen.size(), "Both connections should be reachable after returns"); + assertTrue(seen.contains(borrowed)); + } finally { + pool.close(); + } + } + + @Test + void runReturnsConnectionToPoolOnFailure() { + List conns = mockConnections(1); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, 1); + try { + assertThrows(IllegalStateException.class, () -> pool.run(c -> { + throw new IllegalStateException("boom"); + })); + try { + Connection again = pool.run(c -> c); + assertSame(conns.get(0), again); + } catch (Exception e) { + throw new AssertionError("Pool should still be usable after action failure", e); + } + } finally { + pool.close(); + } + } + + @Test + void concurrentBorrowsBlockUntilReturned() throws Exception { + int poolSize = 2; + int callers = 4; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + ExecutorService threads = Executors.newFixedThreadPool(callers); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + futures.add(threads.submit(() -> { + start.await(); + return pool.run(c -> { + int now = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(prev -> Math.max(prev, now)); + Thread.sleep(50); + concurrent.decrementAndGet(); + return null; + }); + })); + } + start.countDown(); + for (java.util.concurrent.Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + assertTrue(maxConcurrent.get() <= poolSize, + "Concurrent borrows must not exceed pool size, observed " + maxConcurrent.get()); + assertTrue(maxConcurrent.get() >= 1, "At least one borrow must have occurred"); + } finally { + threads.shutdownNow(); + pool.close(); + } + } + + @Test + void closeReleasesAllConnections() throws Exception { + int poolSize = 3; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + pool.close(); + for (Connection c : conns) { + verify(c).close(); + } + assertThrows(IllegalStateException.class, () -> pool.run(c -> c)); + } + + @Test + void closeIsIdempotent() throws Exception { + int poolSize = 2; + List conns = mockConnections(poolSize); + JDBCConnectionPool pool = new JDBCConnectionPool(conns, poolSize); + pool.close(); + pool.close(); + for (Connection c : conns) { + verify(c).close(); + } + } + + @Test + void invalidSizeRejected() { + assertThrows(IllegalArgumentException.class, + () -> new JDBCConnectionPool(mockConnections(0), 0)); + } + + @Test + void sizeMismatchRejected() { + assertThrows(IllegalArgumentException.class, + () -> new JDBCConnectionPool(mockConnections(2), 3)); + } + + @Test + void executorSizedToPool() { + int poolSize = 4; + JDBCConnectionPool pool = new JDBCConnectionPool(mockConnections(poolSize), poolSize); + try { + assertFalse(pool.executor().isShutdown()); + assertEquals(poolSize, pool.size()); + } finally { + pool.close(); + } + assertTrue(pool.executor().isShutdown()); + } +}