Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi
// PHASE 2a — feed metrics + optimizer with per-provider results.
for _, r := range results {
csm.consumerMetricsManager.SetProviderLiveness(csm.rpcEndpoint.ChainID, r.ProviderAddress, r.NetworkAddress, r.Success)
csm.providerOptimizer.AppendProbeRelayData(r.ProviderAddress, r.Latency, r.Success)
csm.providerOptimizer.AppendProbeRelayData(r.ProviderAddress, r.Latency, r.Success, uint64(r.LatestBlock))
}

// PHASE 2b — compute majorityBaseline consensus from collected block heights.
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type SessionInfo struct {
type ConsumerSessionsMap map[string]*SessionInfo

type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool, syncBlock uint64)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, cu, syncBlock uint64)
ChooseProvider(ctx context.Context, allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string)
Expand Down
17 changes: 15 additions & 2 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,13 @@ func (po *ProviderOptimizer) appendRelayData(provider string, latency time.Durat
}

// AppendProbeRelayData updates a provider's QoS metrics for a probe relay message
func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) {
func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latency time.Duration, success bool, syncBlock uint64) {
providerData, _ := po.getProviderData(providerAddress)
sampleTime := po.now()
halfTime := po.calculateHalfTime(providerAddress, sampleTime)
weight := score.ProbeUpdateWeight
var updateErr error
if success {
// update latency only on success
providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.AvailabilityScoreType, 1, weight, halfTime, 0, sampleTime)
if updateErr != nil {
return
Expand All @@ -322,6 +321,19 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
if updateErr != nil {
return
}
// Sync scoring: mirror the sync path from appendRelayData().
// Skip when syncBlock=0 (static providers, failed probes).
if syncBlock > 0 {
latestSync, timeSync := po.updateLatestSyncData(syncBlock, sampleTime)
if syncBlock > providerData.SyncBlock {
providerData.SyncBlock = syncBlock
}
syncLag := po.calculateSyncLag(latestSync, timeSync, providerData.SyncBlock, sampleTime)
providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.SyncScoreType, syncLag.Seconds(), weight, halfTime, 0, sampleTime)
if updateErr != nil {
return
}
}
} else {
providerData, updateErr = po.updateDecayingWeightedAverage(providerData, score.AvailabilityScoreType, 0, weight, halfTime, 0, sampleTime)
if updateErr != nil {
Expand All @@ -334,6 +346,7 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
utils.LogAttr("providerAddress", providerAddress),
utils.LogAttr("latency", latency),
utils.LogAttr("success", success),
utils.LogAttr("syncBlock", syncBlock),
)
}

Expand Down
105 changes: 97 additions & 8 deletions protocol/provideroptimizer/provider_optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ func TestProviderOptimizerBasicProbeData(t *testing.T) {
// damage providers 5-7 scores with bad latency probes relays
// they should be selected less often due to lower weighted scores
badLatency := TEST_BASE_WORLD_LATENCY * 3
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[5], badLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[6], badLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[7], badLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[5], badLatency, true, 0)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[6], badLatency, true, 0)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[7], badLatency, true, 0)
time.Sleep(4 * time.Millisecond)

// improve providers 0-2 scores with good latency probes relays
// they should be selected by the optimizer more often
goodLatency := TEST_BASE_WORLD_LATENCY / 2
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], goodLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], goodLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[2], goodLatency, true)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], goodLatency, true, 0)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], goodLatency, true, 0)
providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[2], goodLatency, true, 0)
time.Sleep(4 * time.Millisecond)
results := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, 1000, cu, requestBlock)

Expand Down Expand Up @@ -313,7 +313,7 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) {
syncBlock := uint64(requestBlock)

// add an average latency probe relay to determine average score
providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true)
providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 0)
time.Sleep(4 * time.Millisecond)

// add good latency probe relays, score should improve
Expand All @@ -325,7 +325,7 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) {
require.NoError(t, err)

// add good latency probe
providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY/10, true)
providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY/10, true, 0)
time.Sleep(4 * time.Millisecond)

// check score again and compare to the last score
Expand Down Expand Up @@ -1366,3 +1366,92 @@ func TestResetLatestSyncDataIfOutlier_ZeroBlockUnchanged(t *testing.T) {

require.Equal(t, uint64(0), optimizer.latestSyncData.Block, "zero block should not be touched")
}

// --- AppendProbeRelayData sync scoring tests (Step 3, Task 2.1) ---

func TestAppendProbeRelayData_SyncBlockUpdatesGlobalChainHead(t *testing.T) {
optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil)
providerAddress := "provider1"

// Probe with syncBlock=1000 should update the global chain head
optimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 1000)
time.Sleep(4 * time.Millisecond)

// Verify global chain head was updated
optimizer.latestSyncData.Lock.Lock()
block := optimizer.latestSyncData.Block
optimizer.latestSyncData.Lock.Unlock()
require.Equal(t, uint64(1000), block)
}

func TestAppendProbeRelayData_ProviderBehindGetsHigherSyncLag(t *testing.T) {
optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil)
syncedProvider := "synced"
behindProvider := "behind"

// Provider A is fully synced at block 1000
optimizer.AppendProbeRelayData(syncedProvider, TEST_BASE_WORLD_LATENCY, true, 1000)
time.Sleep(4 * time.Millisecond)

// Provider B is behind at block 990 (10 blocks behind)
optimizer.AppendProbeRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, true, 990)
time.Sleep(4 * time.Millisecond)

// Provider A should have better (lower) sync score than Provider B
syncedData, _ := optimizer.getProviderData(syncedProvider)
behindData, _ := optimizer.getProviderData(behindProvider)

// Sync score stores lag in seconds via EWMA — lower is better
// Synced provider: lag=0 (at chain head). Behind provider: lag = 10 blocks × avgBlockTime
syncedLag, _ := syncedData.Sync.Resolve()
behindLag, _ := behindData.Sync.Resolve()
require.Less(t, syncedLag, behindLag, "synced provider should have lower sync lag than behind provider")
}

func TestAppendProbeRelayData_SyncBlockZeroSkipsSyncScoring(t *testing.T) {
optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil)
providerAddress := "provider1"

// Probe with syncBlock=0 (static provider / failed probe)
optimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true, 0)
time.Sleep(4 * time.Millisecond)

// Global chain head should NOT be updated
optimizer.latestSyncData.Lock.Lock()
block := optimizer.latestSyncData.Block
optimizer.latestSyncData.Lock.Unlock()
require.Equal(t, uint64(0), block, "syncBlock=0 should not update global chain head")

// Provider's SyncBlock should remain at default (0)
providerData, _ := optimizer.getProviderData(providerAddress)
require.Equal(t, uint64(0), providerData.SyncBlock, "syncBlock=0 should not update provider SyncBlock")
}

func TestAppendProbeRelayData_RelayWeightDominatesProbeWeight(t *testing.T) {
optimizer := NewProviderOptimizer(StrategyBalanced, TEST_AVERAGE_BLOCK_TIME, 1, nil, "test", nil)
behindProvider := "behind"
syncedProvider := "synced"

// Probe seeds: behind provider at block 990, synced provider at block 1000
// This sets global chain head to 1000
optimizer.AppendProbeRelayData(syncedProvider, TEST_BASE_WORLD_LATENCY, true, 1000)
optimizer.AppendProbeRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, true, 990)
time.Sleep(4 * time.Millisecond)

// Behind provider has sync lag from probe (weight 0.25)
probeData, _ := optimizer.getProviderData(behindProvider)
probeSyncLag, _ := probeData.Sync.Resolve()
require.Greater(t, probeSyncLag, 0.0, "behind provider should have non-zero sync lag from probe")

// Now relay confirms behind provider is still at 990 (weight 1.0)
// Relay should reinforce the lag with 4x the weight
optimizer.AppendRelayData(behindProvider, TEST_BASE_WORLD_LATENCY, 10, 990)
time.Sleep(4 * time.Millisecond)

relayData, _ := optimizer.getProviderData(behindProvider)
relaySyncLag, _ := relayData.Sync.Resolve()

// After relay with higher weight, the sync lag should be higher (more confident)
// because relay weight (1.0) reinforces the lag signal stronger than probe (0.25)
require.Greater(t, relaySyncLag, probeSyncLag, "relay data should reinforce sync lag with higher weight")
}
Loading