Skip to content

[fix][meta] Keep the leader value in the election cycle and make leader reads authoritative#26000

Open
lhotari wants to merge 3 commits into
apache:masterfrom
lhotari:lh-fix-leader-election-consistency
Open

[fix][meta] Keep the leader value in the election cycle and make leader reads authoritative#26000
lhotari wants to merge 3 commits into
apache:masterfrom
lhotari:lh-fix-leader-election-consistency

Conversation

@lhotari

@lhotari lhotari commented Jun 11, 2026

Copy link
Copy Markdown
Member

Fixes #25999

Motivation

When the leader is lost — the elected broker shuts down, crashes, loses its metadata session, or hands off leadership — the leader's ephemeral node is deleted and the remaining participants re-elect. While that re-election is settling, there is currently no authoritative way to ask "who is the leader":

  • LeaderElectionService.getCurrentLeader()LeaderElection.getLeaderValueIfPresent() returns Optional.empty() even though a new leader is being (or has been) elected: the cache entry backing it is invalidated by the Deleted notification and is only repopulated by the next loading read — in the worst case by the periodic refresh task, up to 5 minutes later.
  • LeaderElectionService.readCurrentLeader()LeaderElection.getLeaderValue() is just cache.get(path), so it reflects whatever the cache/store has at that instant, not the settled election outcome.

Several call sites make decisions on these reads and misbehave during a leadership handoff: BrokersBase.getLeaderBroker returns 404 although a leader exists, NamespacesBase.validateLeaderBrokerAsync fails with 412 instead of redirecting to the leader, NamespaceService.searchForCandidateBroker falls back to decentralized load-manager decisions on several brokers concurrently, and the extensible load manager's channel-owner resolution surfaces "There is no channel owner now" errors that trigger recovery churn.

The root cause is that LeaderElectionImpl keeps the leader value in a MetadataCache whose lifecycle is disconnected from the election state machine. The cache is unnecessary: the leader can only change through an election cycle, so the state machine always knows the leader value at the moments it changes. The cache's historical second purpose — registering the metadata watch via a get() — is obsolete now that ZKMetadataStore uses a persistent recursive watch.

Modifications

pulsar-metadata — keep the leader value in the election cycle and make getLeaderValue() authoritative:

  • LeaderElectionImpl tracks the current leader in a currentLeaderFuture updated by the election cycle: completed when the election settles (Leading with the proposed value, Following with the observed existing value), reset to pending when the leader node is deleted. The MetadataCache and the periodic refresh task are removed.
  • getLeaderValue() / readCurrentLeader() is the authoritative read: it waits for an in-progress election to settle, bounded by the default metadata operation timeout (30s, failing with a TimeoutException). An instance that never participated in the election (pure observer, e.g. BookKeeper's MetadataDrivers helpers querying the auditor) reads the store directly.
  • getLeaderValueIfPresent() / getCurrentLeader() is an explicitly documented non-blocking snapshot, suitable only for best-effort uses.
  • A closed instance does not wait: a closed leader reports an empty leader (the extensible load manager's handleNoChannelOwnerError recovery keys off the "no channel owner" condition), and elect() after close() reopens the instance (the LeaderElectionService close()+start() pattern used to force a leadership change).

pulsar-broker — use the authoritative read where leader decisions are made:

  • BrokersBase.getLeaderBroker and NamespacesBase.validateLeaderBrokerAsync use readCurrentLeader().
  • NamespaceService.searchForCandidateBroker is converted to a fully asynchronous flow (selectCandidateBroker + acquireOwnershipOrRedirect) so it can use readCurrentLeader(); the blocking get() on the heartbeat/SLA-monitor check is removed, and the decentralized-fallback and authoritativeRedirect semantics are preserved.

Tests:

  • New LeaderElectionTest cases: authoritative reads never observe an empty leader during a re-election (single-instance external delete, and a two-member leadership handoff converging to the new leader); a closed leader reports an empty leader; elect() after close() runs a new election; a pure observer reads the leader value from the store while its snapshot stays empty.
  • New LeaderElectionImplTest case: getLeaderValue() fails with a TimeoutException when the election never settles (via a @VisibleForTesting override of the completion timeout).
  • LeaderElectionServiceTest's mock now stubs readCurrentLeader(); AutoRecoveryMainTest re-reads getAuditor() inside its await instead of capturing a possibly-null reference once (the promptly-resolving read exposed the existing race).

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • New LeaderElectionTest and LeaderElectionImplTest cases described above cover the wait-for-election, never-empty-during-handoff, timeout, close, reopen, and observer semantics.
  • Existing coverage validated locally: LeaderElectionTest, LeaderElectionImplTest, LeaderElectionServiceTest, MultiBrokerLeaderElectionTest, NamespaceServiceTest, ModularLoadManagerImplTest, AdminApiMultiBrokersTest, AdminTest, AutoRecoveryMainTest, AuditorRollingRestartTest, TransactionCoordinatorV5Test, ExtensibleLoadManagerImplTest, and the full pulsar-metadata suite.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

The LeaderElection.getLeaderValue() contract in pulsar-metadata changes from "read the cached store value" to "wait for an in-progress election to settle (bounded by the default metadata operation timeout)"; getLeaderValueIfPresent() is now documented as a non-blocking snapshot. All in-tree callers participate in the election or are handled by the observer fallback, and GET /brokers/leaderBroker / leader redirects now succeed during a leadership handoff instead of failing with 404/412.

lhotari added 3 commits June 11, 2026 14:21
…eaderValue() authoritative

The leader value was read through a MetadataCache whose lifecycle is
disconnected from the election state machine: after a leadership loss the
cached value is empty (or stale) until the next loading read, so neither
getLeaderValue() nor getLeaderValueIfPresent() could answer 'who is the
leader' authoritatively while a re-election is settling (apache#25999).

The leader can only change through an election cycle, so track the value
there directly and remove the cache and its periodic refresh task (the
cache's watch-forcing get() is obsolete with ZK persistent recursive
watches):

- getLeaderValue() / readCurrentLeader() now waits for an in-progress
  election to settle, bounded by the default metadata operation timeout
  (30s); instances that never participated in the election (pure
  observers, e.g. BookKeeper MetadataDrivers helpers) read the store
  directly.
- getLeaderValueIfPresent() / getCurrentLeader() is an explicit
  non-blocking snapshot.
- A closed election reports an empty leader (the extensible load
  manager's no-channel-owner recovery depends on that condition), and
  elect() after close() reopens the instance for the
  LeaderElectionService close()+start() pattern.
- AutoRecoveryMainTest captured getAuditor() before awaiting it; the now
  promptly-resolving read exposes the race, so read it inside the poll.

Assisted-by: Claude Code (Opus 4.8)
…decisions are made

Convert the call sites that made decisions on the best-effort
getCurrentLeader() snapshot — which is empty while a re-election is
settling — to the authoritative readCurrentLeader() that waits for the
election to complete (apache#25999):

- BrokersBase.getLeaderBroker no longer returns 404 while a leader
  election is in flight.
- NamespacesBase.validateLeaderBrokerAsync no longer fails with 412
  ('The current leader is empty.') instead of redirecting to the leader.
- NamespaceService.searchForCandidateBroker is converted to a fully
  asynchronous flow (selectCandidateBroker + acquireOwnershipOrRedirect)
  so it can use readCurrentLeader(); the blocking get() on the
  heartbeat/SLA-monitor check is gone and the decentralized-fallback and
  authoritativeRedirect semantics are preserved.
- LeaderElectionServiceTest's LeaderElectionService mock now stubs
  readCurrentLeader() consistently with getCurrentLeader().

Assisted-by: Claude Code (Opus 4.8)
Cover the new getLeaderValue()/getLeaderValueIfPresent() contract
(apache#25999):

- authoritative reads never observe an empty leader while a re-election
  is settling (single-instance external delete, and a two-member
  leadership handoff converging to the new leader)
- a closed leader reports an empty leader without waiting
- elect() after close() runs a new election (LeaderElectionService
  close()+start() pattern)
- a pure observer (no elect() call) reads the leader value directly from
  the metadata store while its snapshot stays empty
- getLeaderValue() fails with a TimeoutException when the election never
  settles, via a @VisibleForTesting override of the completion timeout

Also document that a closed follower keeps its last known view (only a
closed leader resets to empty).

Assisted-by: Claude Code (Opus 4.8)
@lhotari lhotari added release/blocker Indicate the PR or issue that should block the release until it gets resolved release/4.0.11 release/4.2.3 labels Jun 11, 2026
@lhotari lhotari added this to the 5.0.0-M1 milestone Jun 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release/blocker Indicate the PR or issue that should block the release until it gets resolved release/4.0.11 release/4.2.3

Projects

None yet

1 participant