From 6026152d9d06f24cf4cc5d921dc55566ac780ee1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 16:33:49 +0800 Subject: [PATCH 01/14] 1. Call checkTopicNsOwnership instead of isServiceUnitActiveAsync --- .../broker/namespace/NamespaceService.java | 31 ----------- .../pulsar/broker/service/BrokerService.java | 55 ++++++++----------- ...sistentDispatcherFailoverConsumerTest.java | 2 - .../PersistentTopicConcurrentTest.java | 2 - .../broker/service/PersistentTopicTest.java | 3 +- .../pulsar/broker/service/ServerCnxTest.java | 7 +-- .../client/api/OrphanPersistentTopicTest.java | 6 +- 7 files changed, 30 insertions(+), 76 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 32fe4ca14493f..a92acf2177e9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,9 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1225,35 +1223,6 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } - /** - * @deprecated This method is only used in test now. - */ - @Deprecated - public boolean isServiceUnitActive(TopicName topicName) { - try { - return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() - .getMetadataStoreOperationTimeoutSeconds(), SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); - throw new RuntimeException(e); - } - } - - public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { - // TODO: Add unit tests cover it. - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return getBundleAsync(topicName) - .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); - } - return getBundleAsync(topicName).thenCompose(bundle -> { - Optional> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); - if (optionalFuture.isEmpty()) { - return CompletableFuture.completedFuture(false); - } - return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); - }); - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f67a16ed8da7e..98a439212ee8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1736,38 +1736,29 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean CompletableFuture> topicFuture, Map properties) { TopicName topicName = TopicName.get(topic); - pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) - .thenAccept(isActive -> { - if (isActive) { - CompletableFuture> propertiesFuture; - if (properties == null) { - //Read properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); - } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); - } else { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); - return null; - }); + checkTopicNsOwnership(topic).thenRun(() -> { + CompletableFuture> propertiesFuture; + if (properties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName); + } else { + propertiesFuture = CompletableFuture.completedFuture(properties); + } + propertiesFuture.thenAccept(finalProperties -> + //TODO add topicName in properties? + createPersistentTopic0(topic, createIfMissing, topicFuture, + finalProperties) + ).exceptionally(throwable -> { + log.warn("[{}] Read topic property failed", topic, throwable); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(throwable); + return null; + }); + }).exceptionally(ex -> { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(ex); + return null; + }); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 96ca2d90f0613..37cf75d84ca6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -162,7 +161,6 @@ public void setup() throws Exception { NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 20f58f277a39c..2f8a924635116 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +102,6 @@ public void setup(Method m) throws Exception { NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index ca8f762adc445..f021b4a4519ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -224,8 +224,7 @@ public void setup() throws Exception { NamespaceBundle bundle = mock(NamespaceBundle.class); doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any()); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); - doReturn(true).when(nsSvc).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); + doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any()); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index bba8f28675533..f442bdac06716 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -231,8 +231,7 @@ public void setup() throws Exception { .getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any()); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); - doReturn(true).when(namespaceService).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); + doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( @@ -1583,8 +1582,8 @@ public void testProducerOnNotOwnedTopic() throws Exception { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) - .isServiceUnitActiveAsync(any(TopicName.class)); + doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("topic not owned"))) + .when(brokerService).checkTopicNsOwnership(any()); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index b7c323af5bcd4..3613ba516254c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -245,7 +245,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { admin.topics().createNonPartitionedTopic(tpName); admin.namespaces().unload(ns); - // Inject an error when calling "NamespaceService.isServiceUnitActiveAsync". + // Inject an error when loading the topic AtomicInteger failedTimes = new AtomicInteger(); NamespaceService namespaceService = pulsar.getNamespaceService(); doAnswer(invocation -> { @@ -258,7 +258,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { return CompletableFuture.failedFuture(new RuntimeException("mocked error")); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer can create successfully eventually. Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); @@ -295,7 +295,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer create failed due to pulsar does not allow to create topic automatically. try { From d81529a8ee8a791dc066341d91f3ab71eaa89f66 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 16:52:49 +0800 Subject: [PATCH 02/14] 2. Reduce the TopicName conversion --- .../TransactionMetadataStoreService.java | 2 +- .../pulsar/broker/service/AbstractTopic.java | 2 +- .../pulsar/broker/service/BrokerService.java | 49 ++++++++----------- .../nonpersistent/NonPersistentTopic.java | 2 +- .../service/persistent/PersistentTopic.java | 2 +- .../broker/service/BrokerServiceTest.java | 4 +- .../pulsar/broker/service/ServerCnxTest.java | 3 +- .../buffer/TopicTransactionBufferTest.java | 3 +- 8 files changed, 30 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index bd19a8e860255..2512028b381df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -117,7 +117,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc completableFuture.complete(null); } else { pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames - .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()) + .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId())) .thenRun(() -> internalPinnedExecutor.execute(() -> { final Semaphore tcLoadSemaphore = this.tcLoadSemaphores .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e8253771eded4..8f32963759431 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -762,7 +762,7 @@ public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { checkArgument(producer.getTopic() == this); - return brokerService.checkTopicNsOwnership(getName()) + return brokerService.checkTopicNsOwnership(TopicName.get(topic)) .thenCompose(__ -> incrementTopicEpochIfNeeded(producer, producerQueuedFuture)) .thenCompose(producerEpoch -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98a439212ee8d..6363e551486a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1176,10 +1176,8 @@ public CompletableFuture> getTopic(final TopicName topicName, bo rc.getMessage()); log.error(errorInfo, rc); throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - return topics.computeIfAbsent(topicName.toString(), - (tpName) -> loadOrCreatePersistentTopic(tpName, createIfMissing, properties)); - }); + }).thenCompose(optionalTopicPolicies -> topics.computeIfAbsent(topicName.toString(), + __ -> loadOrCreatePersistentTopic(topicName, createIfMissing, properties))); }); } else { if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { @@ -1391,7 +1389,7 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.completeExceptionally(e); return topicFuture; } - checkTopicNsOwnership(topic) + checkTopicNsOwnership(TopicName.get(topic)) .thenCompose((__) -> validateTopicConsistency(TopicName.get(topic))) .thenRun(() -> { nonPersistentTopic.initialize() @@ -1651,12 +1649,8 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c /** * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic * loading and puts them into queue once in-process topics are created. - * - * @param topic persistent-topic name - * @return CompletableFuture - * @throws RuntimeException */ - protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, + protected CompletableFuture> loadOrCreatePersistentTopic(TopicName topicName, boolean createIfMissing, Map properties) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), @@ -1667,12 +1661,13 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return null; }); - checkTopicNsOwnership(topic) + final var topic = topicName.toString(); + checkTopicNsOwnership(topicName) .thenRun(() -> { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, + checkOwnershipAndCreatePersistentTopic(topicName, createIfMissing, topicFuture, properties); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic @@ -1688,7 +1683,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return null; }); } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, + pendingTopicLoadingQueue.add(new TopicLoadingContext(topicName, createIfMissing, topicFuture, properties)); if (log.isDebugEnabled()) { log.debug("topic-loading for {} added into pending queue", topic); @@ -1732,11 +1727,11 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic } } - private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, + private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { - TopicName topicName = TopicName.get(topic); - checkTopicNsOwnership(topic).thenRun(() -> { + final var topic = topicName.toString(); + checkTopicNsOwnership(topicName).thenRun(() -> { CompletableFuture> propertiesFuture; if (properties == null) { //Read properties from storage when loading topic. @@ -1746,7 +1741,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean } propertiesFuture.thenAccept(finalProperties -> //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, + createPersistentTopic0(topicName, createIfMissing, topicFuture, finalProperties) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); @@ -1762,12 +1757,11 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean } @VisibleForTesting - public void createPersistentTopic0(final String topic, boolean createIfMissing, + public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { - TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - + final var topic = topicName.toString(); if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); @@ -2354,8 +2348,7 @@ public CompletableFuture isTopicNsOwnedByBrokerAsync(TopicName topicNam }); } - public CompletableFuture checkTopicNsOwnership(final String topic) { - TopicName topicName = TopicName.get(topic); + public CompletableFuture checkTopicNsOwnership(TopicName topicName) { final var namespaceService = pulsar.getNamespaceService(); return namespaceService.getBundleAsync(topicName).thenCompose(bundle -> @@ -2364,7 +2357,7 @@ public CompletableFuture checkTopicNsOwnership(final String topic) { return CompletableFuture.completedFuture(null); } else { String msg = String.format("Namespace bundle (%s) for topic (%s) not served by this instance:" - + "%s. Please redo the lookup.", bundle, topic, pulsar.getBrokerId()); + + "%s. Please redo the lookup.", bundle, topicName, pulsar.getBrokerId()); log.warn(msg); return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg)); } @@ -3246,12 +3239,12 @@ private void createPendingLoadTopic() { return; } - final String topic = pendingTopic.getTopic(); - checkTopicNsOwnership(topic).thenRun(() -> { + final var topicName = pendingTopic.topicName; + checkTopicNsOwnership(topicName).thenRun(() -> { CompletableFuture> pendingFuture = pendingTopic.getTopicFuture(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - checkOwnershipAndCreatePersistentTopic(topic, + checkOwnershipAndCreatePersistentTopic(topicName, pendingTopic.isCreateIfMissing(), pendingFuture, pendingTopic.getProperties()); @@ -3264,7 +3257,7 @@ private void createPendingLoadTopic() { return null; }); }).exceptionally(e -> { - log.error("Failed to create pending topic {}", topic, e); + log.error("Failed to create pending topic {}", topicName.toString(), e); pendingTopic.getTopicFuture() .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); // schedule to process next pending topic @@ -3824,7 +3817,7 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory @AllArgsConstructor @Getter private static class TopicLoadingContext { - private final String topic; + private final TopicName topicName; private final boolean createIfMissing; private final CompletableFuture> topicFuture; private final Map properties; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 6021c41142a5e..f219663bc2abe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -284,7 +284,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map subscriptionProperties, SchemaType schemaType) { - return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> { final CompletableFuture future = new CompletableFuture<>(); if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d29252eafda1..1510b0d95b305 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -928,7 +928,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St "readCompacted only allowed on failover or exclusive subscriptions")); } - return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> { Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; if (replicatedSubscriptionState != null && replicatedSubscriptionState && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index c73acfe9ee81f..695c4b99f1747 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1183,7 +1183,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, null); + .loadOrCreatePersistentTopic(topic, true, null); try { futureResult.get(); @@ -1227,7 +1227,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex for (int i = 0; i < 10; i++) { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName + "_" + i, false, null); + .loadOrCreatePersistentTopic(TopicName.get(topicName + "_" + i), false, null); loadFutures.add(futureResult); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index f442bdac06716..2e1eeecd6444b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.CALLS_REAL_METHODS; @@ -3108,7 +3107,7 @@ public void testTopicIsNotReady() throws Exception { // Force the checkTopicNsOwnership method to throw ServiceUnitNotReadyException doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready"))) - .when(brokerService).checkTopicNsOwnership(anyString()); + .when(brokerService).checkTopicNsOwnership(any()); // 2nd subscribe command when the service unit is not ready ByteBuf clientCommand2 = Commands.newSubscribe(successTopicName, successSubName, 2 /* consumer id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 93654db2c9992..7af76e136c359 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -178,7 +178,8 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), Mockito.eq(PersistentTopic.class)); - brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + brokerService.createPersistentTopic0(TopicName.get(topic), true, new CompletableFuture<>(), + Collections.emptyMap()); Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); From ddecdeab3c3add4e63a03b4378e68591c52bdbef Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 16:54:16 +0800 Subject: [PATCH 03/14] 3. Reduce a checkTopicNsOwnership call --- .../pulsar/broker/service/BrokerService.java | 51 ++++++++----------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6363e551486a9..1e9ee08b38480 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1662,38 +1662,31 @@ protected CompletableFuture> loadOrCreatePersistentTopic(TopicNa }); final var topic = topicName.toString(); - checkTopicNsOwnership(topicName) - .thenRun(() -> { - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - - if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topicName, createIfMissing, topicFuture, - properties); - topicFuture.handle((persistentTopic, ex) -> { - // release permit and process pending topic - topicLoadSemaphore.release(); - // do not recreate topic if topic is already migrated and deleted by broker - // so, avoid creating a new topic if migration is already started - if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex.getCause()); - return null; - } - createPendingLoadTopic(); - return null; - }); - } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topicName, - createIfMissing, topicFuture, properties)); - if (log.isDebugEnabled()) { - log.debug("topic-loading for {} added into pending queue", topic); - } - } - }).exceptionally(ex -> { + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + + if (topicLoadSemaphore.tryAcquire()) { + checkOwnershipAndCreatePersistentTopic(topicName, createIfMissing, topicFuture, + properties); + topicFuture.handle((persistentTopic, ex) -> { + // release permit and process pending topic + topicLoadSemaphore.release(); + // do not recreate topic if topic is already migrated and deleted by broker + // so, avoid creating a new topic if migration is already started + if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex.getCause()); return null; - }); + } + createPendingLoadTopic(); + return null; + }); + } else { + pendingTopicLoadingQueue.add(new TopicLoadingContext(topicName, + createIfMissing, topicFuture, properties)); + if (log.isDebugEnabled()) { + log.debug("topic-loading for {} added into pending queue", topic); + } + } return topicFuture; } From 32973e42bc816e11aa9fcb296332646d381cc81a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 16:55:31 +0800 Subject: [PATCH 04/14] Don't wait for topic policies load --- .../pulsar/broker/service/BrokerService.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1e9ee08b38480..46a01134e735b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1166,18 +1166,8 @@ public CompletableFuture> getTopic(final TopicName topicName, bo if (!exists && !createIfMissing) { return CompletableFuture.completedFuture(Optional.empty()); } - // The topic level policies are not needed now, but the meaning of calling - // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. - return getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) - .exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, - rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> topics.computeIfAbsent(topicName.toString(), - __ -> loadOrCreatePersistentTopic(topicName, createIfMissing, properties))); + return topics.computeIfAbsent(topicName.toString(), __ -> + loadOrCreatePersistentTopic(topicName, createIfMissing, properties)); }); } else { if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { From f328f8bfcfcff2d4b569fe26d3f30cc349e74409 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 18:41:40 +0800 Subject: [PATCH 05/14] Check isTransactionInternalName immediately --- .../apache/pulsar/broker/service/BrokerService.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 46a01134e735b..3ce703cae0a5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1155,6 +1155,11 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { + if (isTransactionInternalName(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topicName); + log.warn(msg); + return CompletableFuture.failedFuture(new NotAllowedException(msg)); + } if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topicName); @@ -1745,13 +1750,6 @@ public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, Map properties) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); final var topic = topicName.toString(); - if (isTransactionInternalName(topicName)) { - String msg = String.format("Can not create transaction system topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new NotAllowedException(msg)); - return; - } CompletableFuture maxTopicsCheck = createIfMissing ? checkMaxTopicsPerNamespace(topicName) From 11cd1365cf5674fb651742e20335fc00cc210d1f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 19:36:36 +0800 Subject: [PATCH 06/14] Run validations concurrently --- .../pulsar/broker/service/BrokerService.java | 102 +++++++++--------- .../NonStartableTestPulsarService.java | 6 +- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 3ce703cae0a5e..4ecc39aa3eb24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -196,6 +196,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1082,13 +1083,18 @@ public CompletableFuture> getTopic(final String topic, boolean c * completes exceptionally with NotAllowedException if validation fails */ private CompletableFuture validateTopicConsistency(TopicName topicName) { + final var partitionMetadataFuture = fetchPartitionedTopicMetadataAsync(topicName.isPartitioned() + ? TopicName.get(topicName.getPartitionedTopicName()) : topicName); + return validateTopicConsistency(topicName, partitionMetadataFuture); + } + + private CompletableFuture validateTopicConsistency( + TopicName topicName, CompletableFuture partitionedMetadataFuture) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { // Skip validation for heartbeat namespace. return CompletableFuture.completedFuture(null); } - TopicName baseTopicName = - topicName.isPartitioned() ? TopicName.get(topicName.getPartitionedTopicName()) : topicName; - return fetchPartitionedTopicMetadataAsync(baseTopicName) + return partitionedMetadataFuture .thenCompose(metadata -> { if (topicName.isPartitioned()) { if (metadata.partitions == 0) { @@ -1146,7 +1152,7 @@ private CompletableFuture validateTopicConsistency(TopicName topicName) { * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty. */ public CompletableFuture> getTopic(final TopicName topicName, boolean createIfMissing, - Map properties) { + @Nullable Map properties) { try { // If topic future exists in the cache returned directly regardless of whether it fails or timeout. CompletableFuture> tp = topics.get(topicName.toString()); @@ -1318,11 +1324,13 @@ public void deleteLedgerFailed(ManagedLedgerException exception, } public CompletableFuture getManagedLedgerFactoryForTopic(TopicName topicName) { - return getManagedLedgerConfig(topicName) - .thenApply(config -> { - String storageClassName = config.getStorageClassName(); - return getManagedLedgerFactoryForTopic(topicName, storageClassName); - }); + return getManagedLedgerFactoryForTopic(topicName, getManagedLedgerConfig(topicName)); + } + + private CompletableFuture getManagedLedgerFactoryForTopic( + TopicName topicName, CompletableFuture mlConfigFuture) { + return mlConfigFuture.thenApply(config -> getManagedLedgerFactoryForTopic(topicName, + config.getStorageClassName())); } public ManagedLedgerFactory getManagedLedgerFactoryForTopic(TopicName topicName, String storageClassName) { @@ -1384,8 +1392,9 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.completeExceptionally(e); return topicFuture; } - checkTopicNsOwnership(TopicName.get(topic)) - .thenCompose((__) -> validateTopicConsistency(TopicName.get(topic))) + final var topicName = TopicName.get(topic); + checkTopicNsOwnership(topicName) + .thenCompose((__) -> validateTopicConsistency(topicName)) .thenRun(() -> { nonPersistentTopic.initialize() .thenCompose(__ -> nonPersistentTopic.checkReplication()) @@ -1646,7 +1655,7 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c * loading and puts them into queue once in-process topics are created. */ protected CompletableFuture> loadOrCreatePersistentTopic(TopicName topicName, - boolean createIfMissing, Map properties) { + boolean createIfMissing, @Nullable Map properties) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); @@ -1687,17 +1696,17 @@ protected CompletableFuture> loadOrCreatePersistentTopic(TopicNa } @VisibleForTesting - protected CompletableFuture> fetchTopicPropertiesAsync(TopicName topicName) { + protected CompletableFuture> fetchTopicPropertiesAsync( + TopicName topicName, CompletableFuture mlConfigFuture, + CompletableFuture partitionMetadataFuture) { if (!topicName.isPartitioned()) { - return getManagedLedgerFactoryForTopic(topicName).thenCompose( - managedLedgerFactory -> managedLedgerFactory.getManagedLedgerPropertiesAsync( - topicName.getPersistenceNamingEncoding())); + return getManagedLedgerFactoryForTopic(topicName, mlConfigFuture).thenCompose(managedLedgerFactory -> + managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding())); } else { - TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(partitionedTopicName) + return partitionMetadataFuture .thenCompose(metadata -> { if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { - return getManagedLedgerFactoryForTopic(topicName).thenCompose( + return getManagedLedgerFactoryForTopic(topicName, mlConfigFuture).thenCompose( managedLedgerFactory -> managedLedgerFactory.getManagedLedgerPropertiesAsync( topicName.getPersistenceNamingEncoding())); } else { @@ -1717,27 +1726,11 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, - Map properties) { + @Nullable Map properties) { final var topic = topicName.toString(); - checkTopicNsOwnership(topicName).thenRun(() -> { - CompletableFuture> propertiesFuture; - if (properties == null) { - //Read properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); - } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topicName, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); - }).exceptionally(ex -> { + checkTopicNsOwnership(topicName).thenRun(() -> + createPersistentTopic0(topicName, createIfMissing, topicFuture, properties) + ).exceptionally(ex -> { pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex); return null; @@ -1747,19 +1740,30 @@ private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean @VisibleForTesting public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, - Map properties) { + @Nullable Map originalProperties) { + + final var mlConfigFuture = getManagedLedgerConfig(topicName); + final var partitionedMetadataFuture = fetchPartitionedTopicMetadataAsync(topicName.isPartitioned() + ? TopicName.get(topicName.getPartitionedTopicName()) : topicName); + final CompletableFuture> propertiesFuture; + if (originalProperties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName, mlConfigFuture, partitionedMetadataFuture); + } else { + propertiesFuture = CompletableFuture.completedFuture(originalProperties); + } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); final var topic = topicName.toString(); - CompletableFuture maxTopicsCheck = createIfMissing - ? checkMaxTopicsPerNamespace(topicName) - : CompletableFuture.completedFuture(null); - - CompletableFuture isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName); - maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName)) - .thenCompose(__ -> isTopicAlreadyMigrated) - .thenCompose(__ -> getManagedLedgerConfig(topicName)) - .thenAccept(managedLedgerConfig -> { + // The validations can be performed concurrently + final var validateFuture = CompletableFuture.allOf(mlConfigFuture, propertiesFuture, partitionedMetadataFuture, + createIfMissing ? checkMaxTopicsPerNamespace(topicName) : CompletableFuture.completedFuture(null), + checkTopicAlreadyMigrated(topicName), + validateTopicConsistency(topicName, partitionedMetadataFuture)); + validateFuture.thenRun(() -> { + log.info("Finished topic validation on {}", topicName); + final var managedLedgerConfig = mlConfigFuture.join(); + final var properties = propertiesFuture.join(); if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { // init managedLedger interceptor Set interceptors = new HashSet<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index 46e41be012511..9a82681c60952 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -29,6 +29,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.BookKeeperClientFactory; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -48,6 +49,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -139,7 +141,9 @@ static class TestBrokerService extends BrokerService { } @Override - protected CompletableFuture> fetchTopicPropertiesAsync(TopicName topicName) { + protected CompletableFuture> fetchTopicPropertiesAsync( + TopicName topicName, CompletableFuture mlConfigFuture, + CompletableFuture partitionedMetadataFuture) { return CompletableFuture.completedFuture(Collections.emptyMap()); } } From 88cb6e38c7f3c09907d766ca3417940c82c3a3d4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 19:58:27 +0800 Subject: [PATCH 07/14] Simplify the code and improve logging --- .../pulsar/broker/service/BrokerService.java | 39 +++++++++++-------- .../buffer/TopicTransactionBufferTest.java | 2 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4ecc39aa3eb24..636dddc9b2852 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1728,8 +1728,9 @@ private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean CompletableFuture> topicFuture, @Nullable Map properties) { final var topic = topicName.toString(); + final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); checkTopicNsOwnership(topicName).thenRun(() -> - createPersistentTopic0(topicName, createIfMissing, topicFuture, properties) + createPersistentTopic0(topicName, createIfMissing, topicFuture, properties, topicCreateTimeMs) ).exceptionally(ex -> { pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex); @@ -1740,9 +1741,16 @@ private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean @VisibleForTesting public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, - @Nullable Map originalProperties) { - + @Nullable Map originalProperties, + long topicCreateTimeMs) { + final var beforeGetManagedLedgerConfig = System.currentTimeMillis(); final var mlConfigFuture = getManagedLedgerConfig(topicName); + mlConfigFuture.thenRun(() -> { + // Log the latency specially for getManagedLedgerConfig() because it needs to load the topic policies, + // which could be a time-consuming task with no metrics yet. + final var latencyMs = System.currentTimeMillis() - beforeGetManagedLedgerConfig; + log.info("Got managed ledger config for {} after {} ms", topicName, latencyMs); + }); final var partitionedMetadataFuture = fetchPartitionedTopicMetadataAsync(topicName.isPartitioned() ? TopicName.get(topicName.getPartitionedTopicName()) : topicName); final CompletableFuture> propertiesFuture; @@ -1752,7 +1760,6 @@ public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, } else { propertiesFuture = CompletableFuture.completedFuture(originalProperties); } - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); final var topic = topicName.toString(); // The validations can be performed concurrently @@ -1761,7 +1768,6 @@ public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, checkTopicAlreadyMigrated(topicName), validateTopicConsistency(topicName, partitionedMetadataFuture)); validateFuture.thenRun(() -> { - log.info("Finished topic validation on {}", topicName); final var managedLedgerConfig = mlConfigFuture.join(); final var properties = propertiesFuture.join(); if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { @@ -1857,20 +1863,18 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { - topics.remove(topic, topicFuture); if (closeEx != null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); } - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); }); }); return null; }); } catch (Exception e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); + failTopicFuture(topic, topicFuture, e); } } @@ -1883,8 +1887,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new PersistenceException(exception)); + failTopicFuture(topic, topicFuture, new PersistenceException(exception)); } } }, () -> isTopicNsOwnedByBrokerAsync(topicName), null); @@ -1894,11 +1897,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { String msg = migrationFailure ? "Topic is already migrated" : "Failed to get topic configuration:"; log.warn("[{}] {} {}", topic, msg, exception.getMessage(), exception); - // remove topic from topics-map in different thread to avoid possible deadlock if - // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(exception); - return null; + return failTopicFuture(topic, topicFuture, exception); }); } @@ -3799,6 +3798,14 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory this.pulsarChannelInitFactory = factory; } + private Void failTopicFuture(String topic, CompletableFuture> topicFuture, Throwable throwable) { + // remove topic from topics-map in different thread to avoid possible deadlock if + // createPersistentTopic-thread only tries to handle this future-result + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(throwable); + return null; + } + @AllArgsConstructor @Getter private static class TopicLoadingContext { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 7af76e136c359..bc09266d272c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -179,7 +179,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Mockito.eq(PersistentTopic.class)); brokerService.createPersistentTopic0(TopicName.get(topic), true, new CompletableFuture<>(), - Collections.emptyMap()); + Collections.emptyMap(), System.currentTimeMillis()); Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); From e406f3a99e706bd7dd8be96314fbd034ce4c3324 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:07:29 +0800 Subject: [PATCH 08/14] Improve the code --- .../apache/pulsar/broker/service/BrokerService.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 636dddc9b2852..e167aeeb01708 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1161,11 +1161,6 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { - if (isTransactionInternalName(topicName)) { - String msg = String.format("Can not create transaction system topic %s", topicName); - log.warn(msg); - return CompletableFuture.failedFuture(new NotAllowedException(msg)); - } if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topicName); @@ -1173,6 +1168,11 @@ public CompletableFuture> getTopic(final TopicName topicName, bo return FutureUtil.failedFuture(new NotAllowedException( "Broker is unable to load persistent topic")); } + if (isTransactionInternalName(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topicName); + log.warn(msg); + return CompletableFuture.failedFuture(new NotAllowedException(msg)); + } return checkNonPartitionedTopicExists(topicName).thenCompose(exists -> { if (!exists && !createIfMissing) { return CompletableFuture.completedFuture(Optional.empty()); @@ -1732,8 +1732,7 @@ private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean checkTopicNsOwnership(topicName).thenRun(() -> createPersistentTopic0(topicName, createIfMissing, topicFuture, properties, topicCreateTimeMs) ).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); return null; }); } From 02ddd81c1bb3cb3c2f0435df140801ef4254c02b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:21:20 +0800 Subject: [PATCH 09/14] improve --- .../apache/pulsar/broker/service/BrokerService.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e167aeeb01708..574329c204a21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1677,8 +1677,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(TopicNa // do not recreate topic if topic is already migrated and deleted by broker // so, avoid creating a new topic if migration is already started if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex.getCause()); + failTopicFuture(topic, topicFuture, ex.getCause()); return null; } createPendingLoadTopic(); @@ -3800,7 +3799,14 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory private Void failTopicFuture(String topic, CompletableFuture> topicFuture, Throwable throwable) { // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> { + if (topics.remove(topic, topicFuture)) { + log.info("Removed cached topic {} for failure {}", topic, throwable.getMessage()); + } else { + log.warn("Cached failed topic {} was not removed because it's outdated (failure: {})", topic, + throwable.getMessage()); + } + }); topicFuture.completeExceptionally(throwable); return null; } From 7395efda1c808b75d189b89f66ea976c4bdc4637 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:25:46 +0800 Subject: [PATCH 10/14] Fix pending topic --- .../pulsar/broker/service/BrokerService.java | 51 ++++++++----------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 574329c204a21..64c240ae5d95e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1670,7 +1670,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(TopicNa if (topicLoadSemaphore.tryAcquire()) { checkOwnershipAndCreatePersistentTopic(topicName, createIfMissing, topicFuture, - properties); + properties, false); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1724,13 +1724,17 @@ protected CompletableFuture> fetchTopicPropertiesAsync( } private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean createIfMissing, - CompletableFuture> topicFuture, - @Nullable Map properties) { + CompletableFuture> topicFuture, + @Nullable Map properties, + boolean fromPendingLoadTopic) { final var topic = topicName.toString(); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); checkTopicNsOwnership(topicName).thenRun(() -> createPersistentTopic0(topicName, createIfMissing, topicFuture, properties, topicCreateTimeMs) ).exceptionally(ex -> { + if (fromPendingLoadTopic) { + inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS); + } failTopicFuture(topic, topicFuture, ex); return null; }); @@ -3222,28 +3226,17 @@ private void createPendingLoadTopic() { } final var topicName = pendingTopic.topicName; - checkTopicNsOwnership(topicName).thenRun(() -> { - CompletableFuture> pendingFuture = pendingTopic.getTopicFuture(); - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - checkOwnershipAndCreatePersistentTopic(topicName, - pendingTopic.isCreateIfMissing(), - pendingFuture, - pendingTopic.getProperties()); - pendingFuture.handle((persistentTopic, ex) -> { - // release permit and process next pending topic - if (acquiredPermit) { - topicLoadSemaphore.release(); - } - createPendingLoadTopic(); - return null; - }); - }).exceptionally(e -> { - log.error("Failed to create pending topic {}", topicName.toString(), e); - pendingTopic.getTopicFuture() - .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); - // schedule to process next pending topic - inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS); + final var pendingFuture = pendingTopic.topicFuture; + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); + checkOwnershipAndCreatePersistentTopic(topicName, pendingTopic.createIfMissing, pendingFuture, + pendingTopic.properties, true); + pendingFuture.handle((persistentTopic, ex) -> { + // release permit and process next pending topic + if (acquiredPermit) { + topicLoadSemaphore.release(); + } + createPendingLoadTopic(); return null; }); } @@ -3811,12 +3804,8 @@ private Void failTopicFuture(String topic, CompletableFuture> to return null; } - @AllArgsConstructor @Getter - private static class TopicLoadingContext { - private final TopicName topicName; - private final boolean createIfMissing; - private final CompletableFuture> topicFuture; - private final Map properties; + private record TopicLoadingContext(TopicName topicName, boolean createIfMissing, + CompletableFuture> topicFuture, Map properties) { } } From a550a3ccb7e66430d724fd5558253fbace6cd61f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:26:10 +0800 Subject: [PATCH 11/14] fix --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 64c240ae5d95e..0a918b5314ef3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3804,7 +3804,6 @@ private Void failTopicFuture(String topic, CompletableFuture> to return null; } - @Getter private record TopicLoadingContext(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { } From 9a6b61946f88341a0e82f57cc32222e3cab65abe Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:36:52 +0800 Subject: [PATCH 12/14] Fix testNoOrphanClosedTopicIfTxnInternalFailed --- .../buffer/impl/TransactionPersistentTopicTest.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java index 508423adce4d8..4e7e99c00eb5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java @@ -22,9 +22,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; import java.io.IOException; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,6 +37,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -69,7 +68,7 @@ protected void cleanup() throws Exception { } @Test - public void testNoOrphanClosedTopicIfTxnInternalFailed() { + public void testNoOrphanClosedTopicIfTxnInternalFailed() throws PulsarAdminException { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); BrokerService brokerService = pulsar.getBrokerService(); @@ -89,13 +88,7 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() { pulsar.setTransactionBufferProvider(mockTransactionBufferProvider); // 2. Trigger create topic and assert topic load success. - CompletableFuture> firstLoad = brokerService.getTopic(tpName, true); - Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) - .pollInterval(200, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertTrue(firstLoad.isDone()); - assertFalse(firstLoad.isCompletedExceptionally()); - }); + admin.topics().createNonPartitionedTopic(tpName); // 3. Assert topic removed from cache Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) From 027ecfc269ebaaedf96ae66221a9c550d6025a65 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:43:14 +0800 Subject: [PATCH 13/14] Fix testGetStatsIfPartitionNotExists --- .../test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 1c2f2215a3e16..1db89fbff6670 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3746,6 +3746,7 @@ public void testGetStatsIfPartitionNotExists() throws Exception { final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); admin.topics().createPartitionedTopic(partitionedTp, 1); TopicName partition0 = TopicName.get(partitionedTp).getPartition(0); + admin.lookups().lookupTopic(partition0.toString()); // trigger loading namespace bundles boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); assertTrue(topicExists1); // Verify topics-stats works. From 4ffe9e8df531021e599956814fa7aa096b617e72 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Jul 2025 21:59:38 +0800 Subject: [PATCH 14/14] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0a918b5314ef3..769234d054061 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -81,7 +81,6 @@ import java.util.function.Consumer; import java.util.function.Predicate; import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor;