Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/psiemens/sconfig v0.1.0 // indirect
Expand Down
4 changes: 3 additions & 1 deletion module/metrics/alsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/channels"
)

// AlspMetrics is a struct that contains all the metrics related to the ALSP module.
Expand Down Expand Up @@ -38,12 +39,13 @@ func NewAlspMetrics() *AlspMetrics {
// OnMisbehaviorReported is called when a misbehavior is reported by the application layer to ALSP.
// An engine detecting a spamming-related misbehavior reports it to the ALSP module. It increases
// the counter vector of reported misbehavior.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
// Args:
// - channel: the channel on which the misbehavior was reported
// - misbehaviorType: the type of misbehavior reported
func (a *AlspMetrics) OnMisbehaviorReported(channel string, misbehaviorType string) {
a.reportedMisbehaviorCount.With(prometheus.Labels{
LabelChannel: channel,
LabelChannel: channels.NormalizeTopicForMetrics(channel),
LabelMisbehavior: misbehaviorType,
}).Inc()
}
11 changes: 8 additions & 3 deletions module/metrics/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/channels"
)

// LocalGossipSubRouterMetrics encapsulates the metrics collectors for GossipSub router of the local node.
Expand Down Expand Up @@ -261,8 +262,10 @@ func NewGossipSubLocalMeshMetrics(prefix string) *LocalGossipSubRouterMetrics {
var _ module.LocalGossipSubRouterMetrics = (*LocalGossipSubRouterMetrics)(nil)

// OnLocalMeshSizeUpdated updates the local mesh size metric.
// Cluster topics are normalized to their prefix (e.g., "sync-cluster", "consensus-cluster")
// to prevent unbounded cardinality growth during epoch transitions.
func (g *LocalGossipSubRouterMetrics) OnLocalMeshSizeUpdated(topic string, size int) {
g.localMeshSize.WithLabelValues(topic).Set(float64(size))
g.localMeshSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Set(float64(size))
}

// OnPeerAddedToProtocol is called when the local node receives a stream from a peer on a gossipsub-related protocol.
Expand Down Expand Up @@ -296,14 +299,16 @@ func (g *LocalGossipSubRouterMetrics) OnLocalPeerLeftTopic() {

// OnPeerGraftTopic is called when the local node receives a GRAFT message from a remote peer on a topic.
// Note: the received GRAFT at this point is considered to have passed the RPC inspection, and is accepted by the local node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (g *LocalGossipSubRouterMetrics) OnPeerGraftTopic(topic string) {
g.peerGraftTopicCount.WithLabelValues(topic).Inc()
g.peerGraftTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc()
}

// OnPeerPruneTopic is called when the local node receives a PRUNE message from a remote peer on a topic.
// Note: the received PRUNE at this point is considered to have passed the RPC inspection, and is accepted by the local node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (g *LocalGossipSubRouterMetrics) OnPeerPruneTopic(topic string) {
g.peerPruneTopicCount.WithLabelValues(topic).Inc()
g.peerPruneTopicCount.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc()
}

// OnMessageEnteredValidation is called when a received pubsub message enters the validation pipeline. It is the
Expand Down
4 changes: 3 additions & 1 deletion module/metrics/gossipsub_rpc_validation_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/channels"
p2pmsg "github.com/onflow/flow-go/network/p2p/message"
)

Expand Down Expand Up @@ -421,11 +422,12 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIWantMessageIDsReceived(msgId

// OnIHaveMessageIDsReceived tracks the number of message ids received by the node from other nodes on an iHave message.
// This function is called on each iHave message received by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
// Args:
// - channel: the channel on which the iHave message was received.
// - msgIdCount: the number of message ids received on the iHave message.
func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessageIDsReceived(channel string, msgIdCount int) {
c.receivedIHaveMsgIDsHistogram.WithLabelValues(channel).Observe(float64(msgIdCount))
c.receivedIHaveMsgIDsHistogram.WithLabelValues(channels.NormalizeTopicForMetrics(channel)).Observe(float64(msgIdCount))
}

// OnIncomingRpcReceived tracks the number of incoming RPC messages received by the node.
Expand Down
8 changes: 4 additions & 4 deletions module/metrics/gossipsub_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,19 @@ func (g *GossipSubScoreMetrics) OnBehaviourPenaltyUpdated(penalty float64) {
}

func (g *GossipSubScoreMetrics) OnTimeInMeshUpdated(topic channels.Topic, duration time.Duration) {
g.timeInMesh.WithLabelValues(string(topic)).Observe(duration.Seconds())
g.timeInMesh.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(duration.Seconds())
}

func (g *GossipSubScoreMetrics) OnFirstMessageDeliveredUpdated(topic channels.Topic, f float64) {
g.firstMessageDelivery.WithLabelValues(string(topic)).Observe(f)
g.firstMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f)
}

func (g *GossipSubScoreMetrics) OnMeshMessageDeliveredUpdated(topic channels.Topic, f float64) {
g.meshMessageDelivery.WithLabelValues(string(topic)).Observe(f)
g.meshMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f)
}

func (g *GossipSubScoreMetrics) OnInvalidMessageDeliveredUpdated(topic channels.Topic, f float64) {
g.invalidMessageDelivery.WithLabelValues(string(topic)).Observe(f)
g.invalidMessageDelivery.WithLabelValues(channels.NormalizeTopicForMetrics(string(topic))).Observe(f)
}

func (g *GossipSubScoreMetrics) SetWarningStateCount(u uint) {
Expand Down
31 changes: 21 additions & 10 deletions module/metrics/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/channels"
logging2 "github.com/onflow/flow-go/network/p2p/logging"
"github.com/onflow/flow-go/utils/logging"
)
Expand Down Expand Up @@ -260,18 +261,21 @@ func NewNetworkCollector(logger zerolog.Logger, opts ...NetworkCollectorOpt) *Ne
}

// OutboundMessageSent collects metrics related to a message sent by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) OutboundMessageSent(sizeBytes int, topic, protocol, messageType string) {
nc.outboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes))
nc.outboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes))
}

// InboundMessageReceived collects metrics related to a message received by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) InboundMessageReceived(sizeBytes int, topic, protocol, messageType string) {
nc.inboundMessageSize.WithLabelValues(topic, protocol, messageType).Observe(float64(sizeBytes))
nc.inboundMessageSize.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Observe(float64(sizeBytes))
}

// DuplicateInboundMessagesDropped increments the metric tracking the number of duplicate messages dropped by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) DuplicateInboundMessagesDropped(topic, protocol, messageType string) {
nc.duplicateMessagesDropped.WithLabelValues(topic, protocol, messageType).Add(1)
nc.duplicateMessagesDropped.WithLabelValues(channels.NormalizeTopicForMetrics(topic), protocol, messageType).Add(1)
}

func (nc *NetworkCollector) MessageAdded(priority int) {
Expand All @@ -287,18 +291,21 @@ func (nc *NetworkCollector) QueueDuration(duration time.Duration, priority int)
}

// MessageProcessingStarted increments the metric tracking the number of messages being processed by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) MessageProcessingStarted(topic string) {
nc.numMessagesProcessing.WithLabelValues(topic).Inc()
nc.numMessagesProcessing.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc()
}

// UnicastMessageSendingStarted increments the metric tracking the number of unicast messages sent by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) UnicastMessageSendingStarted(topic string) {
nc.numDirectMessagesSending.WithLabelValues(topic).Inc()
nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Inc()
}

// UnicastMessageSendingCompleted decrements the metric tracking the number of unicast messages sent by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) UnicastMessageSendingCompleted(topic string) {
nc.numDirectMessagesSending.WithLabelValues(topic).Dec()
nc.numDirectMessagesSending.WithLabelValues(channels.NormalizeTopicForMetrics(topic)).Dec()
}

func (nc *NetworkCollector) RoutingTablePeerAdded() {
Expand All @@ -311,9 +318,11 @@ func (nc *NetworkCollector) RoutingTablePeerRemoved() {

// MessageProcessingFinished tracks the time spent by the node to process a message and decrements the metric tracking
// the number of messages being processed by the node.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) MessageProcessingFinished(topic string, duration time.Duration) {
nc.numMessagesProcessing.WithLabelValues(topic).Dec()
nc.inboundProcessTime.WithLabelValues(topic).Add(duration.Seconds())
normalizedTopic := channels.NormalizeTopicForMetrics(topic)
nc.numMessagesProcessing.WithLabelValues(normalizedTopic).Dec()
nc.inboundProcessTime.WithLabelValues(normalizedTopic).Add(duration.Seconds())
}

// OutboundConnections updates the metric tracking the number of outbound connections of this node
Expand Down Expand Up @@ -353,11 +362,13 @@ func (nc *NetworkCollector) OnDNSLookupRequestDropped() {
}

// OnUnauthorizedMessage tracks the number of unauthorized messages seen on the network.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) OnUnauthorizedMessage(role, msgType, topic, offense string) {
nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, topic, offense).Inc()
nc.unAuthorizedMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), offense).Inc()
}

// OnRateLimitedPeer tracks the number of rate limited messages seen on the network.
// Cluster topics are normalized to their prefix to prevent unbounded cardinality growth.
func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, topic, reason string) {
nc.logger.Warn().
Str("peer_id", logging2.PeerId(peerID)).
Expand All @@ -367,7 +378,7 @@ func (nc *NetworkCollector) OnRateLimitedPeer(peerID peer.ID, role, msgType, top
Str("reason", reason).
Bool(logging.KeySuspicious, true).
Msg("unicast peer rate limited")
nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, topic, reason).Inc()
nc.rateLimitedUnicastMessagesCount.WithLabelValues(role, msgType, channels.NormalizeTopicForMetrics(topic), reason).Inc()
}

// OnViolationReportSkipped tracks the number of slashing violations consumer violations that were not
Expand Down
172 changes: 172 additions & 0 deletions module/metrics/network_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package metrics

import (
"fmt"
"testing"

"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestNetworkCollector_ClusterTopicNormalization verifies that cluster topics are normalized
// to their prefix (e.g., "sync-cluster-*" -> "sync-cluster") to prevent unbounded metric
// cardinality growth during epoch transitions, and that non-cluster topics are recorded
// under their original name.
//
// The check is applied to every metric the test records into (inbound size, outbound size and
// duplicate-drop count), so a recorder that forgets to normalize its topic is caught.
func TestNetworkCollector_ClusterTopicNormalization(t *testing.T) {
	// Metric names are prefixed with the test name.
	prefix := fmt.Sprintf("test_%s_", t.Name())
	logger := zerolog.Nop()
	nc := NewNetworkCollector(logger, WithNetworkPrefix(prefix))

	// These cluster topics should be normalized to their prefix
	clusterTopic1 := "sync-cluster-test-cluster-id-1"
	clusterTopic2 := "sync-cluster-test-cluster-id-2"
	consensusClusterTopic := "consensus-cluster-test-cluster-id"
	nonClusterTopic := "blocks"
	protocol := "pubsub"
	msgType := "BlockProposal"

	// Record metrics for multiple cluster topics and a non-cluster topic
	nc.InboundMessageReceived(100, clusterTopic1, protocol, msgType)
	nc.InboundMessageReceived(200, clusterTopic2, protocol, msgType)
	nc.InboundMessageReceived(300, consensusClusterTopic, protocol, msgType)
	nc.InboundMessageReceived(400, nonClusterTopic, protocol, msgType)

	nc.OutboundMessageSent(150, clusterTopic1, protocol, msgType)
	nc.OutboundMessageSent(250, clusterTopic2, protocol, msgType)
	nc.OutboundMessageSent(350, consensusClusterTopic, protocol, msgType)
	nc.OutboundMessageSent(450, nonClusterTopic, protocol, msgType)

	nc.DuplicateInboundMessagesDropped(clusterTopic1, protocol, msgType)
	nc.DuplicateInboundMessagesDropped(clusterTopic2, protocol, msgType)
	nc.DuplicateInboundMessagesDropped(consensusClusterTopic, protocol, msgType)
	nc.DuplicateInboundMessagesDropped(nonClusterTopic, protocol, msgType)

	// Both size metrics are histogram vecs keyed by the same topic label, so they share one set of checks.
	histograms := []struct {
		name string
		vec  *prometheus.HistogramVec
	}{
		{name: "inboundMessageSize", vec: nc.inboundMessageSize},
		{name: "outboundMessageSize", vec: nc.outboundMessageSize},
	}
	for _, h := range histograms {
		// Verify cluster topics are normalized to their prefix
		// Both sync-cluster topics should be recorded under "sync-cluster"
		assertHistogramVecHasLabel(t, h.vec, LabelChannel, "sync-cluster", h.name+": sync-cluster topics should be normalized to 'sync-cluster'")
		assertHistogramVecHasLabel(t, h.vec, LabelChannel, "consensus-cluster", h.name+": consensus-cluster topics should be normalized to 'consensus-cluster'")

		// Verify the original cluster topic names are NOT present (they should be normalized)
		assertHistogramVecNotHasLabel(t, h.vec, LabelChannel, clusterTopic1, h.name+": original cluster topic name should not be present")
		assertHistogramVecNotHasLabel(t, h.vec, LabelChannel, clusterTopic2, h.name+": original cluster topic name should not be present")
		assertHistogramVecNotHasLabel(t, h.vec, LabelChannel, consensusClusterTopic, h.name+": original consensus-cluster topic name should not be present")

		// Verify non-cluster topics are NOT normalized (they should keep their original name)
		assertHistogramVecHasLabel(t, h.vec, LabelChannel, nonClusterTopic, h.name+": non-cluster topics should keep their original name")
	}

	// channelValues returns the set of LabelChannel values exported by c. It accepts any
	// collector, so it also works for vec types the typed helpers in this file do not cover.
	// The channel is always drained to the end so the collecting goroutine cannot be left blocked.
	channelValues := func(c prometheus.Collector) map[string]struct{} {
		ch := make(chan prometheus.Metric)
		go func() {
			defer close(ch)
			c.Collect(ch)
		}()

		values := make(map[string]struct{})
		var writeErr error
		for metric := range ch {
			var m io_prometheus_client.Metric
			if err := metric.Write(&m); err != nil {
				if writeErr == nil {
					writeErr = err
				}
				continue
			}
			for _, label := range m.GetLabel() {
				if label.GetName() == LabelChannel {
					values[label.GetValue()] = struct{}{}
				}
			}
		}
		require.NoError(t, writeErr)
		return values
	}

	// The duplicate-drop metric must obey the same normalization rules.
	// NOTE(review): assumes duplicateMessagesDropped keys the topic under LabelChannel like the size histograms — confirm.
	dropped := channelValues(nc.duplicateMessagesDropped)
	assert.Contains(t, dropped, "sync-cluster", "duplicateMessagesDropped: sync-cluster topics should be normalized to 'sync-cluster'")
	assert.Contains(t, dropped, "consensus-cluster", "duplicateMessagesDropped: consensus-cluster topics should be normalized to 'consensus-cluster'")
	assert.NotContains(t, dropped, clusterTopic1, "duplicateMessagesDropped: original cluster topic name should not be present")
	assert.NotContains(t, dropped, clusterTopic2, "duplicateMessagesDropped: original cluster topic name should not be present")
	assert.NotContains(t, dropped, consensusClusterTopic, "duplicateMessagesDropped: original consensus-cluster topic name should not be present")
	assert.Contains(t, dropped, nonClusterTopic, "duplicateMessagesDropped: non-cluster topics should keep their original name")
}

// TestLocalGossipSubRouterMetrics_ClusterTopicNormalization verifies that LocalGossipSubRouterMetrics
// normalizes cluster topics to their prefix and leaves non-cluster topics untouched.
//
// The check covers every metric the test records into: the local mesh size gauge as well as
// the GRAFT and PRUNE counters.
func TestLocalGossipSubRouterMetrics_ClusterTopicNormalization(t *testing.T) {
	// Metric names are prefixed with the test name.
	prefix := fmt.Sprintf("test_%s_", t.Name())
	m := NewGossipSubLocalMeshMetrics(prefix)

	clusterTopic1 := "sync-cluster-cluster-1"
	clusterTopic2 := "sync-cluster-cluster-2"
	consensusClusterTopic := "consensus-cluster-cluster-1"
	nonClusterTopic := "blocks"

	// Record metrics for cluster and non-cluster topics.
	// Note: both sync-cluster topics are expected to land on the same gauge series, so only
	// the presence of the label is asserted below, not the gauge value.
	m.OnLocalMeshSizeUpdated(clusterTopic1, 5)
	m.OnLocalMeshSizeUpdated(clusterTopic2, 3)
	m.OnLocalMeshSizeUpdated(consensusClusterTopic, 4)
	m.OnLocalMeshSizeUpdated(nonClusterTopic, 2)

	m.OnPeerGraftTopic(clusterTopic1)
	m.OnPeerGraftTopic(clusterTopic2)
	m.OnPeerGraftTopic(consensusClusterTopic)
	m.OnPeerGraftTopic(nonClusterTopic)

	m.OnPeerPruneTopic(clusterTopic1)
	m.OnPeerPruneTopic(clusterTopic2)
	m.OnPeerPruneTopic(consensusClusterTopic)
	m.OnPeerPruneTopic(nonClusterTopic)

	// Verify cluster topics are normalized to their prefix
	assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "sync-cluster", "sync-cluster topics should be normalized")
	assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, "consensus-cluster", "consensus-cluster topics should be normalized")

	// Verify original cluster topic names are NOT present
	assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic1, "original cluster topic should not be present")
	assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, clusterTopic2, "original cluster topic should not be present")
	assertGaugeVecNotHasLabel(t, &m.localMeshSize, LabelChannel, consensusClusterTopic, "original consensus-cluster topic should not be present")

	// Verify non-cluster topics are NOT normalized
	assertGaugeVecHasLabel(t, &m.localMeshSize, LabelChannel, nonClusterTopic, "non-cluster topics should keep their original name")

	// channelValues returns the set of LabelChannel values exported by c. It accepts any
	// collector, so it also works for vec types the typed helpers in this file do not cover.
	// The channel is always drained to the end so the collecting goroutine cannot be left blocked.
	channelValues := func(c prometheus.Collector) map[string]struct{} {
		ch := make(chan prometheus.Metric)
		go func() {
			defer close(ch)
			c.Collect(ch)
		}()

		values := make(map[string]struct{})
		var writeErr error
		for metric := range ch {
			var pm io_prometheus_client.Metric
			if err := metric.Write(&pm); err != nil {
				if writeErr == nil {
					writeErr = err
				}
				continue
			}
			for _, label := range pm.GetLabel() {
				if label.GetName() == LabelChannel {
					values[label.GetValue()] = struct{}{}
				}
			}
		}
		require.NoError(t, writeErr)
		return values
	}

	// The GRAFT and PRUNE counters must obey the same normalization rules as the mesh size gauge.
	// NOTE(review): assumes both counters key the topic under LabelChannel — confirm against their definitions.
	counters := []struct {
		name string
		vec  prometheus.Collector
	}{
		{name: "peerGraftTopicCount", vec: m.peerGraftTopicCount},
		{name: "peerPruneTopicCount", vec: m.peerPruneTopicCount},
	}
	for _, c := range counters {
		topics := channelValues(c.vec)
		assert.Contains(t, topics, "sync-cluster", c.name+": sync-cluster topics should be normalized")
		assert.Contains(t, topics, "consensus-cluster", c.name+": consensus-cluster topics should be normalized")
		assert.NotContains(t, topics, clusterTopic1, c.name+": original cluster topic should not be present")
		assert.NotContains(t, topics, clusterTopic2, c.name+": original cluster topic should not be present")
		assert.NotContains(t, topics, consensusClusterTopic, c.name+": original consensus-cluster topic should not be present")
		assert.Contains(t, topics, nonClusterTopic, c.name+": non-cluster topics should keep their original name")
	}
}

// assertHistogramVecHasLabel reports a test failure, annotated with msg, unless hv exports at
// least one metric whose label labelName has the value labelValue.
func assertHistogramVecHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) {
	t.Helper()
	assert.True(t, histogramVecHasLabelValue(t, hv, labelName, labelValue), msg)
}

// assertHistogramVecNotHasLabel reports a test failure, annotated with msg, if hv exports any
// metric whose label labelName has the value labelValue.
func assertHistogramVecNotHasLabel(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue, msg string) {
	t.Helper()
	assert.False(t, histogramVecHasLabelValue(t, hv, labelName, labelValue), msg)
}

// assertGaugeVecHasLabel reports a test failure, annotated with msg, unless gv exports at
// least one metric whose label labelName has the value labelValue.
func assertGaugeVecHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) {
	t.Helper()
	assert.True(t, gaugeVecHasLabelValue(t, gv, labelName, labelValue), msg)
}

// assertGaugeVecNotHasLabel reports a test failure, annotated with msg, if gv exports any
// metric whose label labelName has the value labelValue.
func assertGaugeVecNotHasLabel(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue, msg string) {
	t.Helper()
	assert.False(t, gaugeVecHasLabelValue(t, gv, labelName, labelValue), msg)
}

// histogramVecHasLabelValue reports whether any metric currently exported by hv carries a
// label named labelName whose value equals labelValue.
//
// The vec is collected on a separate goroutine. The channel is always drained to the end,
// even once a match has been found or a metric has failed to serialize, so the collecting
// goroutine can never be left blocked on a send (which would leak it when the vec exports
// more metrics than the channel buffer holds). A serialization error fails the test after
// draining has completed.
func histogramVecHasLabelValue(t *testing.T, hv *prometheus.HistogramVec, labelName, labelValue string) bool {
	t.Helper()
	ch := make(chan prometheus.Metric, 100)
	go func() {
		defer close(ch)
		hv.Collect(ch)
	}()

	found := false
	var writeErr error
	for metric := range ch {
		if found || writeErr != nil {
			// Outcome already decided; keep receiving so Collect can finish.
			continue
		}

		var m io_prometheus_client.Metric
		if err := metric.Write(&m); err != nil {
			writeErr = err
			continue
		}

		for _, label := range m.GetLabel() {
			if label.GetName() == labelName && label.GetValue() == labelValue {
				found = true
				break
			}
		}
	}
	require.NoError(t, writeErr)
	return found
}

// gaugeVecHasLabelValue reports whether any metric currently exported by gv carries a label
// named labelName whose value equals labelValue.
//
// The vec is collected on a separate goroutine. The channel is always drained to the end,
// even once a match has been found or a metric has failed to serialize, so the collecting
// goroutine can never be left blocked on a send (which would leak it when the vec exports
// more metrics than the channel buffer holds). A serialization error fails the test after
// draining has completed.
func gaugeVecHasLabelValue(t *testing.T, gv *prometheus.GaugeVec, labelName, labelValue string) bool {
	t.Helper()
	ch := make(chan prometheus.Metric, 100)
	go func() {
		defer close(ch)
		gv.Collect(ch)
	}()

	found := false
	var writeErr error
	for metric := range ch {
		if found || writeErr != nil {
			// Outcome already decided; keep receiving so Collect can finish.
			continue
		}

		var m io_prometheus_client.Metric
		if err := metric.Write(&m); err != nil {
			writeErr = err
			continue
		}

		for _, label := range m.GetLabel() {
			if label.GetName() == labelName && label.GetValue() == labelValue {
				found = true
				break
			}
		}
	}
	require.NoError(t, writeErr)
	return found
}
Loading
Loading