From 342cc146c46203c536f7c873988b7bb32039f3ef Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 18 May 2026 15:10:10 -0700 Subject: [PATCH 1/6] clean up stale cluster topic metrics on epoch transitions --- module/metrics.go | 5 ++ module/metrics/gossipsub.go | 9 ++ module/metrics/noop.go | 1 + module/mock/gossip_sub_metrics.go | 40 +++++++++ module/mock/lib_p2_p_metrics.go | 40 +++++++++ .../mock/local_gossip_sub_router_metrics.go | 40 +++++++++ module/mock/network_metrics.go | 40 +++++++++ network/p2p/tracer/gossipSubMeshTracer.go | 12 +++ .../p2p/tracer/gossipSubMeshTracer_test.go | 82 +++++++++++++++++++ 9 files changed, 269 insertions(+) diff --git a/module/metrics.go b/module/metrics.go index c67059f7966..ab877bdcd8c 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -182,6 +182,11 @@ type LocalGossipSubRouterMetrics interface { // OnMessageDeliveredToAllSubscribers is called when a message is delivered to all subscribers of the topic. OnMessageDeliveredToAllSubscribers(size int) + + // OnClusterTopicMetricsCleanup is called when the local node leaves a cluster topic. It cleans up all + // metrics associated with the topic to prevent unbounded metric cardinality growth during epoch transitions. + // This should only be called for cluster topics (sync-cluster-*, consensus-cluster-*). + OnClusterTopicMetricsCleanup(topic string) } // UnicastManagerMetrics unicast manager metrics. diff --git a/module/metrics/gossipsub.go b/module/metrics/gossipsub.go index 39dfd1f1e36..3c58b7c1266 100644 --- a/module/metrics/gossipsub.go +++ b/module/metrics/gossipsub.go @@ -378,3 +378,12 @@ func (g *LocalGossipSubRouterMetrics) OnUndeliveredMessage() { func (g *LocalGossipSubRouterMetrics) OnMessageDeliveredToAllSubscribers(size int) { g.messageDeliveredSize.Observe(float64(size)) } + +// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. +// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes +// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). +func (g *LocalGossipSubRouterMetrics) OnClusterTopicMetricsCleanup(topic string) { + g.localMeshSize.DeleteLabelValues(topic) + g.peerGraftTopicCount.DeleteLabelValues(topic) + g.peerPruneTopicCount.DeleteLabelValues(topic) +} diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 4a4b69a2264..9306d56776d 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -310,6 +310,7 @@ func (nc *NoopCollector) OnRpcSent(msgCount int, iHaveCount int, iWantCount int, func (nc *NoopCollector) OnOutboundRpcDropped() {} func (nc *NoopCollector) OnUndeliveredMessage() {} func (nc *NoopCollector) OnMessageDeliveredToAllSubscribers(size int) {} +func (nc *NoopCollector) OnClusterTopicMetricsCleanup(topic string) {} func (nc *NoopCollector) AllowConn(network.Direction, bool) {} func (nc *NoopCollector) BlockConn(network.Direction, bool) {} func (nc *NoopCollector) AllowStream(peer.ID, network.Direction) {} diff --git a/module/mock/gossip_sub_metrics.go b/module/mock/gossip_sub_metrics.go index 57ce8bfdfb4..8bfefbd7b63 100644 --- a/module/mock/gossip_sub_metrics.go +++ b/module/mock/gossip_sub_metrics.go @@ -225,6 +225,46 @@ func (_c *GossipSubMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func return _c } +// OnClusterTopicMetricsCleanup provides a mock function for the type GossipSubMetrics +func (_mock *GossipSubMetrics) OnClusterTopicMetricsCleanup(topic string) { + _mock.Called(topic) + return +} + +// GossipSubMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' +type GossipSubMetrics_OnClusterTopicMetricsCleanup_Call struct { + *mock.Call +} + +// OnClusterTopicMetricsCleanup is a helper method to define mock.On call +// - topic string +func (_e *GossipSubMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { + return &GossipSubMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} +} + +func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) Return() *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Return() + return _c +} + +func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Run(run) + return _c +} + // OnControlMessagesTruncated provides a mock function for the type GossipSubMetrics func (_mock *GossipSubMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/module/mock/lib_p2_p_metrics.go b/module/mock/lib_p2_p_metrics.go index 8b26953b35b..49d2aa54089 100644 --- a/module/mock/lib_p2_p_metrics.go +++ b/module/mock/lib_p2_p_metrics.go @@ -984,6 +984,46 @@ func (_c *LibP2PMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func(f return _c } +// OnClusterTopicMetricsCleanup provides a mock function for the type LibP2PMetrics +func (_mock *LibP2PMetrics) OnClusterTopicMetricsCleanup(topic string) { + _mock.Called(topic) + return +} + +// LibP2PMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' +type LibP2PMetrics_OnClusterTopicMetricsCleanup_Call struct { + *mock.Call +} + +// OnClusterTopicMetricsCleanup is a helper method to define mock.On call +// - topic string +func (_e *LibP2PMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { + return &LibP2PMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} +} + +func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) Return() *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Return() + return _c +} + +func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Run(run) + return _c +} + // OnControlMessagesTruncated provides a mock function for the type LibP2PMetrics func (_mock *LibP2PMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/module/mock/local_gossip_sub_router_metrics.go b/module/mock/local_gossip_sub_router_metrics.go index 416027c49c9..5123ef76e49 100644 --- a/module/mock/local_gossip_sub_router_metrics.go +++ b/module/mock/local_gossip_sub_router_metrics.go @@ -35,6 +35,46 @@ func (_m *LocalGossipSubRouterMetrics) EXPECT() *LocalGossipSubRouterMetrics_Exp return &LocalGossipSubRouterMetrics_Expecter{mock: &_m.Mock} } +// OnClusterTopicMetricsCleanup provides a mock function for the type LocalGossipSubRouterMetrics +func (_mock *LocalGossipSubRouterMetrics) OnClusterTopicMetricsCleanup(topic string) { + _mock.Called(topic) + return +} + +// LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' +type LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call struct { + *mock.Call +} + +// OnClusterTopicMetricsCleanup is a helper method to define mock.On call +// - topic string +func (_e *LocalGossipSubRouterMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { + return &LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} +} + +func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) Return() *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Return() + return _c +} + +func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Run(run) + return _c +} + // OnLocalMeshSizeUpdated provides a mock function for the type LocalGossipSubRouterMetrics func (_mock *LocalGossipSubRouterMetrics) OnLocalMeshSizeUpdated(topic string, size int) { _mock.Called(topic, size) diff --git a/module/mock/network_metrics.go b/module/mock/network_metrics.go index ee118e026c5..c6c70800dd8 100644 --- a/module/mock/network_metrics.go +++ b/module/mock/network_metrics.go @@ -1260,6 +1260,46 @@ func (_c *NetworkMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func(f return _c } +// OnClusterTopicMetricsCleanup provides a mock function for the type NetworkMetrics +func (_mock *NetworkMetrics) OnClusterTopicMetricsCleanup(topic string) { + _mock.Called(topic) + return +} + +// NetworkMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' +type NetworkMetrics_OnClusterTopicMetricsCleanup_Call struct { + *mock.Call +} + +// OnClusterTopicMetricsCleanup is a helper method to define mock.On call +// - topic string +func (_e *NetworkMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { + return &NetworkMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} +} + +func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) Return() *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Return() + return _c +} + +func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Run(run) + return _c +} + // OnControlMessagesTruncated provides a mock function for the type NetworkMetrics func (_mock *NetworkMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index a25f4e717d6..8b3bd2ac8e5 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -279,6 +279,18 @@ func (t *GossipSubMeshTracer) Leave(topic string) { Str("topic", topic). Msg("local peer left topic") } + + // Clean up topic mesh tracking and metrics for cluster topics to prevent unbounded + // metric cardinality growth during epoch transitions. + if channels.IsClusterChannel(channels.Channel(topic)) { + t.topicMeshMu.Lock() + delete(t.topicMeshMap, topic) + t.topicMeshMu.Unlock() + t.metrics.OnClusterTopicMetricsCleanup(topic) + t.logger.Debug(). + Str("topic", topic). + Msg("cleaned up cluster topic metrics") + } } // ValidateMessage is called by GossipSub as a callback when a message is received by the local node and entered the validation phase. diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 40a5fe76d58..1df91637528 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -218,3 +218,85 @@ func (c *localMeshTracerMetricsCollector) OnLocalMeshSizeUpdated(topic string, s // calls the mock method to assert the metrics. c.l.OnLocalMeshSizeUpdated(topic, size) } + +func (c *localMeshTracerMetricsCollector) OnClusterTopicMetricsCleanup(topic string) { + // calls the mock method to assert the metrics cleanup. + c.l.OnClusterTopicMetricsCleanup(topic) +} + +// TestGossipSubMeshTracer_ClusterTopicCleanup tests that when a node leaves a cluster topic, +// the mesh tracer cleans up metrics for that topic to prevent unbounded metric cardinality growth. +func TestGossipSubMeshTracer_ClusterTopicCleanup(t *testing.T) { + defaultConfig, err := config.DefaultConfig() + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + sporkId := unittest.IdentifierFixture() + idProvider := mockmodule.NewIdentityProvider(t) + defer cancel() + + // create a non-cluster channel and a cluster channel + nonClusterChannel := channels.PushBlocks + nonClusterTopic := channels.TopicFromChannel(nonClusterChannel, sporkId) + clusterID := flow.ChainID("test-cluster-id") + clusterChannel := channels.SyncCluster(clusterID) + clusterTopic := channels.Topic(clusterChannel) + + logger := zerolog.New(io.Discard).Level(zerolog.DebugLevel) + + collector := newLocalMeshTracerMetricsCollector(t) + // set the meshTracer to log at 1 second intervals for sake of testing. + defaultConfig.NetworkConfig.GossipSub.RpcTracer.LocalMeshLogInterval = 1 * time.Second + // disables peer scoring for sake of testing + defaultConfig.NetworkConfig.GossipSub.PeerScoringEnabled = false + + tracerNode, tracerId := p2ptest.NodeFixture( + t, + sporkId, + t.Name(), + idProvider, + p2ptest.WithLogger(logger), + p2ptest.OverrideFlowConfig(defaultConfig), + p2ptest.WithMetricsCollector(collector), + p2ptest.WithRole(flow.RoleCollection)) + + idProvider.On("ByPeerID", tracerNode.ID()).Return(&tracerId, true).Maybe() + + nodes := []p2p.LibP2PNode{tracerNode} + ids := flow.IdentityList{&tracerId} + + p2ptest.RegisterPeerProviders(t, nodes) + p2ptest.StartNodes(t, signalerCtx, nodes) + defer p2ptest.StopNodes(t, nodes, cancel) + + p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids) + + // allow any OnLocalMeshSizeUpdated calls since we're not testing that here + collector.l.On("OnLocalMeshSizeUpdated", nonClusterTopic.String(), 0).Maybe() + collector.l.On("OnLocalMeshSizeUpdated", clusterTopic.String(), 0).Maybe() + + // subscribe to both topics + _, err = tracerNode.Subscribe( + nonClusterTopic, + validator.TopicValidator( + unittest.Logger(), + unittest.AllowAllPeerFilter())) + require.NoError(t, err) + + _, err = tracerNode.Subscribe( + clusterTopic, + validator.TopicValidator( + unittest.Logger(), + unittest.AllowAllPeerFilter())) + require.NoError(t, err) + + // set up expectation: OnClusterTopicMetricsCleanup should only be called for the cluster topic + collector.l.On("OnClusterTopicMetricsCleanup", clusterTopic.String()).Once() + + // unsubscribe from both topics + require.NoError(t, tracerNode.Unsubscribe(nonClusterTopic)) + require.NoError(t, tracerNode.Unsubscribe(clusterTopic)) + + // give time for the Leave callback to be processed + time.Sleep(100 * time.Millisecond) +} From 44d513b736fdeadf5511ecf09c4f432f7899b7e7 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 19 May 2026 08:57:02 -0700 Subject: [PATCH 2/6] clean up stale cluster topic metrics on epoch transitions --- module/metrics.go | 5 + .../gossipsub_rpc_validation_inspector.go | 7 + module/metrics/network.go | 18 ++ module/metrics/network_test.go | 183 ++++++++++++++++++ ...ip_sub_rpc_validation_inspector_metrics.go | 40 ++++ .../p2p/tracer/gossipSubMeshTracer_test.go | 11 +- 6 files changed, 262 insertions(+), 2 deletions(-) create mode 100644 module/metrics/network_test.go diff --git a/module/metrics.go b/module/metrics.go index ab877bdcd8c..0f4a6d63f75 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -389,6 +389,11 @@ type GossipSubRpcValidationInspectorMetrics interface { // - invalidSubscriptionsCount: the number of times that an invalid subscription was detected during the async inspection of publish messages. // - invalidSendersCount: the number of times that an invalid sender was detected during the async inspection of publish messages. OnPublishMessageInspected(totalErrCount int, invalidTopicIdsCount int, invalidSubscriptionsCount int, invalidSendersCount int) + + // OnClusterTopicMetricsCleanup is called when the local node leaves a cluster topic. It cleans up all + // metrics associated with the topic to prevent unbounded metric cardinality growth during epoch transitions. + // This should only be called for cluster topics (sync-cluster-*, consensus-cluster-*). + OnClusterTopicMetricsCleanup(topic string) } // NetworkInboundQueueMetrics encapsulates the metrics collectors for the inbound queue of the networking layer. diff --git a/module/metrics/gossipsub_rpc_validation_inspector.go b/module/metrics/gossipsub_rpc_validation_inspector.go index d8c20fccc81..512e7152ccb 100644 --- a/module/metrics/gossipsub_rpc_validation_inspector.go +++ b/module/metrics/gossipsub_rpc_validation_inspector.go @@ -584,3 +584,10 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnPublishMessageInspected(total func (c *GossipSubRpcValidationInspectorMetrics) OnPublishMessagesInspectionErrorExceedsThreshold() { c.publishMessageInspectionErrExceedThresholdCount.Inc() } + +// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. +// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes +// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). +func (c *GossipSubRpcValidationInspectorMetrics) OnClusterTopicMetricsCleanup(topic string) { + c.receivedIHaveMsgIDsHistogram.DeleteLabelValues(topic) +} diff --git a/module/metrics/network.go b/module/metrics/network.go index a6eead52e48..98ec52e04e5 100644 --- a/module/metrics/network.go +++ b/module/metrics/network.go @@ -274,6 +274,24 @@ func (nc *NetworkCollector) DuplicateInboundMessagesDropped(topic, protocol, mes nc.duplicateMessagesDropped.WithLabelValues(topic, protocol, messageType).Add(1) } +// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. +// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes +// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). +// This method overrides the embedded LocalGossipSubRouterMetrics.OnClusterTopicMetricsCleanup to also +// clean up inbound/outbound message size metrics and iHave message ID metrics. +func (nc *NetworkCollector) OnClusterTopicMetricsCleanup(topic string) { + // Clean up LocalGossipSubRouterMetrics (localMeshSize, peerGraftTopicCount, peerPruneTopicCount) + nc.LocalGossipSubRouterMetrics.OnClusterTopicMetricsCleanup(topic) + + // Clean up GossipSubRpcValidationInspectorMetrics (receivedIHaveMsgIDsHistogram) + nc.GossipSubRpcValidationInspectorMetrics.OnClusterTopicMetricsCleanup(topic) + + // Clean up inbound/outbound message size metrics using partial match on topic + nc.inboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) + nc.outboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) + nc.duplicateMessagesDropped.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) +} + func (nc *NetworkCollector) MessageAdded(priority int) { nc.queueSize.WithLabelValues(strconv.Itoa(priority)).Inc() } diff --git a/module/metrics/network_test.go b/module/metrics/network_test.go new file mode 100644 index 00000000000..e1ea5e3c9b3 --- /dev/null +++ b/module/metrics/network_test.go @@ -0,0 +1,183 @@ +package metrics + +import ( + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestNetworkCollector_OnClusterTopicMetricsCleanup verifies that OnClusterTopicMetricsCleanup +// properly cleans up all metrics associated with a cluster topic, including the top-offender +// metrics identified in the issue: inboundMessageSize, outboundMessageSize, and receivedIHaveMsgIDsHistogram. +func TestNetworkCollector_OnClusterTopicMetricsCleanup(t *testing.T) { + // Use a unique prefix to avoid metric registration conflicts between tests + prefix := fmt.Sprintf("test_%s_", t.Name()) + logger := zerolog.Nop() + nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) + + clusterTopic := "sync-cluster-test-cluster-id" + otherTopic := "blocks" + protocol := "pubsub" + msgType := "BlockProposal" + + // Record metrics for both cluster and non-cluster topics + // This simulates normal operation where metrics accumulate for various topics + nc.InboundMessageReceived(100, clusterTopic, protocol, msgType) + nc.InboundMessageReceived(200, clusterTopic, protocol, "Transaction") + nc.InboundMessageReceived(300, otherTopic, protocol, msgType) + + nc.OutboundMessageSent(150, clusterTopic, protocol, msgType) + nc.OutboundMessageSent(250, otherTopic, protocol, msgType) + + nc.DuplicateInboundMessagesDropped(clusterTopic, protocol, msgType) + nc.DuplicateInboundMessagesDropped(otherTopic, protocol, msgType) + + // Record LocalGossipSubRouterMetrics + nc.OnLocalMeshSizeUpdated(clusterTopic, 5) + nc.OnLocalMeshSizeUpdated(otherTopic, 3) + nc.OnPeerGraftTopic(clusterTopic) + nc.OnPeerGraftTopic(otherTopic) + nc.OnPeerPruneTopic(clusterTopic) + nc.OnPeerPruneTopic(otherTopic) + + // Record GossipSubRpcValidationInspectorMetrics + nc.OnIHaveMessageIDsReceived(clusterTopic, 10) + nc.OnIHaveMessageIDsReceived(otherTopic, 5) + + // Verify metrics exist before cleanup + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should have cluster topic before cleanup") + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, otherTopic, "inboundMessageSize should have other topic before cleanup") + assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should have cluster topic before cleanup") + assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, otherTopic, "outboundMessageSize should have other topic before cleanup") + assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, clusterTopic, "duplicateMessagesDropped should have cluster topic before cleanup") + assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, otherTopic, "duplicateMessagesDropped should have other topic before cleanup") + + // Perform cleanup for the cluster topic + nc.OnClusterTopicMetricsCleanup(clusterTopic) + + // Verify cluster topic metrics are cleaned up + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should NOT have cluster topic after cleanup") + assertHistogramVecNotHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should NOT have cluster topic after cleanup") + assertCounterVecNotHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, clusterTopic, "duplicateMessagesDropped should NOT have cluster topic after cleanup") + + // Verify other topic metrics are NOT cleaned up + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, otherTopic, "inboundMessageSize should still have other topic after cleanup") + assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, otherTopic, "outboundMessageSize should still have other topic after cleanup") + assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, otherTopic, "duplicateMessagesDropped should still have other topic after cleanup") +} + +// TestNetworkCollector_OnClusterTopicMetricsCleanup_Idempotent verifies that calling +// OnClusterTopicMetricsCleanup multiple times for the same topic doesn't cause issues. +func TestNetworkCollector_OnClusterTopicMetricsCleanup_Idempotent(t *testing.T) { + prefix := fmt.Sprintf("test_%s_", t.Name()) + logger := zerolog.Nop() + nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) + + clusterTopic := "sync-cluster-test-cluster-id" + protocol := "pubsub" + msgType := "BlockProposal" + + // Record some metrics + nc.InboundMessageReceived(100, clusterTopic, protocol, msgType) + nc.OutboundMessageSent(150, clusterTopic, protocol, msgType) + nc.OnIHaveMessageIDsReceived(clusterTopic, 10) + + // Call cleanup multiple times - should not panic or cause issues + nc.OnClusterTopicMetricsCleanup(clusterTopic) + nc.OnClusterTopicMetricsCleanup(clusterTopic) + nc.OnClusterTopicMetricsCleanup(clusterTopic) + + // Verify metrics are cleaned up + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should be cleaned up") + assertHistogramVecNotHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should be cleaned up") +} + +// TestNetworkCollector_OnClusterTopicMetricsCleanup_NonexistentTopic verifies that calling +// OnClusterTopicMetricsCleanup for a topic that was never recorded doesn't cause issues. +func TestNetworkCollector_OnClusterTopicMetricsCleanup_NonexistentTopic(t *testing.T) { + prefix := fmt.Sprintf("test_%s_", t.Name()) + logger := zerolog.Nop() + nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) + + // Call cleanup for a topic that was never used - should not panic + nc.OnClusterTopicMetricsCleanup("nonexistent-cluster-topic") +} + +// assertHistogramVecHasLabel checks that the histogram vec has at least one metric with the given label value. +func assertHistogramVecHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) { + t.Helper() + found := histogramVecHasLabelValue(t, hv, labelName, labelValue) + assert.True(t, found, msg) +} + +// assertHistogramVecNotHasLabel checks that the histogram vec has no metrics with the given label value. +func assertHistogramVecNotHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) { + t.Helper() + found := histogramVecHasLabelValue(t, hv, labelName, labelValue) + assert.False(t, found, msg) +} + +// assertCounterVecHasLabel checks that the counter vec has at least one metric with the given label value. +func assertCounterVecHasLabel(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue, msg string) { + t.Helper() + found := counterVecHasLabelValue(t, cv, labelName, labelValue) + assert.True(t, found, msg) +} + +// assertCounterVecNotHasLabel checks that the counter vec has no metrics with the given label value. +func assertCounterVecNotHasLabel(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue, msg string) { + t.Helper() + found := counterVecHasLabelValue(t, cv, labelName, labelValue) + assert.False(t, found, msg) +} + +// histogramVecHasLabelValue collects metrics from the histogram vec and checks if any have the given label value. +func histogramVecHasLabelValue(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 100) + go func() { + hv.Collect(ch) + close(ch) + }() + + for metric := range ch { + var m io_prometheus_client.Metric + err := metric.Write(&m) + require.NoError(t, err) + + for _, label := range m.GetLabel() { + if label.GetName() == labelName && label.GetValue() == labelValue { + return true + } + } + } + return false +} + +// counterVecHasLabelValue collects metrics from the counter vec and checks if any have the given label value. +func counterVecHasLabelValue(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 100) + go func() { + cv.Collect(ch) + close(ch) + }() + + for metric := range ch { + var m io_prometheus_client.Metric + err := metric.Write(&m) + require.NoError(t, err) + + for _, label := range m.GetLabel() { + if label.GetName() == labelName && label.GetValue() == labelValue { + return true + } + } + } + return false +} diff --git a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go index 2e06c3e33b2..67cf58ff355 100644 --- a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go +++ b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go @@ -144,6 +144,46 @@ func (_c *GossipSubRpcValidationInspectorMetrics_OnActiveClusterIDsNotSetErr_Cal return _c } +// OnClusterTopicMetricsCleanup provides a mock function for the type GossipSubRpcValidationInspectorMetrics +func (_mock *GossipSubRpcValidationInspectorMetrics) OnClusterTopicMetricsCleanup(topic string) { + _mock.Called(topic) + return +} + +// GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' +type GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call struct { + *mock.Call +} + +// OnClusterTopicMetricsCleanup is a helper method to define mock.On call +// - topic string +func (_e *GossipSubRpcValidationInspectorMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { + return &GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} +} + +func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) Return() *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Call.Return() + return _c +} + +func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { + _c.Run(run) + return _c +} + // OnControlMessagesTruncated provides a mock function for the type GossipSubRpcValidationInspectorMetrics func (_mock *GossipSubRpcValidationInspectorMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 1df91637528..3ccbb4e7528 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -297,6 +297,13 @@ func TestGossipSubMeshTracer_ClusterTopicCleanup(t *testing.T) { require.NoError(t, tracerNode.Unsubscribe(nonClusterTopic)) require.NoError(t, tracerNode.Unsubscribe(clusterTopic)) - // give time for the Leave callback to be processed - time.Sleep(100 * time.Millisecond) + // wait until cleanup callback is observed (avoid flaky timing with time.Sleep) + require.Eventually(t, func() bool { + for _, c := range collector.l.Calls { + if c.Method == "OnClusterTopicMetricsCleanup" && len(c.Arguments) == 1 && c.Arguments[0] == clusterTopic.String() { + return true + } + } + return false + }, time.Second, 10*time.Millisecond) } From 323d584e11c9d183b4e884fa12c954653a8a7174 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 19 May 2026 10:51:09 -0700 Subject: [PATCH 3/6] tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ad5cb934e4a..4fd40f960b2 100644 --- a/go.mod +++ b/go.mod @@ -307,7 +307,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/client_model v0.6.2 github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/psiemens/sconfig v0.1.0 // indirect From cfed2a1cc45bc384e15d1b84b627a237de4a81b4 Mon Sep 17 00:00:00 2001 From: Vishal <1117327+vishalchangrani@users.noreply.github.com> Date: Wed, 20 May 2026 15:21:44 -0400 Subject: [PATCH 4/6] extend cluster topic metrics cleanup to cover all per-topic metric families (#8564) * extend cluster topic metrics cleanup to cover all per-topic metric families * add public OnClusterTopicMetricsCleanup to AlspMetrics instead of accessing private field --- module/metrics/alsp.go | 6 ++++++ module/metrics/gossipsub_score.go | 10 ++++++++++ module/metrics/network.go | 23 ++++++++++++++++++----- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/module/metrics/alsp.go b/module/metrics/alsp.go index 3d5dc2bc510..888ca4dd18d 100644 --- a/module/metrics/alsp.go +++ b/module/metrics/alsp.go @@ -35,6 +35,12 @@ func NewAlspMetrics() *AlspMetrics { return alsp } +// OnClusterTopicMetricsCleanup removes all misbehavior counter label values associated with the given +// cluster topic to prevent unbounded metric cardinality growth during epoch transitions. +func (a *AlspMetrics) OnClusterTopicMetricsCleanup(topic string) { + a.reportedMisbehaviorCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) +} + // OnMisbehaviorReported is called when a misbehavior is reported by the application layer to ALSP. // An engine detecting a spamming-related misbehavior reports it to the ALSP module. It increases // the counter vector of reported misbehavior. diff --git a/module/metrics/gossipsub_score.go b/module/metrics/gossipsub_score.go index 15e3628469c..4b3242108f0 100644 --- a/module/metrics/gossipsub_score.go +++ b/module/metrics/gossipsub_score.go @@ -161,3 +161,13 @@ func (g *GossipSubScoreMetrics) OnInvalidMessageDeliveredUpdated(topic channels. func (g *GossipSubScoreMetrics) SetWarningStateCount(u uint) { g.warningStateGauge.Set(float64(u)) } + +// OnClusterTopicMetricsCleanup removes all per-topic scoring metric label values associated with +// the given cluster topic. Call this when the local node leaves a cluster topic to prevent +// unbounded metric cardinality growth across epoch transitions. +func (g *GossipSubScoreMetrics) OnClusterTopicMetricsCleanup(topic string) { + g.timeInMesh.DeleteLabelValues(topic) + g.meshMessageDelivery.DeleteLabelValues(topic) + g.firstMessageDelivery.DeleteLabelValues(topic) + g.invalidMessageDelivery.DeleteLabelValues(topic) +} diff --git a/module/metrics/network.go b/module/metrics/network.go index 98ec52e04e5..195a03219e4 100644 --- a/module/metrics/network.go +++ b/module/metrics/network.go @@ -277,19 +277,32 @@ func (nc *NetworkCollector) DuplicateInboundMessagesDropped(topic, protocol, mes // OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. // This prevents unbounded metric cardinality growth during epoch transitions when collection nodes // join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). -// This method overrides the embedded LocalGossipSubRouterMetrics.OnClusterTopicMetricsCleanup to also -// clean up inbound/outbound message size metrics and iHave message ID metrics. func (nc *NetworkCollector) OnClusterTopicMetricsCleanup(topic string) { - // Clean up LocalGossipSubRouterMetrics (localMeshSize, peerGraftTopicCount, peerPruneTopicCount) + // LocalGossipSubRouterMetrics: localMeshSize, peerGraftTopicCount, peerPruneTopicCount nc.LocalGossipSubRouterMetrics.OnClusterTopicMetricsCleanup(topic) - // Clean up GossipSubRpcValidationInspectorMetrics (receivedIHaveMsgIDsHistogram) + // GossipSubRpcValidationInspectorMetrics: receivedIHaveMsgIDsHistogram nc.GossipSubRpcValidationInspectorMetrics.OnClusterTopicMetricsCleanup(topic) - // Clean up inbound/outbound message size metrics using partial match on topic + // GossipSubScoreMetrics: timeInMesh, meshMessageDelivery, firstMessageDelivery, invalidMessageDelivery + nc.GossipSubScoreMetrics.OnClusterTopicMetricsCleanup(topic) + + // inbound/outbound message size and duplicate drop counters (multi-label: channel + protocol + message) nc.inboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) nc.outboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) nc.duplicateMessagesDropped.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) + + // message processing gauges and inbound process time counter (single label: channel) + nc.numMessagesProcessing.DeleteLabelValues(topic) + nc.numDirectMessagesSending.DeleteLabelValues(topic) + nc.inboundProcessTime.DeleteLabelValues(topic) + + // security metrics (multi-label: role + message + channel + reason) + nc.unAuthorizedMessagesCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) + nc.rateLimitedUnicastMessagesCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) + + // ALSP misbehavior counter (multi-label: channel + misbehavior) + nc.AlspMetrics.OnClusterTopicMetricsCleanup(topic) } func (nc *NetworkCollector) MessageAdded(priority int) { From 3a538e502f7e1b29e2835646d60f8eafe9c5e647 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 27 May 2026 16:28:20 -0700 Subject: [PATCH 5/6] normalize cluster topic labels in metrics to prevent unbounded cardinality growth --- module/metrics.go | 10 - module/metrics/gossipsub.go | 20 +- .../gossipsub_rpc_validation_inspector.go | 11 +- module/metrics/network.go | 41 +--- module/metrics/network_test.go | 177 ++++++++---------- module/metrics/noop.go | 1 - module/mock/gossip_sub_metrics.go | 40 ---- ...ip_sub_rpc_validation_inspector_metrics.go | 40 ---- module/mock/lib_p2_p_metrics.go | 40 ---- .../mock/local_gossip_sub_router_metrics.go | 40 ---- module/mock/network_metrics.go | 40 ---- network/channels/channels.go | 14 ++ network/channels/channels_test.go | 54 ++++++ network/p2p/tracer/gossipSubMeshTracer.go | 9 +- .../p2p/tracer/gossipSubMeshTracer_test.go | 89 --------- 15 files changed, 172 insertions(+), 454 deletions(-) diff --git a/module/metrics.go b/module/metrics.go index 0f4a6d63f75..c67059f7966 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -182,11 +182,6 @@ type LocalGossipSubRouterMetrics interface { // OnMessageDeliveredToAllSubscribers is called when a message is delivered to all subscribers of the topic. OnMessageDeliveredToAllSubscribers(size int) - - // OnClusterTopicMetricsCleanup is called when the local node leaves a cluster topic. It cleans up all - // metrics associated with the topic to prevent unbounded metric cardinality growth during epoch transitions. - // This should only be called for cluster topics (sync-cluster-*, consensus-cluster-*). - OnClusterTopicMetricsCleanup(topic string) } // UnicastManagerMetrics unicast manager metrics. @@ -389,11 +384,6 @@ type GossipSubRpcValidationInspectorMetrics interface { // - invalidSubscriptionsCount: the number of times that an invalid subscription was detected during the async inspection of publish messages. // - invalidSendersCount: the number of times that an invalid sender was detected during the async inspection of publish messages. OnPublishMessageInspected(totalErrCount int, invalidTopicIdsCount int, invalidSubscriptionsCount int, invalidSendersCount int) - - // OnClusterTopicMetricsCleanup is called when the local node leaves a cluster topic. It cleans up all - // metrics associated with the topic to prevent unbounded metric cardinality growth during epoch transitions. - // This should only be called for cluster topics (sync-cluster-*, consensus-cluster-*). - OnClusterTopicMetricsCleanup(topic string) } // NetworkInboundQueueMetrics encapsulates the metrics collectors for the inbound queue of the networking layer. diff --git a/module/metrics/gossipsub.go b/module/metrics/gossipsub.go index 3c58b7c1266..2379582ef89 100644 --- a/module/metrics/gossipsub.go +++ b/module/metrics/gossipsub.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" ) // LocalGossipSubRouterMetrics encapsulates the metrics collectors for GossipSub router of the local node. @@ -261,8 +262,10 @@ func NewGossipSubLocalMeshMetrics(prefix string) *LocalGossipSubRouterMetrics { var _ module.LocalGossipSubRouterMetrics = (*LocalGossipSubRouterMetrics)(nil) // OnLocalMeshSizeUpdated updates the local mesh size metric. +// Cluster topics are normalized to their prefix (e.g., "sync-cluster", "consensus-cluster") +// to prevent unbounded cardinality growth during epoch transitions. func (g *LocalGossipSubRouterMetrics) OnLocalMeshSizeUpdated(topic string, size int) { - g.localMeshSize.WithLabelValues(topic).Set(float64(size)) + g.localMeshSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Set(float64(size)) } // OnPeerAddedToProtocol is called when the local node receives a stream from a peer on a gossipsub-related protocol. @@ -296,14 +299,16 @@ func (g *LocalGossipSubRouterMetrics) OnLocalPeerLeftTopic() { // OnPeerGraftTopic is called when the local node receives a GRAFT message from a remote peer on a topic. // Note: the received GRAFT at this point is considered passed the RPC inspection, and is accepted by the local node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (g *LocalGossipSubRouterMetrics) OnPeerGraftTopic(topic string) { - g.peerGraftTopicCount.WithLabelValues(topic).Inc() + g.peerGraftTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // OnPeerPruneTopic is called when the local node receives a PRUNE message from a remote peer on a topic. // Note: the received PRUNE at this point is considered passed the RPC inspection, and is accepted by the local node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (g *LocalGossipSubRouterMetrics) OnPeerPruneTopic(topic string) { - g.peerPruneTopicCount.WithLabelValues(topic).Inc() + g.peerPruneTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // OnMessageEnteredValidation is called when a received pubsub message enters the validation pipeline. It is the @@ -378,12 +383,3 @@ func (g *LocalGossipSubRouterMetrics) OnUndeliveredMessage() { func (g *LocalGossipSubRouterMetrics) OnMessageDeliveredToAllSubscribers(size int) { g.messageDeliveredSize.Observe(float64(size)) } - -// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. -// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes -// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). -func (g *LocalGossipSubRouterMetrics) OnClusterTopicMetricsCleanup(topic string) { - g.localMeshSize.DeleteLabelValues(topic) - g.peerGraftTopicCount.DeleteLabelValues(topic) - g.peerPruneTopicCount.DeleteLabelValues(topic) -} diff --git a/module/metrics/gossipsub_rpc_validation_inspector.go b/module/metrics/gossipsub_rpc_validation_inspector.go index 512e7152ccb..4611b3241d3 100644 --- a/module/metrics/gossipsub_rpc_validation_inspector.go +++ b/module/metrics/gossipsub_rpc_validation_inspector.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) @@ -421,11 +422,12 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIWantMessageIDsReceived(msgId // OnIHaveMessageIDsReceived tracks the number of message ids received by the node from other nodes on an iHave message. // This function is called on each iHave message received by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. // Args: // - channel: the channel on which the iHave message was received. // - msgIdCount: the number of message ids received on the iHave message. func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) { - c.receivedIHaveMsgIDsHistogram.WithLabelValues(channel).Observe(float64(msgIdCount)) + c.receivedIHaveMsgIDsHistogram.WithLabelValues(channels.NormalizeTopicForMetrics(channel)).Observe(float64(msgIdCount)) } // OnIncomingRpcReceived tracks the number of incoming RPC messages received by the node. @@ -584,10 +586,3 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnPublishMessageInspected(total func (c *GossipSubRpcValidationInspectorMetrics) OnPublishMessagesInspectionErrorExceedsThreshold() { c.publishMessageInspectionErrExceedThresholdCount.Inc() } - -// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. -// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes -// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). -func (c *GossipSubRpcValidationInspectorMetrics) OnClusterTopicMetricsCleanup(topic string) { - c.receivedIHaveMsgIDsHistogram.DeleteLabelValues(topic) -} diff --git a/module/metrics/network.go b/module/metrics/network.go index 195a03219e4..c4c231315b4 100644 --- a/module/metrics/network.go +++ b/module/metrics/network.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" logging2 "github.com/onflow/flow-go/network/p2p/logging" "github.com/onflow/flow-go/utils/logging" ) @@ -260,49 +261,21 @@ func NewNetworkCollector(logger zerolog.Logger, opts ...NetworkCollectorOpt) *Ne } // OutboundMessageSent collects metrics related to a message sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OutboundMessageSent(sizeBytes int, topic, protocol, messageType string) { - nc.outboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes)) + nc.outboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes)) } // InboundMessageReceived collects metrics related to a message received by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) InboundMessageReceived(sizeBytes int, topic, protocol, messageType string) { - nc.inboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes)) + nc.inboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes)) } // DuplicateInboundMessagesDropped increments the metric tracking the number of duplicate messages dropped by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) DuplicateInboundMessagesDropped(topic, protocol, messageType string) { - nc.duplicateMessagesDropped.WithLabelValues(topic, protocol, messageType).Add(1) -} - -// OnClusterTopicMetricsCleanup removes all metric label values associated with the given cluster topic. -// This prevents unbounded metric cardinality growth during epoch transitions when collection nodes -// join new clusters and leave old ones. Only call this for cluster topics (sync-cluster-*, consensus-cluster-*). -func (nc *NetworkCollector) OnClusterTopicMetricsCleanup(topic string) { - // LocalGossipSubRouterMetrics: localMeshSize, peerGraftTopicCount, peerPruneTopicCount - nc.LocalGossipSubRouterMetrics.OnClusterTopicMetricsCleanup(topic) - - // GossipSubRpcValidationInspectorMetrics: receivedIHaveMsgIDsHistogram - nc.GossipSubRpcValidationInspectorMetrics.OnClusterTopicMetricsCleanup(topic) - - // GossipSubScoreMetrics: timeInMesh, meshMessageDelivery, firstMessageDelivery, invalidMessageDelivery - nc.GossipSubScoreMetrics.OnClusterTopicMetricsCleanup(topic) - - // inbound/outbound message size and duplicate drop counters (multi-label: channel + protocol + message) - nc.inboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) - nc.outboundMessageSize.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) - nc.duplicateMessagesDropped.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) - - // message processing gauges and inbound process time counter (single label: channel) - nc.numMessagesProcessing.DeleteLabelValues(topic) - nc.numDirectMessagesSending.DeleteLabelValues(topic) - nc.inboundProcessTime.DeleteLabelValues(topic) - - // security metrics (multi-label: role + message + channel + reason) - nc.unAuthorizedMessagesCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) - nc.rateLimitedUnicastMessagesCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) - - // ALSP misbehavior counter (multi-label: channel + misbehavior) - nc.AlspMetrics.OnClusterTopicMetricsCleanup(topic) + nc.duplicateMessagesDropped.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Add(1) } func (nc *NetworkCollector) MessageAdded(priority int) { diff --git a/module/metrics/network_test.go b/module/metrics/network_test.go index e1ea5e3c9b3..ea4c24946c7 100644 --- a/module/metrics/network_test.go +++ b/module/metrics/network_test.go @@ -11,101 +11,90 @@ import ( "github.com/stretchr/testify/require" ) -// TestNetworkCollector_OnClusterTopicMetricsCleanup verifies that OnClusterTopicMetricsCleanup -// properly cleans up all metrics associated with a cluster topic, including the top-offender -// metrics identified in the issue: inboundMessageSize, outboundMessageSize, and receivedIHaveMsgIDsHistogram. -func TestNetworkCollector_OnClusterTopicMetricsCleanup(t *testing.T) { - // Use a unique prefix to avoid metric registration conflicts between tests +// TestNetworkCollector_ClusterTopicNormalization verifies that cluster topics are normalized +// to their prefix (e.g., "sync-cluster-*" -> "sync-cluster") to prevent unbounded metric +// cardinality growth during epoch transitions. +func TestNetworkCollector_ClusterTopicNormalization(t *testing.T) { prefix := fmt.Sprintf("test_%s_", t.Name()) logger := zerolog.Nop() nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) - clusterTopic := "sync-cluster-test-cluster-id" - otherTopic := "blocks" + // These cluster topics should be normalized to their prefix + clusterTopic1 := "sync-cluster-test-cluster-id-1" + clusterTopic2 := "sync-cluster-test-cluster-id-2" + consensusClusterTopic := "consensus-cluster-test-cluster-id" + nonClusterTopic := "blocks" protocol := "pubsub" msgType := "BlockProposal" - // Record metrics for both cluster and non-cluster topics - // This simulates normal operation where metrics accumulate for various topics - nc.InboundMessageReceived(100, clusterTopic, protocol, msgType) - nc.InboundMessageReceived(200, clusterTopic, protocol, "Transaction") - nc.InboundMessageReceived(300, otherTopic, protocol, msgType) - - nc.OutboundMessageSent(150, clusterTopic, protocol, msgType) - nc.OutboundMessageSent(250, otherTopic, protocol, msgType) - - nc.DuplicateInboundMessagesDropped(clusterTopic, protocol, msgType) - nc.DuplicateInboundMessagesDropped(otherTopic, protocol, msgType) - - // Record LocalGossipSubRouterMetrics - nc.OnLocalMeshSizeUpdated(clusterTopic, 5) - nc.OnLocalMeshSizeUpdated(otherTopic, 3) - nc.OnPeerGraftTopic(clusterTopic) - nc.OnPeerGraftTopic(otherTopic) - nc.OnPeerPruneTopic(clusterTopic) - nc.OnPeerPruneTopic(otherTopic) - - // Record GossipSubRpcValidationInspectorMetrics - nc.OnIHaveMessageIDsReceived(clusterTopic, 10) - nc.OnIHaveMessageIDsReceived(otherTopic, 5) - - // Verify metrics exist before cleanup - assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should have cluster topic before cleanup") - assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, otherTopic, "inboundMessageSize should have other topic before cleanup") - assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should have cluster topic before cleanup") - assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, otherTopic, "outboundMessageSize should have other topic before cleanup") - assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, clusterTopic, "duplicateMessagesDropped should have cluster topic before cleanup") - assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, otherTopic, "duplicateMessagesDropped should have other topic before cleanup") - - // Perform cleanup for the cluster topic - nc.OnClusterTopicMetricsCleanup(clusterTopic) - - // Verify cluster topic metrics are cleaned up - assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should NOT have cluster topic after cleanup") - assertHistogramVecNotHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should NOT have cluster topic after cleanup") - assertCounterVecNotHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, clusterTopic, "duplicateMessagesDropped should NOT have cluster topic after cleanup") - - // Verify other topic metrics are NOT cleaned up - assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, otherTopic, "inboundMessageSize should still have other topic after cleanup") - assertHistogramVecHasLabel(t, nc.outboundMessageSize, LabelChannel, otherTopic, "outboundMessageSize should still have other topic after cleanup") - assertCounterVecHasLabel(t, nc.duplicateMessagesDropped, LabelChannel, otherTopic, "duplicateMessagesDropped should still have other topic after cleanup") + // Record metrics for multiple cluster topics and a non-cluster topic + nc.InboundMessageReceived(100, clusterTopic1, protocol, msgType) + nc.InboundMessageReceived(200, clusterTopic2, protocol, msgType) + nc.InboundMessageReceived(300, consensusClusterTopic, protocol, msgType) + nc.InboundMessageReceived(400, nonClusterTopic, protocol, msgType) + + nc.OutboundMessageSent(150, clusterTopic1, protocol, msgType) + nc.OutboundMessageSent(250, clusterTopic2, protocol, msgType) + nc.OutboundMessageSent(350, consensusClusterTopic, protocol, msgType) + nc.OutboundMessageSent(450, nonClusterTopic, protocol, msgType) + + nc.DuplicateInboundMessagesDropped(clusterTopic1, protocol, msgType) + nc.DuplicateInboundMessagesDropped(clusterTopic2, protocol, msgType) + nc.DuplicateInboundMessagesDropped(consensusClusterTopic, protocol, msgType) + nc.DuplicateInboundMessagesDropped(nonClusterTopic, protocol, msgType) + + // Verify cluster topics are normalized to their prefix + // Both sync-cluster topics should be recorded under "sync-cluster" + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, "sync-cluster", "sync-cluster topics should be normalized to 'sync-cluster'") + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, "consensus-cluster", "consensus-cluster topics should be normalized to 'consensus-cluster'") + + // Verify the original cluster topic names are NOT present (they should be normalized) + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic1, "original cluster topic name should not be present") + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic2, "original cluster topic name should not be present") + assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, consensusClusterTopic, "original consensus-cluster topic name should not be present") + + // Verify non-cluster topics are NOT normalized (they should keep their original name) + assertHistogramVecHasLabel(t, nc.inboundMessageSize, LabelChannel, nonClusterTopic, "non-cluster topics should keep their original name") } -// TestNetworkCollector_OnClusterTopicMetricsCleanup_Idempotent verifies that calling -// OnClusterTopicMetricsCleanup multiple times for the same topic doesn't cause issues. -func TestNetworkCollector_OnClusterTopicMetricsCleanup_Idempotent(t *testing.T) { +// TestLocalGossipSubRouterMetrics_ClusterTopicNormalization verifies that LocalGossipSubRouterMetrics +// normalizes cluster topics to their prefix. +func TestLocalGossipSubRouterMetrics_ClusterTopicNormalization(t *testing.T) { prefix := fmt.Sprintf("test_%s_", t.Name()) - logger := zerolog.Nop() - nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) - - clusterTopic := "sync-cluster-test-cluster-id" - protocol := "pubsub" - msgType := "BlockProposal" - - // Record some metrics - nc.InboundMessageReceived(100, clusterTopic, protocol, msgType) - nc.OutboundMessageSent(150, clusterTopic, protocol, msgType) - nc.OnIHaveMessageIDsReceived(clusterTopic, 10) - - // Call cleanup multiple times - should not panic or cause issues - nc.OnClusterTopicMetricsCleanup(clusterTopic) - nc.OnClusterTopicMetricsCleanup(clusterTopic) - nc.OnClusterTopicMetricsCleanup(clusterTopic) - - // Verify metrics are cleaned up - assertHistogramVecNotHasLabel(t, nc.inboundMessageSize, LabelChannel, clusterTopic, "inboundMessageSize should be cleaned up") - assertHistogramVecNotHasLabel(t, nc.outboundMessageSize, LabelChannel, clusterTopic, "outboundMessageSize should be cleaned up") -} - -// TestNetworkCollector_OnClusterTopicMetricsCleanup_NonexistentTopic verifies that calling -// OnClusterTopicMetricsCleanup for a topic that was never recorded doesn't cause issues. -func TestNetworkCollector_OnClusterTopicMetricsCleanup_NonexistentTopic(t *testing.T) { - prefix := fmt.Sprintf("test_%s_", t.Name()) - logger := zerolog.Nop() - nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix)) - - // Call cleanup for a topic that was never used - should not panic - nc.OnClusterTopicMetricsCleanup("nonexistent-cluster-topic") + m := NewGossipSubLocalMeshMetrics(prefix) + + clusterTopic1 := "sync-cluster-cluster-1" + clusterTopic2 := "sync-cluster-cluster-2" + consensusClusterTopic := "consensus-cluster-cluster-1" + nonClusterTopic := "blocks" + + // Record metrics for cluster and non-cluster topics + m.OnLocalMeshSizeUpdated(clusterTopic1, 5) + m.OnLocalMeshSizeUpdated(clusterTopic2, 3) + m.OnLocalMeshSizeUpdated(consensusClusterTopic, 4) + m.OnLocalMeshSizeUpdated(nonClusterTopic, 2) + + m.OnPeerGraftTopic(clusterTopic1) + m.OnPeerGraftTopic(clusterTopic2) + m.OnPeerGraftTopic(consensusClusterTopic) + m.OnPeerGraftTopic(nonClusterTopic) + + m.OnPeerPruneTopic(clusterTopic1) + m.OnPeerPruneTopic(clusterTopic2) + m.OnPeerPruneTopic(consensusClusterTopic) + m.OnPeerPruneTopic(nonClusterTopic) + + // Verify cluster topics are normalized to their prefix + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "sync-cluster", "sync-cluster topics should be normalized") + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "consensus-cluster", "consensus-cluster topics should be normalized") + + // Verify original cluster topic names are NOT present + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic1, "original cluster topic should not be present") + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic2, "original cluster topic should not be present") + assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, consensusClusterTopic, "original consensus-cluster topic should not be present") + + // Verify non-cluster topics are NOT normalized + assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, nonClusterTopic, "non-cluster topics should keep their original name") } // assertHistogramVecHasLabel checks that the histogram vec has at least one metric with the given label value. @@ -122,17 +111,17 @@ func assertHistogramVecNotHasLabel(t *testing.T, hv *prometheus.HistogramVec, la assert.False(t, found, msg) } -// assertCounterVecHasLabel checks that the counter vec has at least one metric with the given label value. -func assertCounterVecHasLabel(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue, msg string) { +// assertGaugeVecHasLabel checks that the gauge vec has at least one metric with the given label value. +func assertGaugeVecHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) { t.Helper() - found := counterVecHasLabelValue(t, cv, labelName, labelValue) + found := gaugeVecHasLabelValue(t, gv, labelName, labelValue) assert.True(t, found, msg) } -// assertCounterVecNotHasLabel checks that the counter vec has no metrics with the given label value. -func assertCounterVecNotHasLabel(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue, msg string) { +// assertGaugeVecNotHasLabel checks that the gauge vec has no metrics with the given label value. +func assertGaugeVecNotHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) { t.Helper() - found := counterVecHasLabelValue(t, cv, labelName, labelValue) + found := gaugeVecHasLabelValue(t, gv, labelName, labelValue) assert.False(t, found, msg) } @@ -159,12 +148,12 @@ func histogramVecHasLabelValue(t *testing.T, hv *prometheus.HistogramVec, labelN return false } -// counterVecHasLabelValue collects metrics from the counter vec and checks if any have the given label value. -func counterVecHasLabelValue(t *testing.T, cv *prometheus.CounterVec, labelName, labelValue string) bool { +// gaugeVecHasLabelValue collects metrics from the gauge vec and checks if any have the given label value. +func gaugeVecHasLabelValue(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue string) bool { t.Helper() ch := make(chan prometheus.Metric, 100) go func() { - cv.Collect(ch) + gv.Collect(ch) close(ch) }() diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 9306d56776d..4a4b69a2264 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -310,7 +310,6 @@ func (nc *NoopCollector) OnRpcSent(msgCount int, iHaveCount int, iWantCount int, func (nc *NoopCollector) OnOutboundRpcDropped() {} func (nc *NoopCollector) OnUndeliveredMessage() {} func (nc *NoopCollector) OnMessageDeliveredToAllSubscribers(size int) {} -func (nc *NoopCollector) OnClusterTopicMetricsCleanup(topic string) {} func (nc *NoopCollector) AllowConn(network.Direction, bool) {} func (nc *NoopCollector) BlockConn(network.Direction, bool) {} func (nc *NoopCollector) AllowStream(peer.ID, network.Direction) {} diff --git a/module/mock/gossip_sub_metrics.go b/module/mock/gossip_sub_metrics.go index 8bfefbd7b63..57ce8bfdfb4 100644 --- a/module/mock/gossip_sub_metrics.go +++ b/module/mock/gossip_sub_metrics.go @@ -225,46 +225,6 @@ func (_c *GossipSubMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func return _c } -// OnClusterTopicMetricsCleanup provides a mock function for the type GossipSubMetrics -func (_mock *GossipSubMetrics) OnClusterTopicMetricsCleanup(topic string) { - _mock.Called(topic) - return -} - -// GossipSubMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' -type GossipSubMetrics_OnClusterTopicMetricsCleanup_Call struct { - *mock.Call -} - -// OnClusterTopicMetricsCleanup is a helper method to define mock.On call -// - topic string -func (_e *GossipSubMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { - return &GossipSubMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} -} - -func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 string - if args[0] != nil { - arg0 = args[0].(string) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) Return() *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Return() - return _c -} - -func (_c *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *GossipSubMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Run(run) - return _c -} - // OnControlMessagesTruncated provides a mock function for the type GossipSubMetrics func (_mock *GossipSubMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go index 67cf58ff355..2e06c3e33b2 100644 --- a/module/mock/gossip_sub_rpc_validation_inspector_metrics.go +++ b/module/mock/gossip_sub_rpc_validation_inspector_metrics.go @@ -144,46 +144,6 @@ func (_c *GossipSubRpcValidationInspectorMetrics_OnActiveClusterIDsNotSetErr_Cal return _c } -// OnClusterTopicMetricsCleanup provides a mock function for the type GossipSubRpcValidationInspectorMetrics -func (_mock *GossipSubRpcValidationInspectorMetrics) OnClusterTopicMetricsCleanup(topic string) { - _mock.Called(topic) - return -} - -// GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' -type GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call struct { - *mock.Call -} - -// OnClusterTopicMetricsCleanup is a helper method to define mock.On call -// - topic string -func (_e *GossipSubRpcValidationInspectorMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { - return &GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} -} - -func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 string - if args[0] != nil { - arg0 = args[0].(string) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) Return() *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Return() - return _c -} - -func (_c *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *GossipSubRpcValidationInspectorMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Run(run) - return _c -} - // OnControlMessagesTruncated provides a mock function for the type GossipSubRpcValidationInspectorMetrics func (_mock *GossipSubRpcValidationInspectorMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/module/mock/lib_p2_p_metrics.go b/module/mock/lib_p2_p_metrics.go index 49d2aa54089..8b26953b35b 100644 --- a/module/mock/lib_p2_p_metrics.go +++ b/module/mock/lib_p2_p_metrics.go @@ -984,46 +984,6 @@ func (_c *LibP2PMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func(f return _c } -// OnClusterTopicMetricsCleanup provides a mock function for the type LibP2PMetrics -func (_mock *LibP2PMetrics) OnClusterTopicMetricsCleanup(topic string) { - _mock.Called(topic) - return -} - -// LibP2PMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' -type LibP2PMetrics_OnClusterTopicMetricsCleanup_Call struct { - *mock.Call -} - -// OnClusterTopicMetricsCleanup is a helper method to define mock.On call -// - topic string -func (_e *LibP2PMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { - return &LibP2PMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} -} - -func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 string - if args[0] != nil { - arg0 = args[0].(string) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) Return() *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Return() - return _c -} - -func (_c *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *LibP2PMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Run(run) - return _c -} - // OnControlMessagesTruncated provides a mock function for the type LibP2PMetrics func (_mock *LibP2PMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/module/mock/local_gossip_sub_router_metrics.go b/module/mock/local_gossip_sub_router_metrics.go index 5123ef76e49..416027c49c9 100644 --- a/module/mock/local_gossip_sub_router_metrics.go +++ b/module/mock/local_gossip_sub_router_metrics.go @@ -35,46 +35,6 @@ func (_m *LocalGossipSubRouterMetrics) EXPECT() *LocalGossipSubRouterMetrics_Exp return &LocalGossipSubRouterMetrics_Expecter{mock: &_m.Mock} } -// OnClusterTopicMetricsCleanup provides a mock function for the type LocalGossipSubRouterMetrics -func (_mock *LocalGossipSubRouterMetrics) OnClusterTopicMetricsCleanup(topic string) { - _mock.Called(topic) - return -} - -// LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' -type LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call struct { - *mock.Call -} - -// OnClusterTopicMetricsCleanup is a helper method to define mock.On call -// - topic string -func (_e *LocalGossipSubRouterMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { - return &LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} -} - -func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 string - if args[0] != nil { - arg0 = args[0].(string) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) Return() *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Return() - return _c -} - -func (_c *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *LocalGossipSubRouterMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Run(run) - return _c -} - // OnLocalMeshSizeUpdated provides a mock function for the type LocalGossipSubRouterMetrics func (_mock *LocalGossipSubRouterMetrics) OnLocalMeshSizeUpdated(topic string, size int) { _mock.Called(topic, size) diff --git a/module/mock/network_metrics.go b/module/mock/network_metrics.go index c6c70800dd8..ee118e026c5 100644 --- a/module/mock/network_metrics.go +++ b/module/mock/network_metrics.go @@ -1260,46 +1260,6 @@ func (_c *NetworkMetrics_OnBehaviourPenaltyUpdated_Call) RunAndReturn(run func(f return _c } -// OnClusterTopicMetricsCleanup provides a mock function for the type NetworkMetrics -func (_mock *NetworkMetrics) OnClusterTopicMetricsCleanup(topic string) { - _mock.Called(topic) - return -} - -// NetworkMetrics_OnClusterTopicMetricsCleanup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClusterTopicMetricsCleanup' -type NetworkMetrics_OnClusterTopicMetricsCleanup_Call struct { - *mock.Call -} - -// OnClusterTopicMetricsCleanup is a helper method to define mock.On call -// - topic string -func (_e *NetworkMetrics_Expecter) OnClusterTopicMetricsCleanup(topic interface{}) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { - return &NetworkMetrics_OnClusterTopicMetricsCleanup_Call{Call: _e.mock.On("OnClusterTopicMetricsCleanup", topic)} -} - -func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) Run(run func(topic string)) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 string - if args[0] != nil { - arg0 = args[0].(string) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) Return() *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Call.Return() - return _c -} - -func (_c *NetworkMetrics_OnClusterTopicMetricsCleanup_Call) RunAndReturn(run func(topic string)) *NetworkMetrics_OnClusterTopicMetricsCleanup_Call { - _c.Run(run) - return _c -} - // OnControlMessagesTruncated provides a mock function for the type NetworkMetrics func (_mock *NetworkMetrics) OnControlMessagesTruncated(messageType p2pmsg.ControlMessageType, diff int) { _mock.Called(messageType, diff) diff --git a/network/channels/channels.go b/network/channels/channels.go index d279dd85c1e..3f858c15548 100644 --- a/network/channels/channels.go +++ b/network/channels/channels.go @@ -338,6 +338,20 @@ func SyncCluster(clusterID flow.ChainID) Channel { return Channel(fmt.Sprintf("%s-%s", SyncClusterPrefix, clusterID)) } +// NormalizeTopicForMetrics returns a normalized version of the topic suitable for use as a metrics label. +// For cluster topics (sync-cluster-*, consensus-cluster-*), it returns just the prefix to prevent +// unbounded cardinality growth during epoch transitions (each epoch creates new cluster IDs). +// For non-cluster topics, it returns the topic unchanged. +func NormalizeTopicForMetrics(topic string) string { + if strings.HasPrefix(topic, SyncClusterPrefix) { + return SyncClusterPrefix + } + if strings.HasPrefix(topic, ConsensusClusterPrefix) { + return ConsensusClusterPrefix + } + return topic +} + // IsValidNonClusterFlowTopic ensures the topic is a valid Flow network topic and // ensures the sporkID part of the Topic is equal to the current network sporkID. // Expected errors: diff --git a/network/channels/channels_test.go b/network/channels/channels_test.go index 37b2ca6c811..2e07466683a 100644 --- a/network/channels/channels_test.go +++ b/network/channels/channels_test.go @@ -129,3 +129,57 @@ func TestUniqueChannels_ClusterChannels(t *testing.T) { require.Contains(t, uniques, consensusCluster) // cluster channel require.Contains(t, uniques, PushTransactions) // non-cluster channel } + +// TestNormalizeTopicForMetrics verifies that cluster topics are normalized to their prefix +// (e.g., "sync-cluster-*" -> "sync-cluster") while non-cluster topics remain unchanged. +// This prevents unbounded metric cardinality growth during epoch transitions. +func TestNormalizeTopicForMetrics(t *testing.T) { + testCases := []struct { + name string + inputTopic string + expectedOutput string + }{ + { + name: "sync-cluster topic normalized", + inputTopic: "sync-cluster-cluster-123-abc123def456", + expectedOutput: SyncClusterPrefix, + }, + { + name: "consensus-cluster topic normalized", + inputTopic: "consensus-cluster-cluster-456-def789abc123", + expectedOutput: ConsensusClusterPrefix, + }, + { + name: "sync-cluster topic with different format normalized", + inputTopic: "sync-cluster-test-id", + expectedOutput: SyncClusterPrefix, + }, + { + name: "consensus-cluster topic with different format normalized", + inputTopic: "consensus-cluster-test-id", + expectedOutput: ConsensusClusterPrefix, + }, + { + name: "non-cluster topic unchanged", + inputTopic: "push-blocks/abc123", + expectedOutput: "push-blocks/abc123", + }, + { + name: "another non-cluster topic unchanged", + inputTopic: "consensus-committee/xyz789", + expectedOutput: "consensus-committee/xyz789", + }, + { + name: "empty string unchanged", + inputTopic: "", + expectedOutput: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := NormalizeTopicForMetrics(tc.inputTopic) + assert.Equal(t, tc.expectedOutput, result) + }) + } +} diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 8b3bd2ac8e5..c36c5d041bf 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -280,16 +280,13 @@ func (t *GossipSubMeshTracer) Leave(topic string) { Msg("local peer left topic") } - // Clean up topic mesh tracking and metrics for cluster topics to prevent unbounded - // metric cardinality growth during epoch transitions. + // Clean up topic mesh tracking for cluster topics. + // Note: Metric cardinality is bounded by normalizing cluster topics to their prefix + // (e.g., "consensus-cluster", "sync-cluster") when recording metrics. if channels.IsClusterChannel(channels.Channel(topic)) { t.topicMeshMu.Lock() delete(t.topicMeshMap, topic) t.topicMeshMu.Unlock() - t.metrics.OnClusterTopicMetricsCleanup(topic) - t.logger.Debug(). - Str("topic", topic). - Msg("cleaned up cluster topic metrics") } } diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 3ccbb4e7528..40a5fe76d58 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -218,92 +218,3 @@ func (c *localMeshTracerMetricsCollector) OnLocalMeshSizeUpdated(topic string, s // calls the mock method to assert the metrics. c.l.OnLocalMeshSizeUpdated(topic, size) } - -func (c *localMeshTracerMetricsCollector) OnClusterTopicMetricsCleanup(topic string) { - // calls the mock method to assert the metrics cleanup. - c.l.OnClusterTopicMetricsCleanup(topic) -} - -// TestGossipSubMeshTracer_ClusterTopicCleanup tests that when a node leaves a cluster topic, -// the mesh tracer cleans up metrics for that topic to prevent unbounded metric cardinality growth. -func TestGossipSubMeshTracer_ClusterTopicCleanup(t *testing.T) { - defaultConfig, err := config.DefaultConfig() - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - sporkId := unittest.IdentifierFixture() - idProvider := mockmodule.NewIdentityProvider(t) - defer cancel() - - // create a non-cluster channel and a cluster channel - nonClusterChannel := channels.PushBlocks - nonClusterTopic := channels.TopicFromChannel(nonClusterChannel, sporkId) - clusterID := flow.ChainID("test-cluster-id") - clusterChannel := channels.SyncCluster(clusterID) - clusterTopic := channels.Topic(clusterChannel) - - logger := zerolog.New(io.Discard).Level(zerolog.DebugLevel) - - collector := newLocalMeshTracerMetricsCollector(t) - // set the meshTracer to log at 1 second intervals for sake of testing. - defaultConfig.NetworkConfig.GossipSub.RpcTracer.LocalMeshLogInterval = 1 * time.Second - // disables peer scoring for sake of testing - defaultConfig.NetworkConfig.GossipSub.PeerScoringEnabled = false - - tracerNode, tracerId := p2ptest.NodeFixture( - t, - sporkId, - t.Name(), - idProvider, - p2ptest.WithLogger(logger), - p2ptest.OverrideFlowConfig(defaultConfig), - p2ptest.WithMetricsCollector(collector), - p2ptest.WithRole(flow.RoleCollection)) - - idProvider.On("ByPeerID", tracerNode.ID()).Return(&tracerId, true).Maybe() - - nodes := []p2p.LibP2PNode{tracerNode} - ids := flow.IdentityList{&tracerId} - - p2ptest.RegisterPeerProviders(t, nodes) - p2ptest.StartNodes(t, signalerCtx, nodes) - defer p2ptest.StopNodes(t, nodes, cancel) - - p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids) - - // allow any OnLocalMeshSizeUpdated calls since we're not testing that here - collector.l.On("OnLocalMeshSizeUpdated", nonClusterTopic.String(), 0).Maybe() - collector.l.On("OnLocalMeshSizeUpdated", clusterTopic.String(), 0).Maybe() - - // subscribe to both topics - _, err = tracerNode.Subscribe( - nonClusterTopic, - validator.TopicValidator( - unittest.Logger(), - unittest.AllowAllPeerFilter())) - require.NoError(t, err) - - _, err = tracerNode.Subscribe( - clusterTopic, - validator.TopicValidator( - unittest.Logger(), - unittest.AllowAllPeerFilter())) - require.NoError(t, err) - - // set up expectation: OnClusterTopicMetricsCleanup should only be called for the cluster topic - collector.l.On("OnClusterTopicMetricsCleanup", clusterTopic.String()).Once() - - // unsubscribe from both topics - require.NoError(t, tracerNode.Unsubscribe(nonClusterTopic)) - require.NoError(t, tracerNode.Unsubscribe(clusterTopic)) - - // wait until cleanup callback is observed (avoid flaky timing with time.Sleep) - require.Eventually(t, func() bool { - for _, c := range collector.l.Calls { - if c.Method == "OnClusterTopicMetricsCleanup" && len(c.Arguments) == 1 && c.Arguments[0] == clusterTopic.String() { - return true - } - } - return false - }, time.Second, 10*time.Millisecond) -} From 4610a3e24cdbafcb1f93a9252b4959b52ec716a6 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 27 May 2026 16:48:01 -0700 Subject: [PATCH 6/6] normalize cluster topic labels in metrics to prevent unbounded cardinality growth for more metrics --- module/metrics/alsp.go | 10 +++------- module/metrics/gossipsub_score.go | 18 ++++-------------- module/metrics/network.go | 21 ++++++++++++++------- 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/module/metrics/alsp.go b/module/metrics/alsp.go index 888ca4dd18d..33edf997cc9 100644 --- a/module/metrics/alsp.go +++ b/module/metrics/alsp.go @@ -4,6 +4,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/channels" ) // AlspMetrics is a struct that contains all the metrics related to the ALSP module. @@ -35,21 +36,16 @@ func NewAlspMetrics() *AlspMetrics { return alsp } -// OnClusterTopicMetricsCleanup removes all misbehavior counter label values associated with the given -// cluster topic to prevent unbounded metric cardinality growth during epoch transitions. -func (a *AlspMetrics) OnClusterTopicMetricsCleanup(topic string) { - a.reportedMisbehaviorCount.DeletePartialMatch(prometheus.Labels{LabelChannel: topic}) -} - // OnMisbehaviorReported is called when a misbehavior is reported by the application layer to ALSP. // An engine detecting a spamming-related misbehavior reports it to the ALSP module. It increases // the counter vector of reported misbehavior. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. // Args: // - channel: the channel on which the misbehavior was reported // - misbehaviorType: the type of misbehavior reported func (a *AlspMetrics) OnMisbehaviorReported(channel string, misbehaviorType string) { a.reportedMisbehaviorCount.With(prometheus.Labels{ - LabelChannel: channel, + LabelChannel: channels.NormalizeTopicForMetrics(channel), LabelMisbehavior: misbehaviorType, }).Inc() } diff --git a/module/metrics/gossipsub_score.go b/module/metrics/gossipsub_score.go index 4b3242108f0..8e13f40e352 100644 --- a/module/metrics/gossipsub_score.go +++ b/module/metrics/gossipsub_score.go @@ -143,31 +143,21 @@ func (g *GossipSubScoreMetrics) OnBehaviourPenaltyUpdated(penalty float64) { } func (g *GossipSubScoreMetrics) OnTimeInMeshUpdated(topic channels.Topic, duration time.Duration) { - g.timeInMesh.WithLabelValues(string(topic)).Observe(duration.Seconds()) + g.timeInMesh.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(duration.Seconds()) } func (g *GossipSubScoreMetrics) OnFirstMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.firstMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.firstMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) OnMeshMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.meshMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.meshMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) OnInvalidMessageDeliveredUpdated(topic channels.Topic, f float64) { - g.invalidMessageDelivery.WithLabelValues(string(topic)).Observe(f) + g.invalidMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f) } func (g *GossipSubScoreMetrics) SetWarningStateCount(u uint) { g.warningStateGauge.Set(float64(u)) } - -// OnClusterTopicMetricsCleanup removes all per-topic scoring metric label values associated with -// the given cluster topic. Call this when the local node leaves a cluster topic to prevent -// unbounded metric cardinality growth across epoch transitions. -func (g *GossipSubScoreMetrics) OnClusterTopicMetricsCleanup(topic string) { - g.timeInMesh.DeleteLabelValues(topic) - g.meshMessageDelivery.DeleteLabelValues(topic) - g.firstMessageDelivery.DeleteLabelValues(topic) - g.invalidMessageDelivery.DeleteLabelValues(topic) -} diff --git a/module/metrics/network.go b/module/metrics/network.go index c4c231315b4..a7073eb0dcf 100644 --- a/module/metrics/network.go +++ b/module/metrics/network.go @@ -291,18 +291,21 @@ func (nc *NetworkCollector) QueueDuration(duration time.Duration, priority int) } // MessageProcessingStarted increments the metric tracking the number of messages being processed by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) MessageProcessingStarted(topic string) { - nc.numMessagesProcessing.WithLabelValues(topic).Inc() + nc.numMessagesProcessing.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // UnicastMessageSendingStarted increments the metric tracking the number of unicast messages sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) UnicastMessageSendingStarted(topic string) { - nc.numDirectMessagesSending.WithLabelValues(topic).Inc() + nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc() } // UnicastMessageSendingCompleted decrements the metric tracking the number of unicast messages sent by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) UnicastMessageSendingCompleted(topic string) { - nc.numDirectMessagesSending.WithLabelValues(topic).Dec() + nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Dec() } func (nc *NetworkCollector) RoutingTablePeerAdded() { @@ -315,9 +318,11 @@ func (nc *NetworkCollector) RoutingTablePeerRemoved() { // MessageProcessingFinished tracks the time spent by the node to process a message and decrements the metric tracking // the number of messages being processed by the node. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) MessageProcessingFinished(topic string, duration time.Duration) { - nc.numMessagesProcessing.WithLabelValues(topic).Dec() - nc.inboundProcessTime.WithLabelValues(topic).Add(duration.Seconds()) + normalizedTopic := channels.NormalizeTopicForMetrics(topic) + nc.numMessagesProcessing.WithLabelValues(normalizedTopic).Dec() + nc.inboundProcessTime.WithLabelValues(normalizedTopic).Add(duration.Seconds()) } // OutboundConnections updates the metric tracking the number of outbound connections of this node @@ -357,11 +362,13 @@ func (nc *NetworkCollector) OnDNSLookupRequestDropped() { } // OnUnauthorizedMessage tracks the number of unauthorized messages seen on the network. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OnUnauthorizedMessage(role, msgType, topic, offense string) { - nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, topic, offense).Inc() + nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), offense).Inc() } // OnRateLimitedPeer tracks the number of rate limited messages seen on the network. +// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth. func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, topic, reason string) { nc.logger.Warn(). Str("peer_id", logging2.PeerId(peerID)). @@ -371,7 +378,7 @@ func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, top Str("reason", reason). Bool(logging.KeySuspicious, true). Msg("unicast peer rate limited") - nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, topic, reason).Inc() + nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), reason).Inc() } // OnViolationReportSkipped tracks the number of slashing violations consumer violations that were not