From e4ed84aca2eb42f839979b3b4cb2ba16884c4607 Mon Sep 17 00:00:00 2001 From: Maxime Caron Date: Wed, 16 Mar 2016 17:53:46 -0700 Subject: [PATCH 1/5] call RecreateSyncProducerPoolForMetadata() in RefreshMetadata retry loop If ProducerPoolForMetadata gets stale or invalid, a call to RefreshMetadataInternal will throw Kafka.Client.Exceptions.UnableToConnectToHostException. Calling RecreateSyncProducerPoolForMetadata will query zookeeper and recreate the pool that is passed to BrokerPartitionInfo by RefreshMetadataInternal, allowing the next retry to succeed. at Kafka.Client.KafkaConnection.Connect() at Kafka.Client.KafkaConnection.Send(Byte[] data) at Kafka.Client.KafkaConnection.Handle[T](Byte[] data, IResponseParser`1 parser, Boolean shouldParse) at Kafka.Client.KafkaConnection.Send(TopicMetadataRequest request) at Kafka.Client.Producers.Partitioning.BrokerPartitionInfo.UpdateInfo(Int16 versionId, Int32 correlationId, String clientId, String topic) at Kafka.Client.Helper.KafkaSimpleManager`2.RefreshMetadataInternal(Int16 versionId, String clientId, Int32 correlationId, String topic, Dictionary`2 tempTopicMetadatas, Dictionary`2 tempTopicMetadatasLastUpdateTime, Dictionary`2 partitionLeaders) at Kafka.Client.Helper.KafkaSimpleManager`2.RefreshMetadata(Int16 versionId, String clientId, Int32 correlationId, String topic, Boolean force) --- .../Helper/KafkaSimpleManager.cs | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index 90aa80a..e5379da 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -177,31 +177,45 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre Logger.InfoFormat("RefreshMetadata enter: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force); if (!force && 
this.TopicMetadatas.ContainsKey(topic)) return this.TopicMetadatas[topic]; - + int maxRetryCount = 2; int retry = 0; - while (retry < 2) + while (retry < maxRetryCount) { Dictionary tempTopicMetadatas = new Dictionary(); Dictionary tempTopicMetadatasLastUpdateTime = new Dictionary(); Dictionary> partitionLeaders = new Dictionary>(); - RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders); - - if (tempTopicMetadatas.ContainsKey(topic)) + try { - this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; - this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; - this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; - int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; - if (partitionCountInZK != partitionLeaders.Count) - Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders); + + if (tempTopicMetadatas.ContainsKey(topic)) + { + this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; + this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; + this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; + int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; + if (partitionCountInZK != partitionLeaders.Count) + Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + else + Logger.InfoFormat("RefreshMetadata exit return. 
Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + return this.TopicMetadatas[topic]; + } else - Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); - return this.TopicMetadatas[topic]; + { + Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic); + RecreateSyncProducerPoolForMetadata(); + } } - else + catch (Exception ex) { - Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic); + Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . {1} ",topic, + ,ExceptionUtil.GetExceptionDetailInfo(ex)); RecreateSyncProducerPoolForMetadata(); + retry++ + if (retry >= maxRetryCount) + { + throw ex; + } } retry++; } @@ -241,7 +255,6 @@ internal BrokerConfiguration GetLeaderBrokerOfPartition(string topic, int partit private void RefreshMetadataInternal(short versionId, string clientId, int correlationId, string topic, Dictionary tempTopicMetadatas, Dictionary tempTopicMetadatasLastUpdateTime, Dictionary> partitionLeaders) { Logger.InfoFormat("RefreshMetadataInternal enter: {0} {1} {2} Topic:{3} ", versionId, clientId, correlationId, topic); - lock (syncProducerPoolForMetadataLock) { BrokerPartitionInfo brokerPartitionInfo = new BrokerPartitionInfo(this.syncProducerPoolForMetaData, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, ProducerConfiguration.DefaultTopicMetaDataRefreshIntervalMS, this.syncProducerPoolForMetaData.zkClient); From dc9c4d16217b942fa5e00c500cd3a48841a34b6f Mon Sep 17 00:00:00 2001 From: Maxime Caron Date: Wed, 16 Mar 2016 17:58:14 -0700 Subject: 
[PATCH 2/5] add continue; to not increment retry twice --- src/KafkaNET.Library/Helper/KafkaSimpleManager.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index e5379da..58cb460 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -216,6 +216,7 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre { throw ex; } + continue; } retry++; } From 9b3ad1f75b2b5cb599003fae6108705e8ec7c367 Mon Sep 17 00:00:00 2001 From: Maxime Caron Date: Thu, 17 Mar 2016 10:50:08 -0700 Subject: [PATCH 3/5] Fix build error --- src/KafkaNET.Library/Helper/KafkaSimpleManager.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index 58cb460..386de51 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -13,6 +13,7 @@ namespace Kafka.Client.Helper using Kafka.Client.Producers.Sync; using Kafka.Client.Requests; using Kafka.Client.Utils; + using Microsoft.KafkaNET.Library.Util; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -209,9 +210,9 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre catch (Exception ex) { Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . 
{1} ",topic, - ,ExceptionUtil.GetExceptionDetailInfo(ex)); + ExceptionUtil.GetExceptionDetailInfo(ex)); RecreateSyncProducerPoolForMetadata(); - retry++ + retry++; if (retry >= maxRetryCount) { throw ex; From 31c682f7ab032237129533b324a081c94b30fd80 Mon Sep 17 00:00:00 2001 From: Maxime Caron Date: Wed, 18 May 2016 10:47:08 -0700 Subject: [PATCH 4/5] Remove retry loop inside KafkaSimpleManager.RefreshMetadata Each retry is doing a network call. The caller of RefreshMetadata should take care of retry using Exponential Backoff and Jitter. Blindly retrying in a loop is not likely to work and will often make things worse because of the thundering herd effect on Zookeeper and Kafka servers. --- .../Helper/KafkaSimpleManager.cs | 73 ++++++++----------- 1 file changed, 30 insertions(+), 43 deletions(-) diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index 386de51..bfb1070 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -178,50 +178,37 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre Logger.InfoFormat("RefreshMetadata enter: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force); if (!force && this.TopicMetadatas.ContainsKey(topic)) return this.TopicMetadatas[topic]; - int maxRetryCount = 2; - int retry = 0; - while (retry < maxRetryCount) - { - Dictionary tempTopicMetadatas = new Dictionary(); - Dictionary tempTopicMetadatasLastUpdateTime = new Dictionary(); - Dictionary> partitionLeaders = new Dictionary>(); - try - { - RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders); - - if (tempTopicMetadatas.ContainsKey(topic)) - { - this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; - this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; - 
this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; - int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; - if (partitionCountInZK != partitionLeaders.Count) - Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); - else - Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); - return this.TopicMetadatas[topic]; - } - else - { - Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic); - RecreateSyncProducerPoolForMetadata(); - } - } - catch (Exception ex) - { - Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . 
{1} ",topic, - ExceptionUtil.GetExceptionDetailInfo(ex)); - RecreateSyncProducerPoolForMetadata(); - retry++; - if (retry >= maxRetryCount) - { - throw ex; - } - continue; - } - retry++; + Dictionary tempTopicMetadatas = new Dictionary(); + Dictionary tempTopicMetadatasLastUpdateTime = new Dictionary(); + Dictionary> partitionLeaders = new Dictionary>(); + try + { + RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders); + } + catch (Exception ex) + { + Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, {1} ",topic, ExceptionUtil.GetExceptionDetailInfo(ex)); + // Calling RecreateSyncProducerPoolForMetadata + RecreateSyncProducerPoolForMetadata (); + throw; + } + if (tempTopicMetadatas.ContainsKey(topic)) + { + this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; + this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; + this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; + int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; + if (partitionCountInZK != partitionLeaders.Count) + Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + else + Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + return this.TopicMetadatas[topic]; + } + else + { + Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . 
", topic); + RecreateSyncProducerPoolForMetadata(); } - Logger.WarnFormat("RefreshMetadata exit return NULL: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force); return null; } From e44b9735d1d232d0c31e0cd957c07c35de4b468b Mon Sep 17 00:00:00 2001 From: Maxime Caron Date: Wed, 18 May 2016 10:47:58 -0700 Subject: [PATCH 5/5] Update KafkaSimpleManager.cs --- src/KafkaNET.Library/Helper/KafkaSimpleManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index bfb1070..7195d88 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -188,7 +188,7 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre catch (Exception ex) { Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, {1} ",topic, ExceptionUtil.GetExceptionDetailInfo(ex)); - // Calling RecreateSyncProducerPoolForMetadata + // Calling RecreateSyncProducerPoolForMetadata to get a fresh list of broker used for metadata call RecreateSyncProducerPoolForMetadata (); throw; }