[IMPROVEMENT] Hive Sync partition operations lack batching and parallelism, causing 4x-9x slowdown for large tables #18331

@prashantwason

Problem

Hive Sync partition operations (TOUCH, UPDATE, DROP) are significantly slower than necessary for tables with large partition counts (~1,500+ partitions per sync). Benchmarking against an equivalent system (HDrone) shows a 4.3x–9.2x slowdown on incremental syncs of the same tables under identical conditions.

Benchmark Results

| Scenario | Partitions | Sequential (Hudi) | Parallel (HDrone) | Slowdown |
|---|---|---|---|---|
| Large table incremental | ~1,970 | 1,186s (19.8 min) | 208s (3.5 min) | 5.7x |
| Large table incremental | ~1,516 | 1,637s (27.3 min) | 178s (3.0 min) | 9.2x |
| Large table incremental | ~2,166 | 892s (14.9 min) | 206s (3.4 min) | 4.3x |
| Small table (~5 partitions) | ~5 | ~4s | ~3s | ~1x |

For small partition counts the difference is negligible, but it becomes severe at scale.
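The derived numbers can be rechecked from the raw timings. The following is a small sketch; the class and method names are made up for illustration and are not part of Hudi. It recomputes the slowdown factor and the per-partition cost for one benchmark row.

```java
import java.util.Locale;

/** Recomputes derived figures from the benchmark rows (illustrative helper, not Hudi code). */
final class SyncBenchmarkMath {

    /** Average wall-clock seconds spent per partition in one run. */
    static double secondsPerPartition(double totalSeconds, int partitions) {
        return totalSeconds / partitions;
    }

    /** How many times slower the sequential run was than the parallel run. */
    static double slowdown(double sequentialSeconds, double parallelSeconds) {
        return sequentialSeconds / parallelSeconds;
    }

    /** Formats one row; partition counts are the approximate values reported above. */
    static String describe(int partitions, double hudiSeconds, double hdroneSeconds) {
        return String.format(Locale.ROOT,
            "%d partitions: Hudi %.2f s/partition, HDrone %.2f s/partition, slowdown %.1fx",
            partitions,
            secondsPerPartition(hudiSeconds, partitions),
            secondsPerPartition(hdroneSeconds, partitions),
            slowdown(hudiSeconds, hdroneSeconds));
    }
}
```

Calling `describe(1970, 1186, 208)`, `describe(1516, 1637, 178)` and `describe(2166, 892, 206)` gives roughly 0.60, 1.08 and 0.41 s/partition for Hudi against 0.10 to 0.12 s/partition for HDrone.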

Root Causes

1. No batching for TOUCH/UPDATE/DROP partition operations

The hoodie.datasource.hive_sync.batch_num config only applies to ADD partition operations. All other operations ignore it:

  • TOUCH: constructTouchPartitions() in QueryBasedDDLExecutor concatenates ALL partitions into a single ALTER TABLE ... TOUCH PARTITION(...) PARTITION(...) ... statement. For 2,000 partitions, this produces one massive SQL statement.
  • UPDATE (SET LOCATION): constructChangePartitions() produces one SQL statement per partition but doesn't batch them.
  • HMS mode: registerAlterPartitionEvent() in HMSDDLExecutor sends ALL partitions in a single client.alter_partitions() Thrift call.

Relevant code (QueryBasedDDLExecutor.java):

// TOUCH: no batching, a single statement covers ALL partitions
private List<String> constructTouchPartitions(String tableName, List<String> partitions) {
    String alterTable = "ALTER TABLE " + tableName + " TOUCH";
    for (String partition : partitions) {
        alterTable += " PARTITION (" + getPartitionClause(partition) + ")";
    }
    // Returns ONE statement containing all partitions
    return Collections.singletonList(alterTable);
}

// Compare with ADD — properly batched
private List<String> constructAddPartitions(String tableName, List<String> partitions) {
    int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
    for (int i = 0; i < partitions.size(); i++) {
        // ... append partition ...
        if ((i + 1) % batchSyncPartitionNum == 0) {
            result.add(alterSQL.toString());  // flush batch
            alterSQL = getAlterTablePrefix(tableName);  // start new
        }
    }
}

2. No parallel execution of DDL statements

Even when ADD partitions are batched into multiple SQL statements, they are executed sequentially on a single thread:

// HiveQueryDDLExecutor — sequential loop
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
    for (String sql : sqls) {
        hiveDriver.run(sql);  // blocks until complete, then next
    }
}

// JDBCExecutor — sequential loop
public void runSQL(List<String> sqls) {
    sqls.forEach(this::runSQL);  // one at a time
}

The bottleneck is I/O-bound (network round-trips to the metastore RDBMS), not CPU-bound, so parallelism would significantly reduce wall-clock time.
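To illustrate why a thread pool helps when each statement mostly waits on the network, here is a toy simulation. It does not talk to Hive: `fakeDdlCall` is a hypothetical stand-in that just sleeps, and the latency passed in is an invented number, not a measurement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy model of blocking DDL calls; all names and latencies here are assumptions. */
final class IoBoundSimulation {

    /** Stand-in for one blocking metastore round-trip. */
    static void fakeDdlCall(long latencyMillis) throws InterruptedException {
        Thread.sleep(latencyMillis);
    }

    /** Runs the calls one after another and returns elapsed milliseconds. */
    static long runSequentially(int statements, long latencyMillis) throws Exception {
        long start = System.nanoTime();
        for (int i = 0; i < statements; i++) {
            fakeDdlCall(latencyMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Runs the same calls on a fixed-size pool and returns elapsed milliseconds. */
    static long runInPool(int statements, long latencyMillis, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < statements; i++) {
                futures.add(pool.submit(() -> {
                    fakeDdlCall(latencyMillis);
                    return null;
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion and surface any failure
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```

With 16 simulated statements and 8 threads, the pooled run should finish in about two latency periods instead of sixteen. Real gains depend on how much concurrency the metastore and its backing database tolerate.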

Proposed Fix

1. Add batching to TOUCH/UPDATE/DROP operations

Extend batch_num to apply to all partition operations, not just ADD. For TOUCH, this means splitting one giant ALTER TABLE TOUCH into multiple smaller statements (e.g., 1,000 partitions each).
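A minimal sketch of what batched TOUCH statement construction could look like. It assumes the partition clauses have already been rendered (the role `getPartitionClause` plays in the excerpt above), and the class and method names are hypothetical rather than the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative batching of TOUCH statements; not the actual Hudi implementation. */
final class TouchBatching {

    /**
     * Splits the partition clauses into chunks of at most batchSize and builds
     * one ALTER TABLE ... TOUCH statement per chunk.
     */
    static List<String> buildTouchStatements(String tableName,
                                             List<String> partitionClauses,
                                             int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<String> statements = new ArrayList<>();
        for (int start = 0; start < partitionClauses.size(); start += batchSize) {
            int end = Math.min(start + batchSize, partitionClauses.size());
            StringBuilder sql = new StringBuilder("ALTER TABLE ").append(tableName).append(" TOUCH");
            for (String clause : partitionClauses.subList(start, end)) {
                sql.append(" PARTITION (").append(clause).append(")");
            }
            statements.add(sql.toString()); // the last chunk may be smaller than batchSize
        }
        return statements;
    }
}
```

Stepping through the list in `batchSize` strides handles the trailing partial batch without a separate flush. The same chunking could feed the HMS path, with each chunk sent as its own `alter_partitions()` call.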

2. Add parallel DDL execution

Execute batched DDL statements concurrently using a thread pool (default 8 threads), similar to how HDrone processes partition operations.

For thread safety:

  • HiveQueryDDLExecutor: Each worker thread creates its own SessionState + Driver (they are not thread-safe).
  • JDBCExecutor: Each worker thread creates its own JDBC Connection.
  • HMSDDLExecutor: IMetaStoreClient is a single Thrift connection and not thread-safe, so batching is applied sequentially (still helps avoid oversized Thrift requests and timeouts).
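The per-thread resource pattern for the first two executors could be sketched as below. `SqlSession` is a hypothetical interface standing in for either a Hive `Driver` session or a JDBC `Connection`, and `sessionFactory` is an assumed hook for creating one; neither exists in Hudi under these names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Illustrative parallel DDL runner; names and structure are assumptions. */
final class ParallelDdlRunner {

    /** Hypothetical wrapper around a non-thread-safe Hive session or JDBC connection. */
    interface SqlSession extends AutoCloseable {
        void execute(String sql) throws Exception;
    }

    /** Executes all statements; each worker opens, uses, and closes its own session. */
    static void runAll(List<String> sqls, int threads, Supplier<SqlSession> sessionFactory)
            throws Exception {
        if (sqls.isEmpty()) {
            return;
        }
        Queue<String> pending = new ConcurrentLinkedQueue<>(sqls);
        int workers = Math.max(1, Math.min(threads, sqls.size()));
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            Callable<Void> worker = () -> {
                // The session is never shared across threads.
                try (SqlSession session = sessionFactory.get()) {
                    String sql;
                    while ((sql = pending.poll()) != null) {
                        session.execute(sql);
                    }
                }
                return null;
            };
            List<Future<Void>> futures = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                futures.add(pool.submit(worker));
            }
            for (Future<Void> future : futures) {
                future.get(); // a failed statement surfaces as ExecutionException
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Workers pull from a shared queue, so a slow statement does not hold up the rest of a pre-assigned slice. In this sketch statements may complete in any order, and a failure in one worker does not undo statements that other workers have already run, which seems acceptable here only because the batches touch disjoint partitions.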

3. Feature flag for safe rollout

Gate behind a new config hoodie.datasource.hive_sync.batching.enabled (default false) so existing behavior is completely unchanged unless explicitly opted in.

New configs

| Config | Default | Purpose |
|---|---|---|
| hoodie.datasource.hive_sync.batching.enabled | false | Feature flag to enable batched + parallel partition sync |
| hoodie.datasource.hive_sync.batching.threads | 8 | Thread pool size for parallel DDL execution |
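As a rough illustration of how the two proposed keys could gate behavior, the sketch below reads them from plain `java.util.Properties`. This is an assumption made for brevity: the real change would presumably declare them in `HiveSyncConfigHolder` like the other sync configs, and the class here is hypothetical.

```java
import java.util.Properties;

/** Illustrative reader for the proposed configs; not how Hudi declares its configs. */
final class HiveSyncBatchingOptions {

    static final String BATCHING_ENABLED = "hoodie.datasource.hive_sync.batching.enabled";
    static final String BATCHING_THREADS = "hoodie.datasource.hive_sync.batching.threads";

    final boolean enabled;
    final int threads;

    private HiveSyncBatchingOptions(boolean enabled, int threads) {
        this.enabled = enabled;
        this.threads = threads;
    }

    /** Parses both keys, falling back to the proposed defaults (false and 8). */
    static HiveSyncBatchingOptions from(Properties props) {
        boolean enabled = Boolean.parseBoolean(props.getProperty(BATCHING_ENABLED, "false").trim());
        int threads = Integer.parseInt(props.getProperty(BATCHING_THREADS, "8").trim());
        if (threads < 1) {
            throw new IllegalArgumentException(BATCHING_THREADS + " must be >= 1, got " + threads);
        }
        return new HiveSyncBatchingOptions(enabled, threads);
    }

    /** With the flag off, callers keep the existing single-threaded path. */
    int effectiveThreads() {
        return enabled ? threads : 1;
    }
}
```

Because `effectiveThreads()` returns 1 whenever the flag is off, the thread count setting has no effect until a user explicitly opts in.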

Files to modify

  • HiveSyncConfigHolder.java — new configs
  • QueryBasedDDLExecutor.java — batch TOUCH/UPDATE, add parallel execution routing
  • HiveQueryDDLExecutor.java — parallel execution with per-thread Hive sessions
  • JDBCExecutor.java — parallel execution with per-thread JDBC connections
  • HMSDDLExecutor.java — sequential batching for alter_partitions

Expected improvement

Based on the benchmarks above, enabling batching plus 8-thread parallelism should bring Hudi's partition sync performance in line with HDrone (~0.10–0.12 s/partition vs. the current ~0.41–1.08 s/partition), reducing sync time for ~2,000-partition tables from ~20 minutes to ~3 minutes.

Labels: type:devtask (Development tasks and maintenance work)
