From 6837513652e466638aaedc178a48a51f548e3dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Braz=CC=87ewicz?= Date: Thu, 28 May 2026 15:14:51 +0200 Subject: [PATCH 1/3] prevent connection flickering during rejoin --- packages/stream_video/lib/src/call/call.dart | 118 +++++++++++++++---- 1 file changed, 92 insertions(+), 26 deletions(-) diff --git a/packages/stream_video/lib/src/call/call.dart b/packages/stream_video/lib/src/call/call.dart index 117bee751..1fa6537cb 100644 --- a/packages/stream_video/lib/src/call/call.dart +++ b/packages/stream_video/lib/src/call/call.dart @@ -1856,7 +1856,17 @@ class Call { final reconnectStartTime = DateTime.now(); var fastReconnectAttemptsCount = 0; do { - _awaitNetworkAvailableFuture = _awaitNetworkAvailable(); + // Wait for a stable network before reconnecting with rejoin/migrate + // to prevent starting an SDP exchange on a transient connection that drops before the answer arrives. + final stabilityWindow = + (_reconnectStrategy == SfuReconnectionStrategy.rejoin || + _reconnectStrategy == SfuReconnectionStrategy.migrate) + ? const Duration(seconds: 3) + : Duration.zero; + + _awaitNetworkAvailableFuture = _awaitNetworkAvailable( + stabilityWindow: stabilityWindow, + ); if (state.value.preferences.reconnectTimeout > Duration.zero) { final elapsed = DateTime.now().difference(reconnectStartTime); @@ -2058,40 +2068,96 @@ class Call { } } - Future _awaitNetworkAvailable() async { + /// Waits until the network becomes available **and** stays connected for + /// [stabilityWindow]. When [stabilityWindow] is [Duration.zero] (the + /// default), the method returns as soon as connectivity is detected. + /// + /// The total time spent in this method is bounded by + /// [CallPreferences.networkAvailabilityTimeout]. If the network keeps + /// flickering (connecting then dropping within the stability window), + /// the remaining budget shrinks on each iteration until it is exhausted. + Future _awaitNetworkAvailable({ + Duration stabilityWindow = Duration.zero, + }) async { final previousCheckInterval = networkMonitor.checkInterval; + final budget = state.value.preferences.networkAvailabilityTimeout; + final deadline = Stopwatch()..start(); + try { networkMonitor.setIntervalAndResetTimer( _streamVideo.options.networkMonitorSettings.offlineCheckInterval, ); - final networkFuture = networkMonitor.onStatusChange - .startWithFuture(networkMonitor.internetStatus) - .firstWhere((status) => status == InternetStatus.connected) - .timeout( - state.value.preferences.networkAvailabilityTimeout, - onTimeout: () { - _logger.w(() => '[_awaitNetworkAvailable] timeout'); - return InternetStatus.disconnected; - }, + while (true) { + final remaining = budget - deadline.elapsed; + if (remaining <= Duration.zero) { + _logger.w( + () => '[_awaitNetworkAvailable] total budget exhausted', ); + return InternetStatus.disconnected; + } - final lifecycleFuture = _callLifecycleCompleter.future.then((_) { - _logger.w(() => '[_awaitNetworkAvailable] call was left'); - return InternetStatus.disconnected; - }); + final networkFuture = networkMonitor.onStatusChange + .startWithFuture(networkMonitor.internetStatus) + .firstWhere((status) => status == InternetStatus.connected) + .timeout( + remaining, + onTimeout: () { + _logger.w(() => '[_awaitNetworkAvailable] timeout'); + return InternetStatus.disconnected; + }, + ); + + final lifecycleFuture = _callLifecycleCompleter.future.then((_) { + _logger.w(() => '[_awaitNetworkAvailable] call was left'); + return InternetStatus.disconnected; + }); + + // Race the network future against the call lifecycle cancellable + // to ensure we don't wait for the network if the call was left + final connectionStatus = + await Future.any([ + networkFuture, + lifecycleFuture, + ]) + .asCancelable() + .storeIn(_idFastReconnectTimeout, _cancelables) + .valueOrDefault(InternetStatus.disconnected); + + if (connectionStatus == InternetStatus.disconnected) { + return connectionStatus; + } + + if (stabilityWindow <= Duration.zero) { + return connectionStatus; + } - // Race the network future against the call lifecycle cancellable - // to ensure we don't wait for the network if the call was left - final connectionStatus = - await Future.any([ - networkFuture, - lifecycleFuture, - ]) - .asCancelable() - .storeIn(_idFastReconnectTimeout, _cancelables) - .valueOrDefault(InternetStatus.disconnected); - return connectionStatus; + // Verify the connection holds for the full stability window. + try { + await networkMonitor.onStatusChange + .where((s) => s == InternetStatus.disconnected) + .first + .timeout(stabilityWindow); + + // Stream emitted before timeout → network dropped during window. + _logger.w( + () => + '[_awaitNetworkAvailable] network dropped during ' + '${stabilityWindow.inSeconds}s stability window, retrying', + ); + _session?.trace('awaitNetwork.unstable', { + 'stabilityWindowSeconds': stabilityWindow.inSeconds, + }); + } on TimeoutException { + // No drop detected within the window — network is stable. + _logger.v( + () => + '[_awaitNetworkAvailable] network stable for ' + '${stabilityWindow.inSeconds}s', + ); + return InternetStatus.connected; + } + } } finally { networkMonitor.setIntervalAndResetTimer(previousCheckInterval); } From fc70c0dbb4ac4e02f03174c6b3c1d3ad77f0dac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Braz=CC=87ewicz?= Date: Thu, 28 May 2026 15:16:21 +0200 Subject: [PATCH 2/3] changelog --- packages/stream_video/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/stream_video/CHANGELOG.md b/packages/stream_video/CHANGELOG.md index 97ec04e0d..907a5f0c4 100644 --- a/packages/stream_video/CHANGELOG.md +++ b/packages/stream_video/CHANGELOG.md @@ -9,6 +9,7 @@ Each call now owns an isolated native PeerConnectionFactory — fixes cross-call ### 🐞 Fixed +- Fixed connection flickering causing rejoin flow to fail in some cases. - Fixed sibling-call audio capture being silently broken when another concurrently-active call ended (e.g. a 1:1 ringing call ending alongside a running livestream, or a previous ringing call ending before a new one was accepted). - Fixed a sibling call's audio breaking when a ringing 1:1 call ended via `dropIfAloneInRingingFlow` (the remote party hung up first). `Call.end()` and `Call.leave()` now share a single `_disconnect` cleanup path. - Made the audio processor teardown in `Call._clear` multi-call aware. The audio processor is owned by `StreamVideo`, not by an individual `Call`, so disabling it on one call's teardown silently dropped noise cancellation on any other still-active call. `_clear` now only stops the global processor when no other active call is configured to use `NoiceCancellationSettingsMode.autoOn`. From 843216d527d981f03846210ab932ee79a49ffb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Braz=CC=87ewicz?= Date: Thu, 28 May 2026 23:13:53 +0200 Subject: [PATCH 3/3] unit tests --- .../call/call_reconnect_stability_test.dart | 377 ++++++++++++++++++ 1 file changed, 377 insertions(+) create mode 100644 packages/stream_video/test/src/call/call_reconnect_stability_test.dart diff --git a/packages/stream_video/test/src/call/call_reconnect_stability_test.dart b/packages/stream_video/test/src/call/call_reconnect_stability_test.dart new file mode 100644 index 000000000..c0e1655b0 --- /dev/null +++ b/packages/stream_video/test/src/call/call_reconnect_stability_test.dart @@ -0,0 +1,377 @@ +// ignore_for_file: avoid_redundant_argument_values + +import 'package:flutter_test/flutter_test.dart'; +import 'package:internet_connection_checker_plus/internet_connection_checker_plus.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:rxdart/rxdart.dart'; +import 'package:stream_video/src/call/state/call_state_notifier.dart'; +import 'package:stream_video/src/webrtc/peer_connection.dart'; +import 'package:stream_video/stream_video.dart'; + +import '../../test_helpers.dart'; +import 'fixtures/call_test_helpers.dart'; +import 'fixtures/data.dart'; + +class _MockPeerConnection extends Mock implements StreamPeerConnection {} + +void main() { + setUpAll(() { + TestWidgetsFlutterBinding.ensureInitialized(); + registerMockFallbackValues(); + }); + + group('Call reconnect network stability window', () { + late BehaviorSubject internetStatusController; + late MockCoordinatorClient coordinatorClient; + late MockCallSession callSession; + late MockSessionFactory sessionFactory; + late _MockPeerConnection mockPc; + OnReconnectionNeeded? capturedCallback; + + setUp(() { + internetStatusController = BehaviorSubject.seeded( + InternetStatus.connected, + ); + coordinatorClient = setupMockCoordinatorClient(); + callSession = setupMockCallSession(); + capturedCallback = null; + + // Build a session factory mock that captures the + // [onReconnectionNeeded] callback supplied by Call so tests can + // trigger reconnect with any strategy directly. + sessionFactory = MockSessionFactory(); + when(() => sessionFactory.sdpEditor).thenReturn(MockSdpEditor()); + when( + () => sessionFactory.makeCallSession( + onSuspendedAudioTrackRecorded: any( + named: 'onSuspendedAudioTrackRecorded', + ), + sessionId: any(named: 'sessionId'), + sessionSeq: any(named: 'sessionSeq'), + credentials: any(named: 'credentials'), + stateManager: any(named: 'stateManager'), + dynascaleManager: any(named: 'dynascaleManager'), + networkMonitor: any(named: 'networkMonitor'), + statsOptions: any(named: 'statsOptions'), + onReconnectionNeeded: any(named: 'onReconnectionNeeded'), + clientPublishOptions: any(named: 'clientPublishOptions'), + streamVideo: any(named: 'streamVideo'), + leftoverTraceRecords: any(named: 'leftoverTraceRecords'), + pcFactory: any(named: 'pcFactory'), + ), + ).thenAnswer((invocation) { + capturedCallback = + invocation.namedArguments[const Symbol('onReconnectionNeeded')] + as OnReconnectionNeeded; + return Future.value(callSession); + }); + + mockPc = _MockPeerConnection(); + when(() => mockPc.type).thenReturn(StreamPeerType.publisher); + }); + + tearDown(() async { + await internetStatusController.close(); + }); + + Call buildCall({CallStateNotifier? stateManager}) { + return createTestCall( + stateManager: stateManager, + networkMonitor: setupMockInternetConnection( + statusStream: internetStatusController, + ), + coordinatorClient: coordinatorClient, + sessionFactory: sessionFactory, + ); + } + + test( + 'fast reconnect proceeds without waiting the 3s stability window', + () async { + final call = buildCall(); + + final joinResult = await call.join(); + expect(joinResult.isSuccess, isTrue); + + // Toggle network to trigger fast reconnect via _observeReconnectEvents. + final stopwatch = Stopwatch()..start(); + internetStatusController.add(InternetStatus.disconnected); + await Future.delayed(const Duration(milliseconds: 10)); + internetStatusController.add(InternetStatus.connected); + + // fastReconnect must be invoked well before the 3s stability window + // that the rejoin/migrate path applies. Allow generous slack for + // scheduler jitter in CI but stay safely under 3s. + await Future.delayed(const Duration(milliseconds: 500)); + stopwatch.stop(); + + verify( + () => callSession.fastReconnect( + reconnectDetails: any(named: 'reconnectDetails'), + capabilities: any(named: 'capabilities'), + unifiedSessionId: any(named: 'unifiedSessionId'), + ), + ).called(1); + + expect( + stopwatch.elapsed, + lessThan(const Duration(seconds: 3)), + reason: 'fast strategy must not apply the 3s stability window', + ); + }, + ); + + test( + 'rejoin reconnect does not proceed before the stability window elapses', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Initial join issues exactly one joinCall; the rejoin path will + // issue a second one — but only after the stability window expires. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + // Trigger a rejoin while the network is still reported as + // connected. The reconnect path must wait the stability window + // before re-issuing joinCall. + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + await Future.delayed(const Duration(milliseconds: 500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + }, + ); + + test( + 'rejoin reconnect proceeds once the network stays connected for the ' + 'stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Wait past the 3s stability window with the network steady. + await Future.delayed(const Duration(milliseconds: 3500)); + + // Initial join + rejoin = 2 joinCall invocations. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: null, + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(2); + }, + timeout: const Timeout(Duration(seconds: 10)), + ); + + test( + 'rejoin reconnect restarts the wait when network drops inside the ' + 'stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall so subsequent verifications only + // observe rejoin-driven calls. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Halfway through the first stability window, drop the network so + // the wait must restart from scratch. + await Future.delayed(const Duration(milliseconds: 1500)); + internetStatusController.add(InternetStatus.disconnected); + + // The rejoin must not have proceeded yet — the first window was + // interrupted by the drop above. + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + + // Restore connectivity and give the wait a full clean window. + await Future.delayed(const Duration(milliseconds: 100)); + internetStatusController.add(InternetStatus.connected); + await Future.delayed(const Duration(milliseconds: 3500)); + + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: null, + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + }, + timeout: const Timeout(Duration(seconds: 15)), + ); + + test( + 'rejoin reconnect gives up when network flicker exhausts the total ' + 'availability budget', + () async { + final customStateManager = CallStateNotifier( + CallState( + preferences: DefaultCallPreferences( + networkAvailabilityTimeout: const Duration(milliseconds: 1500), + ), + currentUserId: SampleCallData.defaultUserInfo.id, + callCid: SampleCallData.defaultCid, + ), + ); + + final call = buildCall(stateManager: customStateManager); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall before triggering rejoin so we can + // assert that no further joinCall is issued. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Let the initial connectivity check pass, then drop the network + // inside the stability window so the wait loop iterates with a + // shrunken remaining budget. Because the network never returns, + // the bounded networkFuture times out and the reconnect fails. + await Future.delayed(const Duration(milliseconds: 500)); + internetStatusController.add(InternetStatus.disconnected); + + // 1500ms budget total → ~1000ms remaining after the drop above. + // Wait long enough for the bounded wait to time out and the + // reconnect-failed state to be published. + await Future.delayed(const Duration(milliseconds: 1500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + + // Once budget is exhausted the reconnect path publishes + // [CallStatusReconnectionFailed], which the Call observer then + // converts into a disconnected status with the matching reason. + final status = call.state.value.status; + expect( + status is CallStatusReconnectionFailed || + (status is CallStatusDisconnected && + status.reason is DisconnectReasonReconnectionFailed), + isTrue, + reason: 'expected reconnect to give up, got status: $status', + ); + + customStateManager.dispose(); + }, + timeout: const Timeout(Duration(seconds: 10)), + ); + + test( + 'migrate reconnect also waits for the stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall before triggering migrate. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.migrate); + + // Within the stability window, no extra joinCall should be issued. + await Future.delayed(const Duration(milliseconds: 500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + }, + ); + }); +}