From 292ec466a1d831fe30bd8ba7115162d35f785cc8 Mon Sep 17 00:00:00 2001 From: Tomelia1999 Date: Sun, 29 Mar 2026 18:11:01 +0300 Subject: [PATCH] penalize non synced providers by probe --- .../lavasession/consumer_session_manager.go | 2 +- protocol/lavasession/consumer_types.go | 2 +- .../provideroptimizer/provider_optimizer.go | 17 ++- .../provider_optimizer_test.go | 105 ++++++++++++++++-- 4 files changed, 114 insertions(+), 12 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index b5c4313b13..2db089ab64 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -547,7 +547,7 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi // PHASE 2a — feed metrics + optimizer with per-provider results. for _, r := range results { csm.consumerMetricsManager.SetProviderLiveness(csm.rpcEndpoint.ChainID, r.ProviderAddress, r.NetworkAddress, r.Success) - csm.providerOptimizer.AppendProbeRelayData(r.ProviderAddress, r.Latency, r.Success) + csm.providerOptimizer.AppendProbeRelayData(r.ProviderAddress, r.Latency, r.Success, uint64(r.LatestBlock)) } // PHASE 2b — compute majorityBaseline consensus from collected block heights. diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 2c49b6b737..7eeecd60a0 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -125,7 +125,7 @@ type SessionInfo struct { type ConsumerSessionsMap map[string]*SessionInfo type ProviderOptimizer interface { - AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) + AppendProbeRelayData(providerAddress string, latency time.Duration, success bool, syncBlock uint64) AppendRelayFailure(providerAddress string) AppendRelayData(providerAddress string, latency time.Duration, cu, syncBlock uint64) ChooseProvider(ctx context.Context, allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string) diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 03fe178535..4538ec8671 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -306,14 +306,13 @@ func (po *ProviderOptimizer) appendRelayData(provider string, latency time.Durat } // AppendProbeRelayData updates a provider's QoS metrics for a probe relay message -func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) { +func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latency time.Duration, success bool, syncBlock uint64) { providerData, _ := po.getProviderData(providerAddress) sampleTime := po.now() halfTime := po.calculateHalfTime(providerAddress, sampleTime) weight := score.ProbeUpdateWeight var updateErr error if success { - // update latency only on success providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.AvailabilityScoreType, 1, weight, halfTime, 0, sampleTime) if updateErr != nil { return @@ -322,6 +321,19 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc if updateErr != nil { return } + // Sync scoring: mirror the sync path from appendRelayData(). + // Skip when syncBlock=0 (static providers, failed probes). + if syncBlock > 0 { + latestSync, timeSync := po.updateLatestSyncData(syncBlock, sampleTime) + if syncBlock > providerData.SyncBlock { + providerData.SyncBlock = syncBlock + } + syncLag := po.calculateSyncLag(latestSync, timeSync, providerData.SyncBlock, sampleTime) + providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.SyncScoreType, syncLag.Seconds(), weight, halfTime, 0, sampleTime) + if updateErr != nil { + return + } + } } else { providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.AvailabilityScoreType, 0, weight, halfTime, 0, sampleTime) if updateErr != nil { @@ -334,6 +346,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc utils.LogAttr("providerAddress", providerAddress), utils.LogAttr("latency", latency), utils.LogAttr("success", success), + utils.LogAttr("syncBlock", syncBlock), ) } diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index 619c1d0fb4..5ec05a4d8b 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -84,17 +84,17 @@ func TestProviderOptimizerBasicProbeData(t *testing.T) { // damage providers 5-7 scores with bad latency probes relays // they should be selected less often due to lower weighted scores badLatency := TEST_BASE_WORLD_LATENCY * 3 - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[5], badLatency, true) - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[6], badLatency, true) - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[7], badLatency, true) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[5], badLatency, true, 0) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[6], badLatency, true, 0) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[7], badLatency, true, 0) time.Sleep(4 * time.Millisecond) // improve providers 0-2 scores with good latency probes relays // they should be selected by the optimizer more often goodLatency := TEST_BASE_WORLD_LATENCY / 2 - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], goodLatency, true) - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], goodLatency, true) - providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[2], goodLatency, true) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], goodLatency, true, 0) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], goodLatency, true, 0) + providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[2], goodLatency, true, 0) time.Sleep(4 * time.Millisecond) results := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, 1000, cu, requestBlock) @@ -313,7 +313,7 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { syncBlock := uint64(requestBlock) // add an average latency probe relay to determine average score - providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true) + providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 0) time.Sleep(4 * time.Millisecond) // add good latency probe relays, score should improve @@ -325,7 +325,7 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { require.NoError(t, err) // add good latency probe - providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY/10, true) + providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY/10, true, 0) time.Sleep(4 * time.Millisecond) // check score again and compare to the last score @@ -1366,3 +1366,92 @@ func TestResetLatestSyncDataIfOutlier_ZeroBlockUnchanged(t *testing.T) { require.Equal(t, uint64(0), optimizer.latestSyncData.Block, "zero block should not be touched") } + +// --- AppendProbeRelayData sync scoring tests (Step 3, Task 2.1) --- + +func TestAppendProbeRelayData_SyncBlockUpdatesGlobalChainHead(t *testing.T) { + optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil) + providerAddress := "provider1" + + // Probe with syncBlock=1000 should update the global chain head + optimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 1000) + time.Sleep(4 * time.Millisecond) + + // Verify global chain head was updated + optimizer.latestSyncData.Lock.Lock() + block := optimizer.latestSyncData.Block + optimizer.latestSyncData.Lock.Unlock() + require.Equal(t, uint64(1000), block) +} + +func TestAppendProbeRelayData_ProviderBehindGetsHigherSyncLag(t *testing.T) { + optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil) + syncedProvider := "synced" + behindProvider := "behind" + + // Provider A is fully synced at block 1000 + optimizer.AppendProbeRelayData(syncedProvider, TEST_BASE_WORLD_LATENCY, true, 1000) + time.Sleep(4 * time.Millisecond) + + // Provider B is behind at block 990 (10 blocks behind) + optimizer.AppendProbeRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, true, 990) + time.Sleep(4 * time.Millisecond) + + // Provider A should have better (lower) sync score than Provider B + syncedData, _ := optimizer.getProviderData(syncedProvider) + behindData, _ := optimizer.getProviderData(behindProvider) + + // Sync score stores lag in seconds via EWMA — lower is better + // Synced provider: lag=0 (at chain head). Behind provider: lag = 10 blocks × avgBlockTime + syncedLag, _ := syncedData.Sync.Resolve() + behindLag, _ := behindData.Sync.Resolve() + require.Less(t, syncedLag, behindLag, "synced provider should have lower sync lag than behind provider") +} + +func TestAppendProbeRelayData_SyncBlockZeroSkipsSyncScoring(t *testing.T) { + optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil) + providerAddress := "provider1" + + // Probe with syncBlock=0 (static provider / failed probe) + optimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 0) + time.Sleep(4 * time.Millisecond) + + // Global chain head should NOT be updated + optimizer.latestSyncData.Lock.Lock() + block := optimizer.latestSyncData.Block + optimizer.latestSyncData.Lock.Unlock() + require.Equal(t, uint64(0), block, "syncBlock=0 should not update global chain head") + + // Provider's SyncBlock should remain at default (0) + providerData, _ := optimizer.getProviderData(providerAddress) + require.Equal(t, uint64(0), providerData.SyncBlock, "syncBlock=0 should not update provider SyncBlock") +} + +func TestAppendProbeRelayData_RelayWeightDominatesProbeWeight(t *testing.T) { + optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil) + behindProvider := "behind" + syncedProvider := "synced" + + // Probe seeds: behind provider at block 990, synced provider at block 1000 + // This sets global chain head to 1000 + optimizer.AppendProbeRelayData(syncedProvider, TEST_BASE_WORLD_LATENCY, true, 1000) + optimizer.AppendProbeRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, true, 990) + time.Sleep(4 * time.Millisecond) + + // Behind provider has sync lag from probe (weight 0.25) + probeData, _ := optimizer.getProviderData(behindProvider) + probeSyncLag, _ := probeData.Sync.Resolve() + require.Greater(t, probeSyncLag, 0.0, "behind provider should have non-zero sync lag from probe") + + // Now relay confirms behind provider is still at 990 (weight 1.0) + // Relay should reinforce the lag with 4x the weight + optimizer.AppendRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, 10, 990) + time.Sleep(4 * time.Millisecond) + + relayData, _ := optimizer.getProviderData(behindProvider) + relaySyncLag, _ := relayData.Sync.Resolve() + + // After relay with higher weight, the sync lag should be higher (more confident) + // because relay weight (1.0) reinforces the lag signal stronger than probe (0.25) + require.Greater(t, relaySyncLag, probeSyncLag, "relay data should reinforce sync lag with higher weight") +}