diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index e728b3b9abd6e..12ab076097574 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -140,8 +140,11 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) { validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER) - .thenAccept(__ -> { - LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader() + // The authoritative read: waits for an in-progress leader election to settle + // instead of returning 404 while a re-election is still in flight. + .thenCompose(__ -> pulsar().getLeaderElectionService().readCurrentLeader()) + .thenAccept(leader -> { + LeaderBroker leaderBroker = leader .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker")); BrokerInfo brokerInfo = BrokerInfo.builder() .serviceUrl(leaderBroker.getServiceUrl()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index baff684bd08c8..bff682e0d1bf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -57,7 +57,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.loadbalance.LeaderBroker; import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -1417,26 +1416,28 @@ private CompletableFuture validateLeaderBrokerAsync() { if (this.isLeaderBroker()) { return CompletableFuture.completedFuture(null); } - Optional currentLeaderOpt = pulsar().getLeaderElectionService().getCurrentLeader(); - if (currentLeaderOpt.isEmpty()) { - String errorStr = "The current leader is empty."; - log.error(errorStr); - return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr)); - } - LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); - String leaderBrokerId = leaderBroker.getBrokerId(); - LookupOptions lookupOptions = LookupOptions.builder() - .webServiceAdvertisedListenerName(getWebServiceListenerName()).build(); - return pulsar().getNamespaceService() - .createLookupResult(leaderBrokerId, false, lookupOptions) - .thenCompose(lookupResult -> { - URI redirectUri = lookupResult.toRedirectUri(uri.getRequestUri()); - log.debug() - .attr("leaderBrokerId", leaderBrokerId) - .attr("redirectUri", redirectUri).log("Redirecting the request call to leader broker"); - return FutureUtil.failedFuture( - new WebApplicationException(Response.temporaryRedirect(redirectUri).build())); - }); + // The authoritative read: waits for an in-progress leader election to settle instead of + // failing the request while a re-election is still in flight. 
+ return pulsar().getLeaderElectionService().readCurrentLeader().thenCompose(currentLeaderOpt -> { + if (currentLeaderOpt.isEmpty()) { + String errorStr = "The current leader is empty."; + log.error(errorStr); + return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr)); + } + String leaderBrokerId = currentLeaderOpt.get().getBrokerId(); + LookupOptions lookupOptions = LookupOptions.builder() + .webServiceAdvertisedListenerName(getWebServiceListenerName()).build(); + return pulsar().getNamespaceService() + .createLookupResult(leaderBrokerId, false, lookupOptions) + .thenCompose(lookupResult -> { + URI redirectUri = lookupResult.toRedirectUri(uri.getRequestUri()); + log.debug() + .attr("leaderBrokerId", leaderBrokerId) + .attr("redirectUri", redirectUri).log("Redirecting the request call to leader broker"); + return FutureUtil.failedFuture( + new WebApplicationException(Response.temporaryRedirect(redirectUri).build())); + }); + }); } public CompletableFuture setNamespaceBundleAffinityAsync(String bundleRange, String destinationBroker) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java index 2e53b54e98f61..21f67bd6b3613 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java @@ -56,10 +56,20 @@ public void close() throws Exception { leaderElection.close(); } + /** + * Authoritative read of the current leader: if a leader election is in progress, the returned + * future completes once it settles (bounded by the default metadata operation timeout). Use + * this whenever a decision is made based on who the leader is. 
+ */ public CompletableFuture> readCurrentLeader() { return leaderElection.getLeaderValue(); } + /** + * Non-blocking snapshot of the current leader; empty while a re-election is settling even + * though a leader may technically exist. Only suitable for best-effort uses such as logging — + * decision-making callers must use {@link #readCurrentLeader()}. + */ public Optional getCurrentLeader() { return leaderElection.getLeaderValueIfPresent(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fb7c3d68bf630..91632c1ee4cd7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -59,7 +59,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; @@ -561,7 +560,6 @@ static void resolveBrokerServiceLookupResult(LookupOptions options, NamespaceEph private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture> lookupFuture, LookupOptions options) { - String candidateBroker; LeaderElectionService les = pulsar.getLeaderElectionService(); if (les == null) { log.warn() @@ -572,71 +570,102 @@ private void searchForCandidateBroker(NamespaceBundle bundle, return; } - boolean authoritativeRedirect = les.isLeader(); + selectCandidateBroker(bundle, options, les) + .thenAcceptAsync(selection -> { + if (selection.isEmpty()) { + log.warn() + .attr("namespaceBundle", bundle) + .log("Load manager didn't return any available broker. 
Returning empty result to" + + " lookup. NamespaceBundle"); + lookupFuture.complete(Optional.empty()); + return; + } + acquireOwnershipOrRedirect(bundle, options, selection.get(), lookupFuture); + }, pulsar.getExecutor()) + .exceptionally(e -> { + log.warn() + .attr("acquire", bundle) + .exception(e) + .log("Error when searching for candidate broker to acquire"); + lookupFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + return null; + }); + } - try { - // check if this is Heartbeat or SLAMonitor namespace - candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> - CompletableFuture.completedFuture(isBrokerActive(cb))) - .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); + /** The broker selected for a bundle assignment, and whether the redirect to it is authoritative. */ + private record CandidateBrokerSelection(String candidateBroker, boolean authoritativeRedirect) { } - if (candidateBroker == null) { - Optional currentLeader = pulsar.getLeaderElectionService().getCurrentLeader(); + private CompletableFuture> selectCandidateBroker( + NamespaceBundle bundle, LookupOptions options, LeaderElectionService les) { + boolean authoritativeRedirect = les.isLeader(); - if (options.isAuthoritative()) { - // leader broker already assigned the current broker as owner - candidateBroker = pulsar.getBrokerId(); - } else { + // check if this is Heartbeat or SLAMonitor namespace + return getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> + CompletableFuture.completedFuture(isBrokerActive(cb))) + .thenComposeAsync(heartbeatOrSlaBroker -> { + if (heartbeatOrSlaBroker != null) { + return completedSelection(heartbeatOrSlaBroker, authoritativeRedirect); + } + if (options.isAuthoritative()) { + // leader broker already assigned the current broker as owner + return completedSelection(pulsar.getBrokerId(), authoritativeRedirect); + } LoadManager loadManager = this.loadManager.get(); - boolean makeLoadManagerDecisionOnThisBroker = 
!loadManager.isCentralized() || les.isLeader(); - if (!makeLoadManagerDecisionOnThisBroker) { - // If leader is not active, fallback to pick the least loaded from current broker loadmanager + if (!loadManager.isCentralized() || les.isLeader()) { + return selectLeastLoadedBroker(bundle); + } + // The load manager decision belongs to the leader: read the leader + // authoritatively (waits for an in-progress election to settle) instead of + // acting on a possibly-empty snapshot during a leadership handoff. + return les.readCurrentLeader().thenComposeAsync(currentLeader -> { boolean leaderBrokerActive = currentLeader.isPresent() && isBrokerActive(currentLeader.get().getBrokerId()); - if (!leaderBrokerActive) { - makeLoadManagerDecisionOnThisBroker = true; - if (currentLeader.isEmpty()) { - log.warn() - .attr("namespaceBundle", bundle) - .log("Leader broker info unavailable." - + " Using decentralized load" - + " manager decisions"); - } else { - log.warn() - .attr("broker", currentLeader.get()) - .attr("namespaceBundle", bundle) - .log("The current leader broker isn't active. Handling load manager" - + " decisions in a decentralized way. NamespaceBundle"); - } + if (leaderBrokerActive) { + // forward to leader broker to make assignment + return completedSelection(currentLeader.get().getBrokerId(), authoritativeRedirect); } - } - if (makeLoadManagerDecisionOnThisBroker) { - Optional availableBroker = getLeastLoadedFromLoadManager(bundle); - if (availableBroker.isEmpty()) { + // If leader is not active, fallback to pick the least loaded from current broker loadmanager + if (currentLeader.isEmpty()) { log.warn() .attr("namespaceBundle", bundle) - .log("Load manager didn't return any available broker. Returning empty result to" - + " lookup. NamespaceBundle"); - lookupFuture.complete(Optional.empty()); - return; + .log("Leader broker info unavailable." 
+ + " Using decentralized load" + + " manager decisions"); + } else { + log.warn() + .attr("broker", currentLeader.get()) + .attr("namespaceBundle", bundle) + .log("The current leader broker isn't active. Handling load manager" + + " decisions in a decentralized way. NamespaceBundle"); } - candidateBroker = availableBroker.get(); - authoritativeRedirect = true; - } else { - // forward to leader broker to make assignment - candidateBroker = currentLeader.get().getBrokerId(); - } - } - } + return selectLeastLoadedBroker(bundle); + }, pulsar.getExecutor()); + }, pulsar.getExecutor()); + } + + private static CompletableFuture> completedSelection( + String candidateBroker, boolean authoritativeRedirect) { + return CompletableFuture.completedFuture( + Optional.of(new CandidateBrokerSelection(candidateBroker, authoritativeRedirect))); + } + + // The decentralized decision is authoritative: this broker picked the owner itself. + private CompletableFuture> selectLeastLoadedBroker(NamespaceBundle bundle) { + Optional availableBroker; + try { + availableBroker = getLeastLoadedFromLoadManager(bundle); } catch (Exception e) { - log.warn() - .attr("acquire", bundle) - .exception(e) - .log("Error when searching for candidate broker to acquire"); - lookupFuture.completeExceptionally(e); - return; + return CompletableFuture.failedFuture(e); } + return CompletableFuture.completedFuture( + availableBroker.map(broker -> new CandidateBrokerSelection(broker, true))); + } + private void acquireOwnershipOrRedirect(NamespaceBundle bundle, LookupOptions options, + CandidateBrokerSelection selection, + CompletableFuture> lookupFuture) { + final String candidateBroker = selection.candidateBroker(); + final boolean authoritativeRedirect = selection.authoritativeRedirect(); try { Objects.requireNonNull(candidateBroker); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 0bdeb4341d99e..0bfc626398fd9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import com.google.common.collect.Sets; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -116,6 +117,9 @@ public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerExcepti leaderBrokerReference.get() != null); Mockito.when(leaderElectionService.getCurrentLeader()) .thenAnswer(invocation -> Optional.ofNullable(leaderBrokerReference.get())); + Mockito.when(leaderElectionService.readCurrentLeader()) + .thenAnswer(invocation -> + CompletableFuture.completedFuture(Optional.ofNullable(leaderBrokerReference.get()))); leaderElectionServiceReference.set(leaderElectionService); // broker, webService and leaderElectionService is started, but elect not ready; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java index 016b7d061d073..4b34815c558e6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java @@ -45,19 +45,30 @@ public interface LeaderElection extends AutoCloseable { LeaderElectionState getState(); /** - * Get the value set by the elected leader, or empty if there's currently no leader. + * Get the value set by the elected leader. + *
<p>
+ * This is the authoritative read: if a leader election is currently in progress (e.g. the + * previous leader's node was just deleted and the participants are re-electing), the returned + * future completes once the election has settled, with the newly determined leader value. The + * future completes exceptionally with a {@link java.util.concurrent.TimeoutException} if the + * election does not complete within the default metadata operation timeout. + *
<p>
+ * An instance that never participated in the election (no {@link #elect(Object)} call) reads + * the leader value directly from the metadata store. A closed instance does not wait: it + * reports an empty leader if it held the leadership when closed, or its last known view + * otherwise. * * @return a future that will track the completion of the operation */ CompletableFuture> getLeaderValue(); /** - * Get the value set by the elected leader, or empty if there's currently no leader. + * Get a non-blocking snapshot of the value set by the elected leader, or empty if no leader is + * known right now. *
<p>
- * The call is non blocking and in certain cases can return Optional.empty() even though a leader is - * technically elected. - * - * @return a future that will track the completion of the operation + * The snapshot can return Optional.empty() even though a leader is technically + * elected (for example while a re-election is still settling). Callers that need the + * authoritative leader must use {@link #getLeaderValue()} instead. */ Optional getLeaderValueIfPresent(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java index ce8167271f563..302d18b574cd2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java @@ -20,20 +20,18 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.CustomLog; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; -import org.apache.pulsar.metadata.api.MetadataCache; -import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException; @@ -52,14 +50,19 @@ class LeaderElectionImpl implements LeaderElection { 
private final String path; private final MetadataSerde serde; private final MetadataStoreExtended store; - private final MetadataCache cache; private final Consumer stateChangesListener; - private final ScheduledFuture updateCachedValueFuture; private LeaderElectionState leaderElectionState; private Optional version = Optional.empty(); private Optional proposedValue; + // The leader value as known by the election cycle (the leader can only change through an + // election cycle). Pending while no leader is known — election in progress or the leader node + // deleted — and completed with the leader value once the election settles. Readers of + // getLeaderValue() wait on it (bounded by leaderElectionCompletionTimeoutSeconds); + // getLeaderValueIfPresent() takes a non-blocking snapshot of it. + private CompletableFuture> currentLeaderFuture = new CompletableFuture<>(); + private final ScheduledExecutorService executor; private final FutureUtil.Sequencer sequencer; @@ -71,16 +74,21 @@ private enum InternalState { private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5; + // Upper bound for getLeaderValue() waiting on an election that never settles, aligned with the + // default metadata-store operation timeout (the broker's metadataStoreOperationTimeoutSeconds). 
+ private volatile int leaderElectionCompletionTimeoutSeconds = 30; + + @VisibleForTesting + void setLeaderElectionCompletionTimeoutSeconds(int leaderElectionCompletionTimeoutSeconds) { + this.leaderElectionCompletionTimeoutSeconds = leaderElectionCompletionTimeoutSeconds; + } + LeaderElectionImpl(MetadataStoreExtended store, Class clazz, String path, Consumer stateChangesListener, ScheduledExecutorService executor) { this.path = path; this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null)); this.store = store; - MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder() - .expireAfterWriteMillis(-1L) - .build(); - this.cache = store.getMetadataCache(clazz, metadataCacheConfig); this.leaderElectionState = LeaderElectionState.NoLeader; this.internalState = InternalState.Init; this.stateChangesListener = stateChangesListener; @@ -88,13 +96,38 @@ private enum InternalState { this.sequencer = FutureUtil.Sequencer.create(); store.registerListener(this::handlePathNotification); store.registerSessionListener(this::handleSessionNotification); - updateCachedValueFuture = executor.scheduleWithFixedDelay(this::getLeaderValue, - metadataCacheConfig.getRefreshAfterWriteMillis() / 2, - metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Record the leader value determined by the election cycle, waking up any getLeaderValue() + * callers waiting for the election to settle. + */ + private synchronized void leaderKnown(Optional leaderValue) { + if (currentLeaderFuture.isDone()) { + currentLeaderFuture = CompletableFuture.completedFuture(leaderValue); + } else { + currentLeaderFuture.complete(leaderValue); + } + } + + /** + * Mark the leader as unknown (the leader node was deleted) so getLeaderValue() callers wait for + * the next election cycle to settle instead of observing a stale value. 
+ */ + private synchronized void leaderUnknown() { + if (currentLeaderFuture.isDone()) { + currentLeaderFuture = new CompletableFuture<>(); + } } @Override public synchronized CompletableFuture elect(T proposedValue) { + if (internalState == InternalState.Closed) { + // Reopened after close() (e.g. the broker's LeaderElectionService is close()d and then + // start()ed again): reset so a fresh election cycle runs and readers wait for it. + leaderElectionState = LeaderElectionState.NoLeader; + currentLeaderFuture = new CompletableFuture<>(); + } if (leaderElectionState != LeaderElectionState.NoLeader) { return CompletableFuture.completedFuture(leaderElectionState); } @@ -112,12 +145,6 @@ private synchronized CompletableFuture elect() { } else { return tryToBecomeLeader(); } - }).thenCompose(leaderElectionState -> { - // make sure that the cache contains the current leader - // so that getLeaderValueIfPresent works on all brokers - cache.refresh(path); - return cache.get(path) - .thenApply(__ -> leaderElectionState); }); } @@ -137,6 +164,7 @@ private synchronized CompletableFuture handleExistingLeader log.info().attr("value", existingValue).attr("path", path).attr("stat", res.getStat()) .log("Keeping the existing value as it's from the same session"); // The value is still valid because it was created in the same session + leaderKnown(Optional.of(existingValue)); changeState(LeaderElectionState.Leading); return CompletableFuture.completedFuture(LeaderElectionState.Leading); } else { @@ -160,6 +188,7 @@ private synchronized CompletableFuture handleExistingLeader } // If the existing value is different, it means there's already another leader + leaderKnown(Optional.of(existingValue)); changeState(LeaderElectionState.Following); return CompletableFuture.completedFuture(LeaderElectionState.Following); } @@ -190,37 +219,19 @@ private synchronized CompletableFuture tryToBecomeLeader() .thenAccept(stat -> { synchronized (LeaderElectionImpl.this) { if (internalState == 
InternalState.ElectionInProgress) { - // Do a get() in order to force a notification later, if the z-node disappears - cache.get(path) - .thenRun(() -> { - synchronized (LeaderElectionImpl.this) { - log.info().attr("path", path).attr("value", value) - .log("Acquired leadership"); - internalState = InternalState.LeaderIsPresent; - if (leaderElectionState != LeaderElectionState.Leading) { - leaderElectionState = LeaderElectionState.Leading; - try { - stateChangesListener.accept(leaderElectionState); - } catch (Throwable t) { - log.warn().exception(t).log("Exception in state change listener"); - } - } - result.complete(leaderElectionState); - } - }).exceptionally(ex -> { - // We fail to do the get(), so clean up the leader election fail the whole - // operation - log.warn().attr("path", path).exception(ex) - .log("Failed to get the current state after acquiring leadership." - + " Conditionally deleting current entry."); - store.delete(path, Optional.of(stat.getVersion())) - .thenRun(() -> result.completeExceptionally(ex)) - .exceptionally(ex2 -> { - result.completeExceptionally(ex2); - return null; - }); - return null; - }); + log.info().attr("path", path).attr("value", value) + .log("Acquired leadership"); + internalState = InternalState.LeaderIsPresent; + leaderKnown(Optional.of(value)); + if (leaderElectionState != LeaderElectionState.Leading) { + leaderElectionState = LeaderElectionState.Leading; + try { + stateChangesListener.accept(leaderElectionState); + } catch (Throwable t) { + log.warn().exception(t).log("Exception in state change listener"); + } + } + result.complete(leaderElectionState); } else { log.info().attr("path", path).attr("value", value).attr("stat", stat) .log("Leadership was lost. 
Conditionally deleting entry."); @@ -259,7 +270,6 @@ private synchronized CompletableFuture tryToBecomeLeader() @Override public void close() throws Exception { - updateCachedValueFuture.cancel(true); try { asyncClose().join(); } catch (CompletionException e) { @@ -274,6 +284,12 @@ public synchronized CompletableFuture asyncClose() { } internalState = InternalState.Closed; + // A closed election reports "no leader" rather than waiting or failing: callers like the + // extensible load manager's handleNoChannelOwnerError() key off the resulting + // "no channel owner" condition to restart the election. + if (!currentLeaderFuture.isDone()) { + currentLeaderFuture.complete(Optional.empty()); + } if (leaderElectionState != LeaderElectionState.Leading) { return CompletableFuture.completedFuture(null); @@ -283,6 +299,9 @@ public synchronized CompletableFuture asyncClose() { .thenAccept(__ -> { synchronized (LeaderElectionImpl.this) { leaderElectionState = LeaderElectionState.NoLeader; + // The deleted leader node was ours and a closed instance no longer + // observes elections; don't keep reporting ourselves as leader. + currentLeaderFuture = CompletableFuture.completedFuture(Optional.empty()); } } ); @@ -295,12 +314,61 @@ public synchronized LeaderElectionState getState() { @Override public CompletableFuture> getLeaderValue() { - return cache.get(path); + CompletableFuture> future; + synchronized (this) { + if (internalState == InternalState.Init) { + // This instance never participated in the election (a pure observer, e.g. + // BookKeeper's MetadataDrivers helpers querying the current auditor): there is no + // local election cycle to wait for, so the store content is the authoritative + // answer. + return readLeaderValueFromStore(); + } + future = currentLeaderFuture; + } + if (future.isDone()) { + // Hand out a derived future so callers cannot complete the internal one. 
+ return future.thenApply(value -> value); + } + int timeoutSeconds = leaderElectionCompletionTimeoutSeconds; + return FutureUtil.addTimeoutHandling(whenLeaderKnown(future), + Duration.ofSeconds(timeoutSeconds), executor, + () -> FutureUtil.createTimeoutException( + "Leader election on path " + path + " did not complete within " + + timeoutSeconds + " seconds", + LeaderElectionImpl.class, "getLeaderValue()")); + } + + private CompletableFuture> readLeaderValueFromStore() { + return store.get(path).thenApply(optRes -> optRes.map(res -> { + try { + return serde.deserialize(path, res.getValue(), res.getStat()); + } catch (Throwable t) { + throw new CompletionException(t); + } + })); + } + + // Track the internal future without exposing it: completing/cancelling the returned future + // (e.g. by the timeout handling) must not complete the election's own future. + private CompletableFuture> whenLeaderKnown(CompletableFuture> future) { + CompletableFuture> result = new CompletableFuture<>(); + future.whenComplete((value, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + } else { + result.complete(value); + } + }); + return result; } @Override public Optional getLeaderValueIfPresent() { - return cache.getIfCached(path); + CompletableFuture> future; + synchronized (this) { + future = currentLeaderFuture; + } + return future.isDone() && !future.isCompletedExceptionally() ? future.join() : Optional.empty(); } private void handleSessionNotification(SessionEvent event) { @@ -338,6 +406,9 @@ private void handlePathNotification(Notification notification) { } leaderElectionState = LeaderElectionState.NoLeader; + // The leader is unknown until the re-election below settles; getLeaderValue() + // callers wait for it instead of observing the stale value. 
+ leaderUnknown(); if (proposedValue.isPresent()) { elect() diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java index 146e4f0dd58ca..47d5e9f1f7964 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java @@ -143,12 +143,16 @@ public void testAutoRecoverySessionLoss() throws Exception { } BookieId currentAuditor = main1.auditorElector.getCurrentAuditor(); assertNotNull(currentAuditor); - Auditor auditor1 = main1.auditorElector.getAuditor(); assertEquals("Current Auditor should be AR1", currentAuditor, BookieImpl.getBookieId(confByIndex(0))); + // getCurrentAuditor() can resolve as soon as the election settles, before the elector + // thread has constructed the Auditor instance — re-read getAuditor() on every poll instead + // of capturing a possibly-null reference once. 
Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> { - assertNotNull(auditor1); - assertTrue("Auditor of AR1 should be running", auditor1.isRunning()); + Auditor a1 = main1.auditorElector.getAuditor(); + assertNotNull(a1); + assertTrue("Auditor of AR1 should be running", a1.isRunning()); }); + Auditor auditor1 = main1.auditorElector.getAuditor(); /* diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java index 4b48f3c20b02b..cb26953738374 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.metadata; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import java.util.EnumSet; +import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -35,6 +38,7 @@ import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class LeaderElectionTest extends BaseMetadataStoreTest { @@ -284,4 +288,155 @@ public void revalidateLeaderWithDifferentSessionsDifferentValue(String provider, assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty()); } + + @Test(dataProvider = "impl", timeOut = 30000) + public void readsDoNotObserveEmptyLeaderDuringReElection(String provider, Supplier urlSupplier) + 
throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + + // Externally delete the leader node: the instance re-elects itself. An authoritative read + // issued during the churn either returns the last settled value or waits for the + // re-election to settle — it never observes an empty leader. + store.delete(path, Optional.empty()).join(); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + + Awaitility.await().untilAsserted(() -> { + assertEquals(le.getState(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1")); + }); + } + + @Test(dataProvider = "zkImpls", timeOut = 30000) + public void followerReadsResolveToTheNewLeaderAfterHandoff(String provider, Supplier urlSupplier) + throws Exception { + @Cleanup + MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().build()); + @Cleanup + MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs1 = new CoordinationServiceImpl(store1); + @Cleanup + CoordinationService cs2 = new CoordinationServiceImpl(store2); + + @Cleanup + LeaderElection le1 = cs1.getLeaderElection(String.class, path, __ -> { + }); + @Cleanup + LeaderElection le2 = cs2.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le1.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le2.elect("test-2").join(), LeaderElectionState.Following); + assertEquals(le2.getLeaderValue().join(), 
Optional.of("test-1")); + + // The leader hands off: le2 re-elects itself. Authoritative reads during the handoff + // return one of the settled leader values and converge to the new leader, but never + // observe an empty leader. + le1.close(); + List<Optional<String>> observed = new CopyOnWriteArrayList<>(); + Awaitility.await().untilAsserted(() -> { + Optional<String> leader = le2.getLeaderValue().join(); + observed.add(leader); + assertEquals(leader, Optional.of("test-2")); + }); + assertThat(observed) + .as("authoritative reads during the leadership handoff") + .doesNotContain(Optional.empty()); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void closedLeaderReportsEmptyLeader(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + + // Closing the leader releases the leadership; reads on the closed instance must report an + // empty leader without waiting (recovery paths key off the "no leader" condition).
+ le.close(); + assertEquals(le.getLeaderValue().join(), Optional.empty()); + assertEquals(le.getLeaderValueIfPresent(), Optional.empty()); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void electAfterCloseRunsANewElection(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + le.close(); + + // Re-electing on a closed instance reopens it (the broker's LeaderElectionService is + // close()d and start()ed again to force a leadership change). + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1")); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void observerReadsLeaderValueFromStore(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + @Cleanup + CoordinationService observerCs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + // The observer never calls elect(): there is no local election cycle to wait for, so the + // authoritative read goes directly to the metadata store, while the snapshot stays empty.
+ @Cleanup + LeaderElection<String> observer = observerCs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(observer.getLeaderValue().join(), Optional.empty()); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(observer.getLeaderValue().join(), Optional.of("test-1")); + assertEquals(observer.getLeaderValueIfPresent(), Optional.empty()); + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java index 09c9d71c41a30..4827c4a197ad5 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java @@ -18,7 +18,14 @@ */ package org.apache.pulsar.metadata.coordination.impl; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import lombok.Cleanup; import org.apache.pulsar.metadata.BaseMetadataStoreTest; @@ -62,4 +69,25 @@ public void validateDeadLock(String provider, Supplier<String> urlSupplier) }); blockFuture.join(); } + + @Test(timeOut = 20000) + public void getLeaderValueTimesOutWhenElectionNeverCompletes() { + MetadataStoreExtended store = mock(MetadataStoreExtended.class); + // The store never answers, so the election never settles.
+ when(store.get(anyString())).thenReturn(new CompletableFuture<>()); + + @Cleanup("shutdownNow") + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + LeaderElectionImpl<String> le = new LeaderElectionImpl<>(store, String.class, + "/getLeaderValueTimesOutWhenElectionNeverCompletes", __ -> { + }, executor); + le.setLeaderElectionCompletionTimeoutSeconds(1); + + le.elect("test-1"); + + assertThatThrownBy(() -> le.getLeaderValue().join()) + .hasCauseInstanceOf(TimeoutException.class) + .cause() + .hasMessageContaining("did not complete within"); + } }