Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw
public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER)
.thenAccept(__ -> {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
// The authoritative read: waits for an in-progress leader election to settle
// instead of returning 404 while a re-election is still in flight.
.thenCompose(__ -> pulsar().getLeaderElectionService().readCurrentLeader())
.thenAccept(leader -> {
LeaderBroker leaderBroker = leader
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = BrokerInfo.builder()
.serviceUrl(leaderBroker.getServiceUrl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -1417,26 +1416,28 @@ private CompletableFuture<Void> validateLeaderBrokerAsync() {
if (this.isLeaderBroker()) {
return CompletableFuture.completedFuture(null);
}
Optional<LeaderBroker> currentLeaderOpt = pulsar().getLeaderElectionService().getCurrentLeader();
if (currentLeaderOpt.isEmpty()) {
String errorStr = "The current leader is empty.";
log.error(errorStr);
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
}
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerId = leaderBroker.getBrokerId();
LookupOptions lookupOptions = LookupOptions.builder()
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
return pulsar().getNamespaceService()
.createLookupResult(leaderBrokerId, false, lookupOptions)
.thenCompose(lookupResult -> {
URI redirectUri = lookupResult.toRedirectUri(uri.getRequestUri());
log.debug()
.attr("leaderBrokerId", leaderBrokerId)
.attr("redirectUri", redirectUri).log("Redirecting the request call to leader broker");
return FutureUtil.failedFuture(
new WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
});
// The authoritative read: waits for an in-progress leader election to settle instead of
// failing the request while a re-election is still in flight.
return pulsar().getLeaderElectionService().readCurrentLeader().thenCompose(currentLeaderOpt -> {
if (currentLeaderOpt.isEmpty()) {
String errorStr = "The current leader is empty.";
log.error(errorStr);
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
}
String leaderBrokerId = currentLeaderOpt.get().getBrokerId();
LookupOptions lookupOptions = LookupOptions.builder()
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
return pulsar().getNamespaceService()
.createLookupResult(leaderBrokerId, false, lookupOptions)
.thenCompose(lookupResult -> {
URI redirectUri = lookupResult.toRedirectUri(uri.getRequestUri());
log.debug()
.attr("leaderBrokerId", leaderBrokerId)
.attr("redirectUri", redirectUri).log("Redirecting the request call to leader broker");
return FutureUtil.failedFuture(
new WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
});
});
}

public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String bundleRange, String destinationBroker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,20 @@ public void close() throws Exception {
leaderElection.close();
}

/**
* Authoritative read of the current leader: if a leader election is in progress, the returned
* future completes once it settles (bounded by the default metadata operation timeout). Use
* this whenever a decision is made based on who the leader is.
*/
public CompletableFuture<Optional<LeaderBroker>> readCurrentLeader() {
return leaderElection.getLeaderValue();
}

/**
* Non-blocking snapshot of the current leader; empty while a re-election is settling even
* though a leader may technically exist. Only suitable for best-effort uses such as logging —
* decision-making callers must use {@link #readCurrentLeader()}.
*/
public Optional<LeaderBroker> getCurrentLeader() {
return leaderElection.getLeaderValueIfPresent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
Expand Down Expand Up @@ -561,7 +560,6 @@ static void resolveBrokerServiceLookupResult(LookupOptions options, NamespaceEph
private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture,
LookupOptions options) {
String candidateBroker;
LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
log.warn()
Expand All @@ -572,71 +570,102 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
return;
}

boolean authoritativeRedirect = les.isLeader();
selectCandidateBroker(bundle, options, les)
.thenAcceptAsync(selection -> {
if (selection.isEmpty()) {
log.warn()
.attr("namespaceBundle", bundle)
.log("Load manager didn't return any available broker. Returning empty result to"
+ " lookup. NamespaceBundle");
lookupFuture.complete(Optional.empty());
return;
}
acquireOwnershipOrRedirect(bundle, options, selection.get(), lookupFuture);
}, pulsar.getExecutor())
.exceptionally(e -> {
log.warn()
.attr("acquire", bundle)
.exception(e)
.log("Error when searching for candidate broker to acquire");
lookupFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
}

try {
// check if this is Heartbeat or SLAMonitor namespace
candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
CompletableFuture.completedFuture(isBrokerActive(cb)))
.get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
/** The broker selected for a bundle assignment, and whether the redirect to it is authoritative. */
private record CandidateBrokerSelection(String candidateBroker, boolean authoritativeRedirect) { }

if (candidateBroker == null) {
Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader();
private CompletableFuture<Optional<CandidateBrokerSelection>> selectCandidateBroker(
NamespaceBundle bundle, LookupOptions options, LeaderElectionService les) {
boolean authoritativeRedirect = les.isLeader();

if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getBrokerId();
} else {
// check if this is Heartbeat or SLAMonitor namespace
return getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
CompletableFuture.completedFuture(isBrokerActive(cb)))
.thenComposeAsync(heartbeatOrSlaBroker -> {
if (heartbeatOrSlaBroker != null) {
return completedSelection(heartbeatOrSlaBroker, authoritativeRedirect);
}
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
return completedSelection(pulsar.getBrokerId(), authoritativeRedirect);
}
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
if (!loadManager.isCentralized() || les.isLeader()) {
return selectLeastLoadedBroker(bundle);
}
// The load manager decision belongs to the leader: read the leader
// authoritatively (waits for an in-progress election to settle) instead of
// acting on a possibly-empty snapshot during a leadership handoff.
return les.readCurrentLeader().thenComposeAsync(currentLeader -> {
boolean leaderBrokerActive = currentLeader.isPresent()
&& isBrokerActive(currentLeader.get().getBrokerId());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (currentLeader.isEmpty()) {
log.warn()
.attr("namespaceBundle", bundle)
.log("Leader broker info unavailable."
+ " Using decentralized load"
+ " manager decisions");
} else {
log.warn()
.attr("broker", currentLeader.get())
.attr("namespaceBundle", bundle)
.log("The current leader broker isn't active. Handling load manager"
+ " decisions in a decentralized way. NamespaceBundle");
}
if (leaderBrokerActive) {
// forward to leader broker to make assignment
return completedSelection(currentLeader.get().getBrokerId(), authoritativeRedirect);
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (availableBroker.isEmpty()) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
if (currentLeader.isEmpty()) {
log.warn()
.attr("namespaceBundle", bundle)
.log("Load manager didn't return any available broker. Returning empty result to"
+ " lookup. NamespaceBundle");
lookupFuture.complete(Optional.empty());
return;
.log("Leader broker info unavailable."
+ " Using decentralized load"
+ " manager decisions");
} else {
log.warn()
.attr("broker", currentLeader.get())
.attr("namespaceBundle", bundle)
.log("The current leader broker isn't active. Handling load manager"
+ " decisions in a decentralized way. NamespaceBundle");
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getBrokerId();
}
}
}
return selectLeastLoadedBroker(bundle);
}, pulsar.getExecutor());
}, pulsar.getExecutor());
}

private static CompletableFuture<Optional<CandidateBrokerSelection>> completedSelection(
String candidateBroker, boolean authoritativeRedirect) {
return CompletableFuture.completedFuture(
Optional.of(new CandidateBrokerSelection(candidateBroker, authoritativeRedirect)));
}

// The decentralized decision is authoritative: this broker picked the owner itself.
private CompletableFuture<Optional<CandidateBrokerSelection>> selectLeastLoadedBroker(NamespaceBundle bundle) {
Optional<String> availableBroker;
try {
availableBroker = getLeastLoadedFromLoadManager(bundle);
} catch (Exception e) {
log.warn()
.attr("acquire", bundle)
.exception(e)
.log("Error when searching for candidate broker to acquire");
lookupFuture.completeExceptionally(e);
return;
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(
availableBroker.map(broker -> new CandidateBrokerSelection(broker, true)));
}

private void acquireOwnershipOrRedirect(NamespaceBundle bundle, LookupOptions options,
CandidateBrokerSelection selection,
CompletableFuture<Optional<LookupResult>> lookupFuture) {
final String candidateBroker = selection.candidateBroker();
final boolean authoritativeRedirect = selection.authoritativeRedirect();
try {
Objects.requireNonNull(candidateBroker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -116,6 +117,9 @@ public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerExcepti
leaderBrokerReference.get() != null);
Mockito.when(leaderElectionService.getCurrentLeader())
.thenAnswer(invocation -> Optional.ofNullable(leaderBrokerReference.get()));
Mockito.when(leaderElectionService.readCurrentLeader())
.thenAnswer(invocation ->
CompletableFuture.completedFuture(Optional.ofNullable(leaderBrokerReference.get())));
leaderElectionServiceReference.set(leaderElectionService);

// broker, webService and leaderElectionService is started, but elect not ready;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,30 @@ public interface LeaderElection<T> extends AutoCloseable {
LeaderElectionState getState();

/**
* Get the value set by the elected leader, or empty if there's currently no leader.
* Get the value set by the elected leader.
* <p>
* This is the authoritative read: if a leader election is currently in progress (e.g. the
* previous leader's node was just deleted and the participants are re-electing), the returned
* future completes once the election has settled, with the newly determined leader value. The
* future completes exceptionally with a {@link java.util.concurrent.TimeoutException} if the
* election does not complete within the default metadata operation timeout.
* <p>
* An instance that never participated in the election (no {@link #elect(Object)} call) reads
* the leader value directly from the metadata store. A closed instance does not wait: it
* reports an empty leader if it held the leadership when closed, or its last known view
* otherwise.
*
* @return a future that will track the completion of the operation
*/
CompletableFuture<Optional<T>> getLeaderValue();

/**
* Get the value set by the elected leader, or empty if there's currently no leader.
* Get a non-blocking snapshot of the value set by the elected leader, or empty if no leader is
* known right now.
* <p>
* The call is non blocking and in certain cases can return <code>Optional.empty()</code> even though a leader is
* technically elected.
*
* @return a future that will track the completion of the operation
* The snapshot can return <code>Optional.empty()</code> even though a leader is technically
* elected (for example while a re-election is still settling). Callers that need the
* authoritative leader must use {@link #getLeaderValue()} instead.
*/
Optional<T> getLeaderValueIfPresent();

Expand Down
Loading
Loading