diff --git a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs index 90aa80a..7195d88 100644 --- a/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs +++ b/src/KafkaNET.Library/Helper/KafkaSimpleManager.cs @@ -13,6 +13,7 @@ namespace Kafka.Client.Helper using Kafka.Client.Producers.Sync; using Kafka.Client.Requests; using Kafka.Client.Utils; + using Microsoft.KafkaNET.Library.Util; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -177,35 +178,37 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre Logger.InfoFormat("RefreshMetadata enter: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force); if (!force && this.TopicMetadatas.ContainsKey(topic)) return this.TopicMetadatas[topic]; - - int retry = 0; - while (retry < 2) + Dictionary tempTopicMetadatas = new Dictionary(); + Dictionary tempTopicMetadatasLastUpdateTime = new Dictionary(); + Dictionary> partitionLeaders = new Dictionary>(); + try { - Dictionary tempTopicMetadatas = new Dictionary(); - Dictionary tempTopicMetadatasLastUpdateTime = new Dictionary(); - Dictionary> partitionLeaders = new Dictionary>(); RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders); - - if (tempTopicMetadatas.ContainsKey(topic)) - { - this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; - this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; - this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; - int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; - if (partitionCountInZK != partitionLeaders.Count) - Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); - else - Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); - return this.TopicMetadatas[topic]; - } + } + catch (Exception ex) + { + Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, {1} ",topic, ExceptionUtil.GetExceptionDetailInfo(ex)); + // Calling RecreateSyncProducerPoolForMetadata to get a fresh list of broker used for metadata call + RecreateSyncProducerPoolForMetadata (); + throw; + } + if (tempTopicMetadatas.ContainsKey(topic)) + { + this.TopicMetadatas[topic] = tempTopicMetadatas[topic]; + this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic]; + this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders; + int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count; + if (partitionCountInZK != partitionLeaders.Count) + Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); else - { - Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic); - RecreateSyncProducerPoolForMetadata(); - } - retry++; + Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK); + return this.TopicMetadatas[topic]; + } + else + { + Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic); + RecreateSyncProducerPoolForMetadata(); } - Logger.WarnFormat("RefreshMetadata exit return NULL: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force); return null; } @@ -241,7 +244,6 @@ internal BrokerConfiguration GetLeaderBrokerOfPartition(string topic, int partit private void RefreshMetadataInternal(short versionId, string clientId, int correlationId, string topic, Dictionary tempTopicMetadatas, Dictionary tempTopicMetadatasLastUpdateTime, Dictionary> partitionLeaders) { Logger.InfoFormat("RefreshMetadataInternal enter: {0} {1} {2} Topic:{3} ", versionId, clientId, correlationId, topic); - lock (syncProducerPoolForMetadataLock) { BrokerPartitionInfo brokerPartitionInfo = new BrokerPartitionInfo(this.syncProducerPoolForMetaData, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, ProducerConfiguration.DefaultTopicMetaDataRefreshIntervalMS, this.syncProducerPoolForMetaData.zkClient);