From 46dfe10c107f3b57de941651e616bfeccc1a1d94 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 16 Jun 2026 14:30:06 +0700 Subject: [PATCH] perf(kafka-connect): avoid per-record TopicPartition allocation in HoodieSinkTask.put Route records through a secondary topic -> partition -> participant index instead of allocating a TopicPartition key per record. The index mirrors transactionParticipants and is maintained in bootstrap, close, and cleanup; the primary TopicPartition-keyed map is unchanged. --- .../apache/hudi/connect/HoodieSinkTask.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java index bcf126ca7ddbf..64c099635a4c8 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java @@ -53,6 +53,8 @@ public class HoodieSinkTask extends SinkTask { private final Map transactionCoordinators; private final Map transactionParticipants; + // Secondary index for per-record routing in put(); avoids allocating a TopicPartition key per record. + private final Map> participantsByTopicPartition; private KafkaConnectControlAgent controlKafkaClient; private KafkaConnectConfigs connectConfigs; @@ -62,6 +64,7 @@ public class HoodieSinkTask extends SinkTask { public HoodieSinkTask() { transactionCoordinators = new HashMap<>(); transactionParticipants = new HashMap<>(); + participantsByTopicPartition = new HashMap<>(); } @Override @@ -93,11 +96,9 @@ public void start(Map props) { @Override public void put(Collection records) { for (SinkRecord record : records) { - String topic = record.topic(); - int partition = record.kafkaPartition(); - TopicPartition tp = new TopicPartition(topic, partition); - - TransactionParticipant transactionParticipant = transactionParticipants.get(tp); + Map participantsForTopic = participantsByTopicPartition.get(record.topic()); + TransactionParticipant transactionParticipant = + participantsForTopic == null ? null : participantsForTopic.get(record.kafkaPartition()); if (transactionParticipant != null) { transactionParticipant.buffer(record); } @@ -168,6 +169,13 @@ public void close(Collection partitions) { } } TransactionParticipant worker = transactionParticipants.remove(partition); + Map participantsForTopic = participantsByTopicPartition.get(partition.topic()); + if (participantsForTopic != null) { + participantsForTopic.remove(partition.partition()); + if (participantsForTopic.isEmpty()) { + participantsByTopicPartition.remove(partition.topic()); + } + } if (worker != null) { try { log.debug("Closing data writer due to task start failure."); @@ -194,6 +202,7 @@ private void bootstrap(Collection partitions) { } ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context); transactionParticipants.put(partition, worker); + participantsByTopicPartition.computeIfAbsent(partition.topic(), k -> new HashMap<>()).put(partition.partition(), worker); worker.start(); } catch (HoodieException exception) { log.error("Fatal error initializing task {} for partition {}", taskId, partition.partition(), exception); @@ -214,6 +223,7 @@ private void cleanup() { } } transactionParticipants.clear(); + participantsByTopicPartition.clear(); transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop()); transactionCoordinators.clear(); }