Skip to content

Commit 72e380c

Browse files
kevin-wu24jsancio
andauthored
KAFKA-20380; backwards compatible advertised.listeners when it is not defined (#21918)
Kafka can incorrectly resolve the advertised listener for controllers if it is not specified. For controller configurations that specify the controller.quorum.voters but not the advertised.listener, Kafka can deduce the advertise listener for the default listener. In this cases, Kafka will automatically set the advertised listener for the default listener to the endpoint specified in controller.quorum.voters. Reviewers: José Armando García Sancio <jsancio@apache.org> Co-authored-by: José Armando García Sancio <jsancio@apache.org>
1 parent bbd81e8 commit 72e380c

3 files changed

Lines changed: 62 additions & 21 deletions

File tree

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -362,22 +362,47 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
362362
}
363363
val controllerListenersValue = controllerListeners
364364

365-
controllerListenerNames.asScala.flatMap { name =>
365+
def nameToEndpoint(name: String, isDefault: Boolean): Option[Endpoint] = {
366366
controllerAdvertisedListeners
367367
.find(endpoint => ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
368-
.orElse(
369-
// If users don't define advertised.listeners, the advertised controller listeners inherit from listeners configuration
370-
// which match listener names in controller.listener.names.
371-
// Removing "0.0.0.0" host to avoid validation errors. This is to be compatible with the old behavior before 3.9.
372-
// The null or "" host does a reverse lookup in ListenerInfo#withWildcardHostnamesResolved.
368+
.orElse {
373369
controllerListenersValue
374370
.find(endpoint => ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
375-
.map(endpoint => if (endpoint.host == "0.0.0.0") {
376-
new Endpoint(endpoint.listener, endpoint.securityProtocol, null, endpoint.port)
377-
} else {
378-
endpoint
379-
})
380-
)
371+
.map { endpoint =>
372+
val voterListenerOverride = {
373+
// the user did not provide an advertised listener for the default controller listener;
374+
// if controller.quorum.voters defines an endpoint for this node, use that as the advertised listener
375+
if (isDefault && (endpoint.host == null || endpoint.host == "0.0.0.0")) {
376+
val votersAddress = QuorumConfig.parseVoterConnections(quorumConfig.voters).asScala.get(nodeId())
377+
votersAddress.map { socketAddress =>
378+
new Endpoint(
379+
endpoint.listener,
380+
endpoint.securityProtocol,
381+
socketAddress.getHostString,
382+
socketAddress.getPort
383+
)
384+
}
385+
} else {
386+
None
387+
}
388+
}
389+
voterListenerOverride.getOrElse {
390+
// Removing "0.0.0.0" host to avoid validation errors.
391+
// This is to be compatible with the old behavior before 3.9.
392+
// The null or "" host does a reverse lookup in ListenerInfo#withWildcardHostnamesResolved.
393+
if (endpoint.host == "0.0.0.0") {
394+
new Endpoint(endpoint.listener, endpoint.securityProtocol, null, endpoint.port)
395+
} else {
396+
endpoint
397+
}
398+
}
399+
}
400+
}
401+
}
402+
403+
controllerListenerNames.asScala.toList match {
404+
case Nil => Nil
405+
case head :: tail => nameToEndpoint(head, true).toList ++ tail.flatMap(nameToEndpoint(_, false))
381406
}
382407
}
383408

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -348,17 +348,33 @@ class KafkaConfigTest {
348348
}
349349

350350
@Test
351-
def testEffectAdvertiseControllerListenerForControllerWithoutAdvertisement(): Unit = {
351+
def testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementAndVotersConfig(): Unit = {
352352
val props = new Properties()
353353
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
354-
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093")
355354
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
356-
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
357355
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
356+
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@lb1.example.com:9092")
357+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9093")
358358

359359
val config = KafkaConfig.fromProps(props)
360360
assertEquals(
361-
Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
361+
Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "lb1.example.com", 9092)),
362+
config.effectiveAdvertisedControllerListeners
363+
)
364+
}
365+
366+
@Test
367+
def testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementOrVotersConfig(): Unit = {
368+
val props = new Properties()
369+
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
370+
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
371+
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
372+
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
373+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9093")
374+
375+
val config = KafkaConfig.fromProps(props)
376+
assertEquals(
377+
Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, null, 9093)),
362378
config.effectiveAdvertisedControllerListeners
363379
)
364380
}
@@ -367,17 +383,17 @@ class KafkaConfigTest {
367383
def testEffectAdvertiseControllerListenerForControllerWithMixedAdvertisement(): Unit = {
368384
val props = new Properties()
369385
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
370-
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093,CONTROLLER_NEW://localhost:9094")
371386
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
372-
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
373387
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER,CONTROLLER_NEW")
374388
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "CONTROLLER://lb1.example.com:9000")
389+
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@lb2.example.com:9092")
390+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9093,CONTROLLER_NEW://:9094")
375391

376392
val config = KafkaConfig.fromProps(props)
377393
assertEquals(
378394
Seq(
379395
new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "lb1.example.com", 9000),
380-
new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT, "localhost", 9094)
396+
new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT, null, 9094)
381397
),
382398
config.effectiveAdvertisedControllerListeners
383399
)
@@ -1475,7 +1491,7 @@ class KafkaConfigTest {
14751491
@Test
14761492
def testValidQuorumVotersParsingWithIpAddress(): Unit = {
14771493
val expected = new util.HashMap[Integer, InetSocketAddress]()
1478-
expected.put(1, new InetSocketAddress("127.0.0.1", 9092))
1494+
expected.put(1, InetSocketAddress.createUnresolved("127.0.0.1", 9092))
14791495
assertValidQuorumVoters(expected, "1@127.0.0.1:9092")
14801496
}
14811497

raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ private static Map<Integer, InetSocketAddress> parseVoterConnections(
243243
+ ". Each entry should be in the form `{id}@{host}:{port}`.");
244244
}
245245

246-
InetSocketAddress address = new InetSocketAddress(host, port);
246+
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port);
247247
if (address.getHostString().equals(NON_ROUTABLE_HOST) && requireRoutableAddresses) {
248248
throw new ConfigException(
249249
String.format("Host string (%s) is not routeable", address.getHostString())

0 commit comments

Comments
 (0)