Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class HoodieSinkTask extends SinkTask {

private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
// Secondary index for per-record routing in put(); avoids allocating a TopicPartition key per record.
private final Map<String, Map<Integer, TransactionParticipant>> participantsByTopicPartition;
private KafkaConnectControlAgent controlKafkaClient;
private KafkaConnectConfigs connectConfigs;

Expand All @@ -62,6 +64,7 @@ public class HoodieSinkTask extends SinkTask {
public HoodieSinkTask() {
transactionCoordinators = new HashMap<>();
transactionParticipants = new HashMap<>();
participantsByTopicPartition = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -93,11 +96,9 @@ public void start(Map<String, String> props) {
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
String topic = record.topic();
int partition = record.kafkaPartition();
TopicPartition tp = new TopicPartition(topic, partition);

TransactionParticipant transactionParticipant = transactionParticipants.get(tp);
Map<Integer, TransactionParticipant> participantsForTopic = participantsByTopicPartition.get(record.topic());
TransactionParticipant transactionParticipant =
participantsForTopic == null ? null : participantsForTopic.get(record.kafkaPartition());
if (transactionParticipant != null) {
transactionParticipant.buffer(record);
}
Expand Down Expand Up @@ -168,6 +169,13 @@ public void close(Collection<TopicPartition> partitions) {
}
}
TransactionParticipant worker = transactionParticipants.remove(partition);
Map<Integer, TransactionParticipant> participantsForTopic = participantsByTopicPartition.get(partition.topic());
if (participantsForTopic != null) {
participantsForTopic.remove(partition.partition());
if (participantsForTopic.isEmpty()) {
participantsByTopicPartition.remove(partition.topic());
}
}
if (worker != null) {
try {
log.debug("Closing data writer due to task start failure.");
Expand All @@ -194,6 +202,7 @@ private void bootstrap(Collection<TopicPartition> partitions) {
}
ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context);
transactionParticipants.put(partition, worker);
participantsByTopicPartition.computeIfAbsent(partition.topic(), k -> new HashMap<>()).put(partition.partition(), worker);
worker.start();
} catch (HoodieException exception) {
log.error("Fatal error initializing task {} for partition {}", taskId, partition.partition(), exception);
Expand All @@ -214,6 +223,7 @@ private void cleanup() {
}
}
transactionParticipants.clear();
participantsByTopicPartition.clear();
transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
transactionCoordinators.clear();
}
Expand Down
Loading