diff --git a/go.mod b/go.mod index ad5cb934e4a..4fd40f960b2 100644 --- a/go.mod +++ b/go.mod @@ -307,7 +307,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/client_model v0.6.2 github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/psiemens/sconfig v0.1.0 // indirect diff --git a/module/metrics/alsp.go b/module/metrics/alsp.go index 3d5dc2bc510..33edf997cc9 100644 --- a/module/metrics/alsp.go +++ b/module/metrics/alsp.go @@ -4,6 +4,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" ) // AlspMetrics is a struct that contains all the metrics related to the ALSP module. @@ -38,12 +39,13 @@ func NewAlspMetrics() *AlspMetrics { // OnMisbehaviorReported is called when a misbehavior is reported by the application layer to ALSP. // An engine detecting a spamming-related misbehavior reports it to the ALSP module. It increases // the counter vector of reported misbehavior. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. // Args: // - channel: the channel on which the misbehavior was reported // - misbehaviorType: the type of misbehavior reported func (a *AlspMetrics) OnMisbehaviorReported(channel string, misbehaviorType string) { a.reportedMisbehaviorCount.With(prometheus.Labels{ - LabelChannel: channel, + LabelChannel: channels.NormalizeTopicForMetrics(channel), LabelMisbehavior: misbehaviorType, }).Inc() } diff --git a/module/metrics/gossipsub.go b/module/metrics/gossipsub.go index 39dfd1f1e36..2379582ef89 100644 --- a/module/metrics/gossipsub.go +++ b/module/metrics/gossipsub.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" ) // LocalGossipSubRouterMetrics encapsulates the metrics collectors for GossipSub router of the local node. @@ -261,8 +262,10 @@ func NewGossipSubLocalMeshMetrics(prefix string) *LocalGossipSubRouterMetrics { var _ module.LocalGossipSubRouterMetrics = (*LocalGossipSubRouterMetrics)(nil) // OnLocalMeshSizeUpdated updates the local mesh size metric. +// Cluster topics are normalized to their prefix (e.g., "sync-cluster", "consensus-cluster") +// to prevent unbounded cardinality growth during epoch transitions. func (g *LocalGossipSubRouterMetrics) OnLocalMeshSizeUpdated(topic string, size int) { - g.localMeshSize.WithLabelValues(topic).Set(float64(size)) + g.localMeshSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Set(float64(size)) } // OnPeerAddedToProtocol is called when the local node receives a stream from a peer on a gossipsub-related protocol. @@ -296,14 +299,16 @@ func (g *LocalGossipSubRouterMetrics) OnLocalPeerLeftTopic() { // OnPeerGraftTopic is called when the local node receives a GRAFT message from a remote peer on a topic. // Note: the received GRAFT at this point is considered passed the RPC inspection, and is accepted by the local node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (g *LocalGossipSubRouterMetrics) OnPeerGraftTopic(topic string) { - g.peerGraftTopicCount.WithLabelValues(topic).Inc() + g.peerGraftTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // OnPeerPruneTopic is called when the local node receives a PRUNE message from a remote peer on a topic. // Note: the received PRUNE at this point is considered passed the RPC inspection, and is accepted by the local node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (g *LocalGossipSubRouterMetrics) OnPeerPruneTopic(topic string) { - g.peerPruneTopicCount.WithLabelValues(topic).Inc() + g.peerPruneTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // OnMessageEnteredValidation is called when a received pubsub message enters the validation pipeline. It is the diff --git a/module/metrics/gossipsub_rpc_validation_inspector.go b/module/metrics/gossipsub_rpc_validation_inspector.go index d8c20fccc81..4611b3241d3 100644 --- a/module/metrics/gossipsub_rpc_validation_inspector.go +++ b/module/metrics/gossipsub_rpc_validation_inspector.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) @@ -421,11 +422,12 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIWantMessageIDsReceived(msgId // OnIHaveMessageIDsReceived tracks the number of message ids received by the node from other nodes on an iHave message. // This function is called on each iHave message received by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. // Args: // - channel: the channel on which the iHave message was received. // - msgIdCount: the number of message ids received on the iHave message. func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) { - c.receivedIHaveMsgIDsHistogram.WithLabelValues(channel).Observe(float64(msgIdCount)) + c.receivedIHaveMsgIDsHistogram.WithLabelValues(channels.NormalizeTopicForMetrics(channel)).Observe(float64(msgIdCount)) } // OnIncomingRpcReceived tracks the number of incoming RPC messages received by the node. diff --git a/module/metrics/gossipsub_score.go b/module/metrics/gossipsub_score.go index 15e3628469c..8e13f40e352 100644 --- a/module/metrics/gossipsub_score.go +++ b/module/metrics/gossipsub_score.go @@ -143,19 +143,19 @@ func (g *GossipSubScoreMetrics) OnBehaviourPenaltyUpdated(penalty float64) { } func (g *GossipSubScoreMetrics) OnTimeInMeshUpdated(topic channels.Topic, duration time.Duration) { - g.timeInMesh.WithLabelValues(string(topic)).Observe(duration.Seconds()) + g.timeInMesh.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(duration.Seconds()) } func (g *GossipSubScoreMetrics) OnFirstMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.firstMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.firstMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) OnMeshMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.meshMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.meshMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) OnInvalidMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.invalidMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.invalidMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) SetWarningStateCount(u uint) { diff --git a/module/metrics/network.go b/module/metrics/network.go index a6eead52e48..a7073eb0dcf 100644 --- a/module/metrics/network.go +++ b/module/metrics/network.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" logging2 "github.com/onflow/flow-go/network/p2p/logging" "github.com/onflow/flow-go/utils/logging" ) @@ -260,18 +261,21 @@ func NewNetworkCollector(logger zerolog.Logger, opts ...NetworkCollectorOpt) *Ne } // OutboundMessageSent collects metrics related to a message sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OutboundMessageSent(sizeBytes int, topic, protocol, messageType string) { - nc.outboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes)) + nc.outboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes)) } // InboundMessageReceived collects metrics related to a message received by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) InboundMessageReceived(sizeBytes int, topic, protocol, messageType string) { - nc.inboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes)) + nc.inboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes)) } // DuplicateInboundMessagesDropped increments the metric tracking the number of duplicate messages dropped by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) DuplicateInboundMessagesDropped(topic, protocol, messageType string) { - nc.duplicateMessagesDropped.WithLabelValues(topic, protocol, messageType).Add(1) + nc.duplicateMessagesDropped.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Add(1) } func (nc *NetworkCollector) MessageAdded(priority int) { @@ -287,18 +291,21 @@ func (nc *NetworkCollector) QueueDuration(duration time.Duration, priority int) } // MessageProcessingStarted increments the metric tracking the number of messages being processed by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) MessageProcessingStarted(topic string) { - nc.numMessagesProcessing.WithLabelValues(topic).Inc() + nc.numMessagesProcessing.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // UnicastMessageSendingStarted increments the metric tracking the number of unicast messages sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) UnicastMessageSendingStarted(topic string) { - nc.numDirectMessagesSending.WithLabelValues(topic).Inc() + nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // UnicastMessageSendingCompleted decrements the metric tracking the number of unicast messages sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) UnicastMessageSendingCompleted(topic string) { - nc.numDirectMessagesSending.WithLabelValues(topic).Dec() + nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Dec() } func (nc *NetworkCollector) RoutingTablePeerAdded() { @@ -311,9 +318,11 @@ func (nc *NetworkCollector) RoutingTablePeerRemoved() { // MessageProcessingFinished tracks the time spent by the node to process a message and decrements the metric tracking // the number of messages being processed by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) MessageProcessingFinished(topic string, duration time.Duration) { - nc.numMessagesProcessing.WithLabelValues(topic).Dec() - nc.inboundProcessTime.WithLabelValues(topic).Add(duration.Seconds()) + normalizedTopic := channels.NormalizeTopicForMetrics(topic) + nc.numMessagesProcessing.WithLabelValues(normalizedTopic).Dec() + nc.inboundProcessTime.WithLabelValues(normalizedTopic).Add(duration.Seconds()) } // OutboundConnections updates the metric tracking the number of outbound connections of this node @@ -353,11 +362,13 @@ func (nc *NetworkCollector) OnDNSLookupRequestDropped() { } // OnUnauthorizedMessage tracks the number of unauthorized messages seen on the network. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OnUnauthorizedMessage(role, msgType, topic, offense string) { - nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, topic, offense).Inc() + nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), offense).Inc() } // OnRateLimitedPeer tracks the number of rate limited messages seen on the network. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, topic, reason string) { nc.logger.Warn(). Str("peer_id", logging2.PeerId(peerID)). @@ -367,7 +378,7 @@ func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, top Str("reason", reason). Bool(logging.KeySuspicious, true). Msg("unicast peer rate limited") - nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, topic, reason).Inc() + nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), reason).Inc() } // OnViolationReportSkipped tracks the number of slashing violations consumer violations that were not diff --git a/module/metrics/network_test.go b/module/metrics/network_test.go new file mode 100644 index 00000000000..ea4c24946c7 --- /dev/null +++ b/module/metrics/network_test.go @@ -0,0 +1,172 @@ +package metrics + +import ( + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestNetworkCollector_ClusterTopicNormalization verifies that cluster topics are normalized +// to their prefix (e.g., "sync-cluster-*" -> "sync-cluster") to prevent unbounded metric +// cardinality growth during epoch transitions. +func TestNetworkCollector_ClusterTopicNormalization(t *testing.T) { + prefix := fmt.Sprintf("test_%s_", t.Name()) + logger := zerolog.Nop() + nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) + + // These cluster topics should be normalized to their prefix + clusterTopic1 := "sync-cluster-test-cluster-id-1" + clusterTopic2 := "sync-cluster-test-cluster-id-2" + consensusClusterTopic := "consensus-cluster-test-cluster-id" + nonClusterTopic := "blocks" + protocol := "pubsub" + msgType := "BlockProposal" + + // Record metrics for multiple cluster topics and a non-cluster topic + nc.InboundMessageReceived(100, clusterTopic1, protocol, msgType) + nc.InboundMessageReceived(200, clusterTopic2, protocol, msgType) + nc.InboundMessageReceived(300, consensusClusterTopic, protocol, msgType) + nc.InboundMessageReceived(400, nonClusterTopic, protocol, msgType) + + nc.OutboundMessageSent(150, clusterTopic1, protocol, msgType) + nc.OutboundMessageSent(250, clusterTopic2, protocol, msgType) + nc.OutboundMessageSent(350, consensusClusterTopic, protocol, msgType) + nc.OutboundMessageSent(450, nonClusterTopic, protocol, msgType) + + nc.DuplicateInboundMessagesDropped(clusterTopic1, protocol, msgType) + nc.DuplicateInboundMessagesDropped(clusterTopic2, protocol, msgType) + nc.DuplicateInboundMessagesDropped(consensusClusterTopic, protocol, msgType) + nc.DuplicateInboundMessagesDropped(nonClusterTopic, protocol, msgType) + + // Verify cluster topics are normalized to their prefix + // Both sync-cluster topics should be recorded under "sync-cluster" + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, "sync-cluster", "sync-cluster topics should be normalized to 'sync-cluster'") + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, "consensus-cluster", "consensus-cluster topics should be normalized to 'consensus-cluster'") + + // Verify the original cluster topic names are NOT present (they should be normalized) + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic1, "original cluster topic name should not be present") + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic2, "original cluster topic name should not be present") + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, consensusClusterTopic, "original consensus-cluster topic name should not be present") + + // Verify non-cluster topics are NOT normalized (they should keep their original name) + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, nonClusterTopic, "non-cluster topics should keep their original name") +} + +// TestLocalGossipSubRouterMetrics_ClusterTopicNormalization verifies that LocalGossipSubRouterMetrics +// normalizes cluster topics to their prefix. +func TestLocalGossipSubRouterMetrics_ClusterTopicNormalization(t *testing.T) { + prefix := fmt.Sprintf("test_%s_", t.Name()) + m := NewGossipSubLocalMeshMetrics(prefix) + + clusterTopic1 := "sync-cluster-cluster-1" + clusterTopic2 := "sync-cluster-cluster-2" + consensusClusterTopic := "consensus-cluster-cluster-1" + nonClusterTopic := "blocks" + + // Record metrics for cluster and non-cluster topics + m.OnLocalMeshSizeUpdated(clusterTopic1, 5) + m.OnLocalMeshSizeUpdated(clusterTopic2, 3) + m.OnLocalMeshSizeUpdated(consensusClusterTopic, 4) + m.OnLocalMeshSizeUpdated(nonClusterTopic, 2) + + m.OnPeerGraftTopic(clusterTopic1) + m.OnPeerGraftTopic(clusterTopic2) + m.OnPeerGraftTopic(consensusClusterTopic) + m.OnPeerGraftTopic(nonClusterTopic) + + m.OnPeerPruneTopic(clusterTopic1) + m.OnPeerPruneTopic(clusterTopic2) + m.OnPeerPruneTopic(consensusClusterTopic) + m.OnPeerPruneTopic(nonClusterTopic) + + // Verify cluster topics are normalized to their prefix + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "sync-cluster", "sync-cluster topics should be normalized") + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "consensus-cluster", "consensus-cluster topics should be normalized") + + // Verify original cluster topic names are NOT present + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic1, "original cluster topic should not be present") + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic2, "original cluster topic should not be present") + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, consensusClusterTopic, "original consensus-cluster topic should not be present") + + // Verify non-cluster topics are NOT normalized + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, nonClusterTopic, "non-cluster topics should keep their original name") +} + +// assertHistogramVecHasLabel checks that the histogram vec has at least one metric with the given label value. +func assertHistogramVecHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) { + t.Helper() + found := histogramVecHasLabelValue(t, hv, labelName, labelValue) + assert.True(t, found, msg) +} + +// assertHistogramVecNotHasLabel checks that the histogram vec has no metrics with the given label value. +func assertHistogramVecNotHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) { + t.Helper() + found := histogramVecHasLabelValue(t, hv, labelName, labelValue) + assert.False(t, found, msg) +} + +// assertGaugeVecHasLabel checks that the gauge vec has at least one metric with the given label value. +func assertGaugeVecHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) { + t.Helper() + found := gaugeVecHasLabelValue(t, gv, labelName, labelValue) + assert.True(t, found, msg) +} + +// assertGaugeVecNotHasLabel checks that the gauge vec has no metrics with the given label value. +func assertGaugeVecNotHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) { + t.Helper() + found := gaugeVecHasLabelValue(t, gv, labelName, labelValue) + assert.False(t, found, msg) +} + +// histogramVecHasLabelValue collects metrics from the histogram vec and checks if any have the given label value. +func histogramVecHasLabelValue(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 100) + go func() { + hv.Collect(ch) + close(ch) + }() + + for metric := range ch { + var m io_prometheus_client.Metric + err := metric.Write(&m) + require.NoError(t, err) + + for _, label := range m.GetLabel() { + if label.GetName() == labelName && label.GetValue() == labelValue { + return true + } + } + } + return false +} + +// gaugeVecHasLabelValue collects metrics from the gauge vec and checks if any have the given label value. +func gaugeVecHasLabelValue(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 100) + go func() { + gv.Collect(ch) + close(ch) + }() + + for metric := range ch { + var m io_prometheus_client.Metric + err := metric.Write(&m) + require.NoError(t, err) + + for _, label := range m.GetLabel() { + if label.GetName() == labelName && label.GetValue() == labelValue { + return true + } + } + } + return false +} diff --git a/network/channels/channels.go b/network/channels/channels.go index d279dd85c1e..3f858c15548 100644 --- a/network/channels/channels.go +++ b/network/channels/channels.go @@ -338,6 +338,20 @@ func SyncCluster(clusterID flow.ChainID) Channel { return Channel(fmt.Sprintf("%s-%s", SyncClusterPrefix, clusterID)) } +// NormalizeTopicForMetrics returns a normalized version of the topic suitable for use as a metrics label. +// For cluster topics (sync-cluster-*, consensus-cluster-*), it returns just the prefix to prevent +// unbounded cardinality growth during epoch transitions (each epoch creates new cluster IDs). +// For non-cluster topics, it returns the topic unchanged. +func NormalizeTopicForMetrics(topic string) string { + if strings.HasPrefix(topic, SyncClusterPrefix) { + return SyncClusterPrefix + } + if strings.HasPrefix(topic, ConsensusClusterPrefix) { + return ConsensusClusterPrefix + } + return topic +} + // IsValidNonClusterFlowTopic ensures the topic is a valid Flow network topic and // ensures the sporkID part of the Topic is equal to the current network sporkID. // Expected errors: diff --git a/network/channels/channels_test.go b/network/channels/channels_test.go index 37b2ca6c811..2e07466683a 100644 --- a/network/channels/channels_test.go +++ b/network/channels/channels_test.go @@ -129,3 +129,57 @@ func TestUniqueChannels_ClusterChannels(t *testing.T) { require.Contains(t, uniques, consensusCluster) // cluster channel require.Contains(t, uniques, PushTransactions) // non-cluster channel } + +// TestNormalizeTopicForMetrics verifies that cluster topics are normalized to their prefix +// (e.g., "sync-cluster-*" -> "sync-cluster") while non-cluster topics remain unchanged. +// This prevents unbounded metric cardinality growth during epoch transitions. +func TestNormalizeTopicForMetrics(t *testing.T) { + testCases := []struct { + name string + inputTopic string + expectedOutput string + }{ + { + name: "sync-cluster topic normalized", + inputTopic: "sync-cluster-cluster-123-abc123def456", + expectedOutput: SyncClusterPrefix, + }, + { + name: "consensus-cluster topic normalized", + inputTopic: "consensus-cluster-cluster-456-def789abc123", + expectedOutput: ConsensusClusterPrefix, + }, + { + name: "sync-cluster topic with different format normalized", + inputTopic: "sync-cluster-test-id", + expectedOutput: SyncClusterPrefix, + }, + { + name: "consensus-cluster topic with different format normalized", + inputTopic: "consensus-cluster-test-id", + expectedOutput: ConsensusClusterPrefix, + }, + { + name: "non-cluster topic unchanged", + inputTopic: "push-blocks/abc123", + expectedOutput: "push-blocks/abc123", + }, + { + name: "another non-cluster topic unchanged", + inputTopic: "consensus-committee/xyz789", + expectedOutput: "consensus-committee/xyz789", + }, + { + name: "empty string unchanged", + inputTopic: "", + expectedOutput: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := NormalizeTopicForMetrics(tc.inputTopic) + assert.Equal(t, tc.expectedOutput, result) + }) + } +} diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index a25f4e717d6..c36c5d041bf 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -279,6 +279,15 @@ func (t *GossipSubMeshTracer) Leave(topic string) { Str("topic", topic). Msg("local peer left topic") } + + // Clean up topic mesh tracking for cluster topics. + // Note: Metric cardinality is bounded by normalizing cluster topics to their prefix + // (e.g., "consensus-cluster", "sync-cluster") when recording metrics. + if channels.IsClusterChannel(channels.Channel(topic)) { + t.topicMeshMu.Lock() + delete(t.topicMeshMap, topic) + t.topicMeshMu.Unlock() + } } // ValidateMessage is called by GossipSub as a callback when a message is received by the local node and entered the validation phase.