feat(hive-sync): batch and parallelize HiveQL partition operations #18984
nsivabalan wants to merge 4 commits into
Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see apache#18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per-partition Thrift round-trips for DROP, all sequential.

This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4).

Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization).

Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path).

Tests: TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency. TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
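As a rough illustration of the batching and borrow/return fan-out described in this commit message, here is a minimal, generic sketch. It is not the PR's IMetaStoreClientPool: the class name BatchFanOutSketch, the methods toBatches and fanOut, and the use of a plain BlockingQueue as the pool are all assumptions made for illustration, and the client type C is a stand-in for a metastore client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch; names here are illustrative and not taken from Hudi. */
final class BatchFanOutSketch {

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
    }
    return batches;
  }

  /** Runs op(client, batch) for every batch, borrowing one pooled client per task. */
  static <C, T> void fanOut(List<T> items, int batchSize, int threads,
                            Supplier<C> clientFactory, BiConsumer<C, List<T>> op) {
    BlockingQueue<C> pool = new ArrayBlockingQueue<>(threads);
    for (int i = 0; i < threads; i++) {
      pool.add(clientFactory.get());
    }
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (List<T> batch : toBatches(items, batchSize)) {
        Callable<Void> task = () -> {
          C client = pool.take();        // borrow; blocks while every client is in use
          try {
            op.accept(client, batch);
          } finally {
            pool.put(client);            // always return the client, even on failure
          }
          return null;
        };
        futures.add(executor.submit(task));
      }
      for (Future<Void> f : futures) {
        f.get();                         // surfaces the first failed batch
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted during fan-out", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("batch failed", e.getCause());
    } finally {
      executor.shutdown();
    }
  }
}
```

Only the partition-level work would go through a helper like this; table-level calls stay on the single session client, matching the design invariant stated above.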
Follow-up to apache#18983 (HMS parallelism). Applies the equivalent treatment to the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql).

HiveQL had two issues that this change addresses:
1. Batching gaps in QueryBasedDDLExecutor.constructPartitionAlterStatements: TOUCH concatenated every partition into one giant ALTER TABLE ... TOUCH PARTITION (...) PARTITION (...) ... statement; SET_LOCATION (UPDATE) emitted one statement per partition. ADD was already batched.
2. Sequential SQL execution in HiveQueryDDLExecutor.updateHiveSQLs: even when batches existed, they ran in a single for-loop on one Hive Driver.

This change introduces HiveDriverPool, an eager pool of single-thread executors each owning a Hive Driver bound to a shared SessionState. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default off) and sized by hoodie.datasource.hive_sync.batching.threads (default 4); no new configs.

Design notes:
- Hive's Driver and SessionState are thread-bound. SessionState.start() attaches to the calling thread's ThreadLocal. The pool gives each slot its own dedicated worker thread so the Driver stays valid for that thread's lifetime. Bootstrap, dispatch, and close all run on the bound thread.
- SessionState is shared across workers (lazily constructed once), because each worker calls SessionState.start(sharedState) on its own thread to attach. Constructing one SessionState per worker triggered race conditions in Hive's resource-directory machinery on macOS.
- TOUCH is now batched by HIVE_BATCH_SYNC_PARTITION_NUM. SET_LOCATION remains one statement per partition (Hive SQL doesn't support multi-partition SET LOCATION) but is now fanned out across workers.
- Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database, so the leading USE database statement is load-bearing. The pool peels it off and runs it on every worker via runOnEachWorker() before fanning the rest out.

Tests:
- TestHiveDriverPool: bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency.
- TestHiveSyncTool.testHiveQLSyncWithBatchingEnabled: end-to-end with batching.enabled=true, threads=3, batch_num=3 against embedded HMS.
- TestHiveSyncTool.testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path specifically.
- Full hudi-hive-sync suite: 305 passed, 0 failures, 0 errors.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
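The thread-bound pool idea in the design notes above can be sketched without any Hive classes. In this hypothetical example, ThreadBoundPoolSketch and its method bodies are assumptions, and the type parameter R merely stands in for a thread-bound resource such as a Hive Driver; the real HiveDriverPool may differ in every detail. What it shows is the shape of the technique: one single-thread executor per slot, the resource created on that slot's own thread, round-robin dispatch, and a setup step run once on every worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Illustrative only. R stands in for a thread-bound resource such as a Hive Driver. */
final class ThreadBoundPoolSketch<R> implements AutoCloseable {
  private final List<ExecutorService> workers = new ArrayList<>();
  private final List<R> resources = new ArrayList<>();
  private int next = 0; // round-robin cursor; assumes a single coordinating caller thread

  ThreadBoundPoolSketch(int size, Supplier<R> factory) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    for (int i = 0; i < size; i++) {
      ExecutorService worker = Executors.newSingleThreadExecutor();
      workers.add(worker);
      // Bootstrap on the worker thread so the resource is created on the thread that uses it.
      Callable<R> bootstrap = factory::get;
      resources.add(join(worker.submit(bootstrap)));
    }
  }

  /** Runs a setup step once on every worker, for example a leading USE statement. */
  void runOnEachWorker(Consumer<R> setup) {
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < workers.size(); i++) {
      R resource = resources.get(i);
      futures.add(workers.get(i).submit(() -> setup.accept(resource)));
    }
    for (Future<?> f : futures) {
      join(f);
    }
  }

  /** Dispatches one task per item, round-robin across workers, then waits for all of them. */
  <T> void runAll(List<T> items, BiConsumer<R, T> task) {
    List<Future<?>> futures = new ArrayList<>();
    for (T item : items) {
      int slot = next;
      next = (next + 1) % workers.size();
      R resource = resources.get(slot);
      futures.add(workers.get(slot).submit(() -> task.accept(resource, item)));
    }
    for (Future<?> f : futures) {
      join(f);
    }
  }

  @Override
  public void close() {
    for (ExecutorService worker : workers) {
      worker.shutdown();
    }
  }

  private static <V> V join(Future<V> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for worker", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("worker task failed", e.getCause());
    }
  }
}
```

Because each worker is a single-thread executor with a FIFO queue, anything submitted through runOnEachWorker before runAll is guaranteed to execute first on that worker, which is the property the leading USE statement relies on.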
…omments
- Change HiveDriverPool.awaitAll(...) to return void. The List<CommandProcessorResponse> it previously returned was always empty and no caller consumed it. Drops the unused CommandProcessorResponse import.
- Lift the empty-input short-circuit to the top of HiveQueryDDLExecutor.runSQLs so the no-op case skips both the pool and the session Driver branches cleanly.
- Document isUseStatement's strict 4-char prefix expectation so future callers don't feed it externally produced (potentially padded) SQL.

No behavior change. Full hudi-hive-sync suite: 305 tests, 0 failures, 0 errors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
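To make the "strict 4-char prefix" point concrete, here is a hedged sketch of what such a check and the peeling of leading USE statements might look like. The class UseStatementSketch and the method leadingUseCount are hypothetical, and case-insensitive matching is an assumption; the PR only states that isUseStatement expects a strict 4-character prefix and should not be fed padded SQL.

```java
import java.util.List;

/** Hypothetical sketch; the real isUseStatement in the PR may differ. */
final class UseStatementSketch {

  /**
   * Strict check: the first four characters must be "USE " with no leading
   * whitespace. Case-insensitive matching is an assumption of this sketch.
   */
  static boolean isUseStatement(String sql) {
    return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4);
  }

  /** Counts the USE statements at the head of the list; later ones are not counted. */
  static int leadingUseCount(List<String> sqls) {
    int n = 0;
    while (n < sqls.size() && isUseStatement(sqls.get(n))) {
      n++;
    }
    return n;
  }
}
```

With a count n in hand, a caller could run sqls.subList(0, n) on every worker and fan out sqls.subList(n, sqls.size()). Padded input such as "  USE db" is deliberately not recognized, which is why the expectation is worth documenting.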
    this(config, metaStoreClient, null);
  }

  public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient,
Can we make the last arg an Option instead of a HiveDriverPool?
Same for the instance var in the class.
  // Optional. When non-null, partition-row operations fan out across this pool;
  // table-row operations always use the session `client` field above. See
  // IMetaStoreClientPool javadoc for the usage contract.
  private final IMetaStoreClientPool partitionClientPool;
Can we make this an Option? Same for the last arg in the constructor.
And can we name the variable iMetaStoreClientPool to align with the class being used?
        sharedSessionState = new SessionState(hiveConf,
            UserGroupInformation.getCurrentUser().getShortUserName());
      }
      SessionState.start(sharedSessionState);
Why do we need to re-instantiate here for every new driver?
The db cannot change across partitions, right?
Am I missing anything?
PR review follow-ups for apache#18984:
- HiveQueryDDLExecutor.driverPool -> Option<HiveDriverPool> (PR comment). Constructor arg, instance field, and HoodieHiveSyncClient call sites updated. Eliminates a stale 'Optional. When non-null' inline doc.
- DefaultDriverFactory: stop redundantly calling setCurrentDatabase on every newDriver(). Database is a pool-wide property that never changes across workers, so set it once when the shared SessionState is first constructed (PR comment).
- HiveDriverPool.awaitAll: on first failure, cancel remaining (not yet started) pending futures so workers don't keep running pointless work after a fatal error. Cancel uses mayInterruptIfRunning=false so any in-flight statement is allowed to run to completion (keeps Driver state consistent). Suppressed errors continue to be logged at WARN. Adds handling for CancellationException so the cancel-walk doesn't itself raise a spurious HoodieHiveSyncException.
- HiveDriverPool bootstrap: bound each Future.get() at 60s (BOOTSTRAP_TIMEOUT_SECONDS). Prior code blocked forever if Hive init hung; now we surface a HoodieException with a timeout cause.
- Logging: stop logging full SQL text per-statement in runAll/awaitAll (batched TOUCH/ADD can be many KB; N workers multiply log volume). Replaced with a single per-call summary line. Same treatment applied to HiveQueryDDLExecutor.updateHiveSQLs (sequential path).
- New unit test: runOnEachWorkerRunsSetupOnEveryWorker asserts every worker sees the leading USE before any fan-out partition statement.
- New unit test: awaitAllCancelsPendingFuturesOnFirstError uses a size-1 pool to guarantee the 2nd/3rd statements are still pending behind the failing 1st, then asserts they are cancelled.
- New end-to-end test: testHiveQLSetLocationWithBatching drives updatePartitionsToTable through the SET_LOCATION fan-out path with batching on; asserts partition count and per-partition relative paths survive parallel ALTER PARTITION SET LOCATION.

Out of scope (documented as follow-up): DROP partition parallelization in HIVEQL mode. DROP goes through IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so it would need IMetaStoreClientPool wired into the HiveQL path, a separable change from the HiveDriverPool work this PR delivers.

Tests: full hudi-hive-sync suite passes: 308 tests, 0 failures, 0 errors (was 305 before this commit). New tests:
- TestHiveDriverPool: 9 tests (was 7)
- TestHiveSyncTool: testHiveQLSetLocationWithBatching added

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
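The cancel-on-first-failure behaviour described for awaitAll can be approximated with plain java.util.concurrent types. This is a sketch under stated assumptions, not the PR's code: AwaitAllSketch is a hypothetical name, IllegalStateException stands in for HoodieHiveSyncException, and later failures are simply ignored here where the PR says they are logged at WARN.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical sketch of a cancel-on-first-failure wait loop. */
final class AwaitAllSketch {

  static void awaitAll(List<? extends Future<?>> futures) {
    Throwable first = null;
    for (Future<?> f : futures) {
      try {
        f.get();
      } catch (CancellationException e) {
        // Cancelled by the walk below after an earlier failure; not an error by itself.
      } catch (ExecutionException e) {
        if (first == null) {
          first = e.getCause();
          // mayInterruptIfRunning=false: queued tasks never start, and a task that is
          // already running is not interrupted (its Future still reports cancelled).
          for (Future<?> pending : futures) {
            pending.cancel(false);
          }
        }
        // Later failures are ignored in this sketch; the PR describes logging them at WARN.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while awaiting statements", e);
      }
    }
    if (first != null) {
      throw new IllegalStateException("statement failed", first);
    }
  }
}
```

One nuance worth keeping in mind: after cancel(false), get() on an in-flight task throws CancellationException immediately, so this loop does not wait for that task to finish even though the worker thread lets it run to completion.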
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18984 +/- ##
============================================
- Coverage 68.26% 67.66% -0.61%
- Complexity 29513 29845 +332
============================================
Files 2542 2564 +22
Lines 142627 145456 +2829
Branches 17788 18370 +582
============================================
+ Hits 97369 98417 +1048
- Misses 37253 38809 +1556
- Partials 8005 8230 +225
Summary
- Adds HiveDriverPool for HiveQL partition sync (issue #18331)
- Batches TOUCH by hoodie.datasource.hive_sync.batch_num (was one giant statement)
- Runs statements in parallel across Driver workers via dedicated single-thread executors
- Opt-in via hoodie.datasource.hive_sync.batching.enabled=true

Stack:
- #18983: HMS mode (IMetaStoreClientPool)
- This PR: HiveQL mode (HiveDriverPool)

Until #18983 merges, the diff here includes the HMS commit. Once it lands, this PR rebases cleanly to a HiveQL-only delta. Reviewing the top three commits (49a70050→4bb34bca→acefd03e) in isolation gives the HiveQL change plus the post-review cleanup.

Design constraint
Hive's Driver and SessionState are thread-bound: SessionState.start() attaches to the calling thread's ThreadLocal, and a Driver constructed on one thread cannot be safely used from another. This is the opposite of IMetaStoreClient from #18983, which is a Thrift socket we can pool freely.

The HiveDriverPool gives each slot its own dedicated worker thread (a newSingleThreadExecutor). Bootstrap, dispatch, and close all run on that bound thread. The SessionState itself is shared across workers (lazily constructed once); each worker calls SessionState.start(sharedState) on its own thread to attach to its ThreadLocal. Constructing one SessionState per worker triggered races in Hive's resource-dir machinery during early testing. Confirmed in review that the shared-SessionState model is appropriate for partition-only DDL; see usage contract in HiveDriverPool javadoc.

What this PR fixes
- QueryBasedDDLExecutor.constructPartitionAlterStatements was concatenating every partition into one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... statement. Now split into batches of HIVE_BATCH_SYNC_PARTITION_NUM.
- HiveQueryDDLExecutor.updateHiveSQLs ran the SQL list in a single for loop on one Driver. With the pool, each batch is dispatched to a worker (round-robin) and they execute in parallel.
- SET_LOCATION (UPDATE) remains one statement per partition because Hive SQL doesn't support multi-partition SET LOCATION, but it now benefits from the parallel fan-out.

ALTER TABLE statements (createTable, schema evolution, column-comment updates) continue to run on the session Driver; they're rare and don't benefit from parallelism.
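For readers who want to see the TOUCH batching in miniature, the following sketch builds one ALTER TABLE ... TOUCH statement per batch of partition specs. It is an illustration only: TouchBatchSketch and touchStatements are hypothetical names, the partition specs are passed in as pre-rendered strings, and identifier quoting and escaping (which the real constructPartitionAlterStatements would have to handle) are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting TOUCH into batched statements. */
final class TouchBatchSketch {

  /**
   * Builds one statement per batch of at most batchSize partition specs.
   * Each spec is assumed to be already rendered, e.g. dt='2024-01-01'.
   */
  static List<String> touchStatements(String table, List<String> partitionSpecs, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<String> statements = new ArrayList<>();
    for (int i = 0; i < partitionSpecs.size(); i += batchSize) {
      int end = Math.min(i + batchSize, partitionSpecs.size());
      StringBuilder sql = new StringBuilder("ALTER TABLE ").append(table).append(" TOUCH");
      for (String spec : partitionSpecs.subList(i, end)) {
        sql.append(" PARTITION (").append(spec).append(")");
      }
      statements.add(sql.toString());
    }
    return statements;
  }
}
```

Each returned statement is an independent unit of work, so the list can be handed to a worker pool as-is; SET_LOCATION would correspond to a batch size of one.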
Hive 2.x quirk worth flagging
ALTER PARTITION SET LOCATION in Hive 2.x ignores db.tbl qualifiers and silently uses the connection's current database. The leading USE database statement in the SQL list is therefore load-bearing: when the pool is in use, any leading USE statements are peeled off and run on every worker via runOnEachWorker() before the rest is fanned out. JDBC mode (which shares one Connection) preserves today's contract where USE persists for subsequent statements on the same Connection.

Configs
No new configs; reuses everything from #18983:
- hoodie.datasource.hive_sync.batching.enabled (default false)
- hoodie.datasource.hive_sync.batching.threads (default 4)
- hoodie.datasource.hive_sync.batch_num (default 1000)

Post-review hardening (commits 4bb34bca, acefd03e)
- HiveDriverPool.awaitAll: cancels remaining pending futures on first error (mayInterruptIfRunning=false so in-flight Driver statements run to completion).
- HiveDriverPool bootstrap: bounded by a 60s BOOTSTRAP_TIMEOUT_SECONDS; prior code blocked forever if Hive init hung.
- HiveQueryDDLExecutor.driverPool is now Option<HiveDriverPool> (per review comment).
- SessionState.setCurrentDatabase(...) is called once when the shared SessionState is first constructed, not on every newDriver invocation.

Test plan
- mvn compile on hudi-sync/hudi-hive-sync: clean, 0 Checkstyle violations, 0 RAT issues
- mvn test on hudi-sync/hudi-hive-sync: 308 tests, 0 failures, 0 errors (was 305 before post-review tests)
- TestHiveDriverPool: 9 unit tests (bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency, runOnEachWorker ordering, cancel-on-first-error)
- TestHiveSyncTool#testHiveQLSyncWithBatchingEnabled: end-to-end HiveQL sync with batching on
- TestHiveSyncTool#testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path
- TestHiveSyncTool#testHiveQLSetLocationWithBatching: drives the parallel SET_LOCATION fan-out path

Files touched (top commits only)
- QueryBasedDDLExecutor.java: batch TOUCH; new runSQLs(List<String>) hook for parallel-friendly subclasses
- HiveQueryDDLExecutor.java: new constructor accepting Option<HiveDriverPool>; runSQLs peels off USE and dispatches via pool
- HoodieHiveSyncClient.java: build pool for HIVEQL mode (explicit + legacy default)
- util/HiveDriverPool.java: new, ~310 lines; eager pool of single-thread executors, each owning a Driver bound to a shared SessionState
- util/TestHiveDriverPool.java: new; 9 unit tests with mocked DriverFactory
- TestHiveSyncTool.java: 3 new end-to-end test methods

Follow-ups (separate PRs)
- … Connection pool, different concerns from Driver/SessionState.

Related: #18331
🤖 Generated with Claude Code