Search before reporting
Read release policy
User environment
- master branch; the underlying behavior is long-standing (LeaderElectionImpl has read the leader value through a MetadataCache since the metadata-store refactor), so supported release branches are affected as well
- Broker-side issue; no specific client involved
Issue Description
Scenario. When the leader is lost — the elected broker shuts down, crashes, loses its metadata session, or hands off leadership — the leader's ephemeral node is deleted and the remaining participants re-elect. While that re-election is settling, reading the current leader gives wrong answers on participants that did not just run their own elect():
- LeaderElectionService.getCurrentLeader() → LeaderElection.getLeaderValueIfPresent() returns Optional.empty() even though a new leader is being (or has been) elected. The cache entry backing it is invalidated by the Deleted notification and is only repopulated by the next loading read; in the worst case that is the periodic refresh task, up to 5 minutes later.
- LeaderElectionService.readCurrentLeader() → LeaderElection.getLeaderValue() is no better as an authoritative read: it is just cache.get(path), so it reflects whatever the cache/store holds at that instant, not the settled election outcome. Mid-handoff it can return empty or the pre-handoff leader.
In short, there is currently no authoritative way to ask "who is the leader" — both reads are snapshots of a cache whose lifecycle is disconnected from the election state machine.
Use cases where the authoritative leader is required, and what goes wrong today when the leader is lost:
- BrokersBase.getLeaderBroker (REST GET /brokers/leaderBroker): returns 404 "Couldn't find leader broker" although a leader exists or the election settles a moment later.
- NamespacesBase.validateLeaderBrokerAsync: fails the request with 412 PRECONDITION_FAILED ("The current leader is empty.") instead of redirecting to the actual leader (it also calls getCurrentLeader() twice).
- NamespaceService.searchForCandidateBroker (modular load manager path): an empty leader makes leaderBrokerActive == false, so each broker falls back to making the load-manager decision locally; several brokers can do this concurrently during the same handoff window.
- ServiceUnitStateChannelImpl (extensible load manager): the channel owner is the elected leader; getChannelOwnerAsync()/isChannelOwner() go through readCurrentLeader(). During a handoff the unsettled answer surfaces as "There is no channel owner now" errors, triggering recovery paths (leader election restarts) that add churn instead of simply waiting for the election to complete.
Root cause. LeaderElectionImpl keeps the leader value in a MetadataCache (expireAfterWrite = -1, plus a 5-minute periodic read), updated by store notifications rather than by the election cycle. The cache is most likely unnecessary: the leader can only change through an election cycle (elect / observe existing leader / leader-node deletion → re-elect), so the election state machine itself always knows the leader value at the moments it changes. The cache historically also served to register the metadata watch ("do a get() in order to force a notification later"), which is obsolete now that ZKMetadataStore uses a persistent recursive watch (AddWatchMode.PERSISTENT_RECURSIVE).
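To make the gap concrete, here is a deliberately simplified, hypothetical model of the cache-backed read path as described above. Plain HashMaps stand in for the metadata store and the MetadataCache; this is not Pulsar's MetadataCacheImpl, and CacheBackedLeaderModel and its methods are illustrative names. Following the description, a Deleted notification invalidates the entry and a later Created notification does not repopulate an entry that is no longer cached, so the snapshot read stays empty until some caller does a loading read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of a notification-driven leader cache; not Pulsar code.
final class CacheBackedLeaderModel {
    // Stands in for the metadata store (leader path -> leader value).
    private final Map<String, String> store = new HashMap<>();
    // Stands in for the MetadataCache; an absent key means "not cached".
    private final Map<String, Optional<String>> cache = new HashMap<>();

    // A participant writes the leader node (e.g. the new leader's ephemeral node).
    void write(String path, String value) {
        store.put(path, value);
        onCreatedOrModified(path);
    }

    // The leader node is deleted; the Deleted notification invalidates the cached entry.
    void delete(String path) {
        store.remove(path);
        cache.remove(path);
    }

    // Created/Modified notifications refresh only entries that are still cached.
    private void onCreatedOrModified(String path) {
        if (cache.containsKey(path)) {
            cache.put(path, Optional.ofNullable(store.get(path)));
        }
    }

    // Snapshot read, like getLeaderValueIfPresent(): never loads from the store.
    Optional<String> getIfPresent(String path) {
        return cache.getOrDefault(path, Optional.empty());
    }

    // Loading read, like cache.get(path): reads the store and caches the result.
    Optional<String> get(String path) {
        Optional<String> value = Optional.ofNullable(store.get(path));
        cache.put(path, value);
        return value;
    }
}
```

In the model, after delete("/leader") followed by write("/leader", "broker-2"), getIfPresent("/leader") keeps returning empty until a loading get("/leader") happens, which in the real broker may only be the periodic refresh.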
Proposed fix
Refactor LeaderElectionImpl to keep the current-leader value as part of the leader election cycle, and give the two reads distinct, documented semantics:
- getLeaderValue() / readCurrentLeader() (authoritative): if an election is in progress (leader unknown), the returned future completes once the election settles, with the newly determined leader. To guard against an election that never completes, the wait times out after the default metadata operation timeout (30s), completing exceptionally with a TimeoutException.
- getLeaderValueIfPresent() / getCurrentLeader() (snapshot): non-blocking; may be empty while a re-election is settling; only suitable for best-effort uses such as logging.
- The MetadataCache and the periodic refresh task are removed; the leader value is set when the election settles (Leading with the proposed value, Following with the observed existing value) and cleared to "unknown" when the leader node is deleted.
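The two read semantics can be sketched with a single CompletableFuture that is pending while the leader is unknown and completed once the election settles. This is a minimal sketch, not the prototype: LeaderValueHolder, settle, markUnknown, snapshot and read are hypothetical names, the timeout is passed in by the caller (the proposal uses the 30s default metadata operation timeout), and it assumes Java 9+ for copy() and orTimeout(). It also folds in the closed-election behavior from the integration notes: closing completes pending and future reads with empty.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not Pulsar code: holds the leader value as part of the election cycle.
final class LeaderValueHolder<T> {
    // Pending future == leader unknown (election in progress); completed future == settled.
    private CompletableFuture<Optional<T>> current = new CompletableFuture<>();
    private boolean closed;

    // Election settled: Leading (our proposed value) or Following (observed existing value).
    synchronized void settle(T leader) {
        if (closed) {
            return;
        }
        if (!current.complete(Optional.of(leader))) {
            // Already settled earlier: publish the new value for subsequent reads.
            current = CompletableFuture.completedFuture(Optional.of(leader));
        }
    }

    // Leader node deleted: back to "unknown" until the re-election settles.
    synchronized void markUnknown() {
        if (!closed && current.isDone()) {
            current = new CompletableFuture<>();
        }
    }

    // Election closed: pending and later reads report "no leader" instead of waiting.
    synchronized void close() {
        closed = true;
        current.complete(Optional.empty());
        current = CompletableFuture.completedFuture(Optional.empty());
    }

    // Snapshot read (getLeaderValueIfPresent-style): non-blocking, empty while unsettled.
    synchronized Optional<T> snapshot() {
        return current.getNow(Optional.empty());
    }

    // Authoritative read (getLeaderValue-style): completes when the election settles,
    // or exceptionally with TimeoutException once the timeout elapses.
    CompletableFuture<Optional<T>> read(long timeout, TimeUnit unit) {
        CompletableFuture<Optional<T>> settled;
        synchronized (this) {
            settled = current;
        }
        // copy() so that a timeout fails only this caller's future, not the shared one.
        return settled.copy().orTimeout(timeout, unit);
    }
}
```

Handing each caller a copy() is the key design choice here: one reader timing out must not complete the shared future that other waiters, and the election cycle itself, still depend on.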
Call sites then need converting to the authoritative read where a decision is made:
- BrokersBase.getLeaderBroker and NamespacesBase.validateLeaderBrokerAsync are already in async contexts, so converting them to readCurrentLeader() is straightforward.
- NamespaceService.searchForCandidateBroker is synchronous today (blocking .get() calls inside), so it would have to be converted to fully async before it can use readCurrentLeader().
- PulsarService's election-state listener only logs the leader and can stay on the snapshot.
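For the async call sites, the conversion amounts to composing on one authoritative read instead of inspecting a snapshot (twice, in validateLeaderBrokerAsync's case). A minimal sketch, assuming the leader is identified by a plain URL string and using hypothetical names (LeaderValidationSketch, validateLeader, Action, Decision) rather than Pulsar's actual types; the Supplier stands in for readCurrentLeader(), and mapping NO_LEADER or REDIRECT onto HTTP responses is omitted.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of a leader-validation call site; names are illustrative, not Pulsar's.
final class LeaderValidationSketch {
    enum Action { SERVE_LOCALLY, REDIRECT, NO_LEADER }

    static final class Decision {
        final Action action;
        final String leaderUrl; // null when there is no leader

        Decision(Action action, String leaderUrl) {
            this.action = action;
            this.leaderUrl = leaderUrl;
        }
    }

    // readCurrentLeader stands in for the authoritative async read; it is invoked exactly once,
    // and the decision waits for the election to settle instead of failing on an empty snapshot.
    static CompletableFuture<Decision> validateLeader(
            Supplier<CompletableFuture<Optional<String>>> readCurrentLeader, String localUrl) {
        return readCurrentLeader.get().thenApply(leader -> {
            if (!leader.isPresent()) {
                return new Decision(Action.NO_LEADER, null);
            }
            String url = leader.get();
            return url.equals(localUrl)
                    ? new Decision(Action.SERVE_LOCALLY, url)
                    : new Decision(Action.REDIRECT, url);
        });
    }
}
```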
A prototype of this refactoring passes the existing leader-election and coordination test suites (LeaderElectionTest 35/35). Two integration points need explicit care in the final change:
- Reads on a closed election should report "no leader" (empty) rather than fail or wait, because the extensible load manager's no-channel-owner recovery (handleNoChannelOwnerError) keys off the resulting "There is no channel owner now" condition to restart the election.
- Synchronous wrappers such as the BookKeeper auditor's PulsarLedgerAuditorManager.getCurrentAuditor() (which join()s the read) will now block for up to the timeout while an election is unsettled; callers and tests polling them need to account for that.
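A caller or test that polls such a synchronous wrapper and cannot afford to block for the whole read timeout could bound each wait itself. A minimal sketch, assuming the poller can reach the underlying CompletableFuture; BoundedLeaderPoll and pollOnce are hypothetical names, treating "still unsettled" as empty for a single poll is a choice of this sketch rather than something the proposal prescribes, and copy()/completeOnTimeout() require Java 9+.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for callers/tests that poll the leader without blocking for the full read timeout.
final class BoundedLeaderPoll {
    // Waits at most pollTimeout for the authoritative read and returns empty if it has not
    // settled by then. copy() keeps the shared future untouched; if the read itself completes
    // exceptionally (e.g. its own TimeoutException), join() rethrows it as a CompletionException.
    static <T> Optional<T> pollOnce(CompletableFuture<Optional<T>> authoritativeRead,
                                    long pollTimeout, TimeUnit unit) {
        return authoritativeRead.copy()
                .completeOnTimeout(Optional.empty(), pollTimeout, unit)
                .join();
    }
}
```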
Reproducing the issue
Kill or close the current leader broker (or close and restart its LeaderElectionService, as ExtensibleLoadManagerImplTest does) and concurrently call GET /brokers/leaderBroker or any admin operation that goes through validateLeaderBrokerAsync: during the re-election window the calls fail with 404/412 although a leader is elected moments later. The same window is observable directly at the API level: after the leader node is deleted and recreated, getLeaderValueIfPresent() on a non-electing participant stays empty until the next loading read (up to 5 minutes via the periodic refresh).
Found while investigating flaky ExtensibleLoadManagerImplTest.testRoleChange runs (the client-side part of that investigation is tracked separately in #25997).
Are you willing to submit a PR?