diff --git a/packages/stream_video/CHANGELOG.md b/packages/stream_video/CHANGELOG.md index 69b977a49..1342f021d 100644 --- a/packages/stream_video/CHANGELOG.md +++ b/packages/stream_video/CHANGELOG.md @@ -9,6 +9,7 @@ Each call now owns an isolated native PeerConnectionFactory — fixes cross-call ### 🐞 Fixed +- Fixed the rejoin flow failing in some cases when the network connection flickers during reconnection. - Added safety nets and recovery for cases when the publisher connection doesn't establish after a reconnection (e.g. the SFU answer is lost or ICE stays in `new` state). - Fixed sibling-call audio capture being silently broken when another concurrently-active call ended (e.g. a 1:1 ringing call ending alongside a running livestream, or a previous ringing call ending before a new one was accepted). - Fixed a sibling call's audio breaking when a ringing 1:1 call ended via `dropIfAloneInRingingFlow` (the remote party hung up first). `Call.end()` and `Call.leave()` now share a single `_disconnect` cleanup path. diff --git a/packages/stream_video/lib/src/call/call.dart b/packages/stream_video/lib/src/call/call.dart index 3e7967884..7e34f39e8 100644 --- a/packages/stream_video/lib/src/call/call.dart +++ b/packages/stream_video/lib/src/call/call.dart @@ -1873,7 +1873,17 @@ class Call { final reconnectStartTime = DateTime.now(); var fastReconnectAttemptsCount = 0; do { - _awaitNetworkAvailableFuture = _awaitNetworkAvailable(); + // Wait for a stable network before reconnecting with rejoin/migrate + // to prevent starting an SDP exchange on a transient connection that drops before the answer arrives. + final stabilityWindow = + (_reconnectStrategy == SfuReconnectionStrategy.rejoin || + _reconnectStrategy == SfuReconnectionStrategy.migrate) + ? 
const Duration(seconds: 3) + : Duration.zero; + + _awaitNetworkAvailableFuture = _awaitNetworkAvailable( + stabilityWindow: stabilityWindow, + ); if (state.value.preferences.reconnectTimeout > Duration.zero) { final elapsed = DateTime.now().difference(reconnectStartTime); @@ -2075,40 +2085,96 @@ class Call { } } - Future _awaitNetworkAvailable() async { + /// Waits until the network becomes available **and** stays connected for + /// [stabilityWindow]. When [stabilityWindow] is [Duration.zero] (the + /// default), the method returns as soon as connectivity is detected. + /// + /// The wait for connectivity is bounded by + /// [CallPreferences.networkAvailabilityTimeout]; the stability check that + /// follows is not capped by the remaining budget, so the total can exceed it + /// by up to [stabilityWindow]. Each flicker shrinks the remaining budget. + Future _awaitNetworkAvailable({ + Duration stabilityWindow = Duration.zero, + }) async { final previousCheckInterval = networkMonitor.checkInterval; + final budget = state.value.preferences.networkAvailabilityTimeout; + final deadline = Stopwatch()..start(); + try { networkMonitor.setIntervalAndResetTimer( _streamVideo.options.networkMonitorSettings.offlineCheckInterval, ); - final networkFuture = networkMonitor.onStatusChange - .startWithFuture(networkMonitor.internetStatus) - .firstWhere((status) => status == InternetStatus.connected) - .timeout( - state.value.preferences.networkAvailabilityTimeout, - onTimeout: () { - _logger.w(() => '[_awaitNetworkAvailable] timeout'); - return InternetStatus.disconnected; - }, + while (true) { + final remaining = budget - deadline.elapsed; + if (remaining <= Duration.zero) { + _logger.w( + () => '[_awaitNetworkAvailable] total budget exhausted', ); + return InternetStatus.disconnected; + } - final lifecycleFuture = _callLifecycleCompleter.future.then((_) { - _logger.w(() => '[_awaitNetworkAvailable] call was left'); - return InternetStatus.disconnected; - }); + final networkFuture 
= networkMonitor.onStatusChange + .startWithFuture(networkMonitor.internetStatus) + .firstWhere((status) => status == InternetStatus.connected) + .timeout( + remaining, + onTimeout: () { + _logger.w(() => '[_awaitNetworkAvailable] timeout'); + return InternetStatus.disconnected; + }, + ); + + final lifecycleFuture = _callLifecycleCompleter.future.then((_) { + _logger.w(() => '[_awaitNetworkAvailable] call was left'); + return InternetStatus.disconnected; + }); + + // Race the network future against the call lifecycle cancellable + // to ensure we don't wait for the network if the call was left + final connectionStatus = + await Future.any([ + networkFuture, + lifecycleFuture, + ]) + .asCancelable() + .storeIn(_idFastReconnectTimeout, _cancelables) + .valueOrDefault(InternetStatus.disconnected); + + if (connectionStatus == InternetStatus.disconnected) { + return connectionStatus; + } + + if (stabilityWindow <= Duration.zero) { + return connectionStatus; + } - // Race the network future against the call lifecycle cancellable - // to ensure we don't wait for the network if the call was left - final connectionStatus = - await Future.any([ - networkFuture, - lifecycleFuture, - ]) - .asCancelable() - .storeIn(_idFastReconnectTimeout, _cancelables) - .valueOrDefault(InternetStatus.disconnected); - return connectionStatus; + // Verify the connection holds for the full stability window. + try { + await networkMonitor.onStatusChange + .where((s) => s == InternetStatus.disconnected) + .first + .timeout(stabilityWindow); + + // Stream emitted before timeout → network dropped during window. + _logger.w( + () => + '[_awaitNetworkAvailable] network dropped during ' + '${stabilityWindow.inSeconds}s stability window, retrying', + ); + _session?.trace('awaitNetwork.unstable', { + 'stabilityWindowSeconds': stabilityWindow.inSeconds, + }); + } on TimeoutException { + // No drop detected within the window — network is stable. 
+ _logger.v( + () => + '[_awaitNetworkAvailable] network stable for ' + '${stabilityWindow.inSeconds}s', + ); + return InternetStatus.connected; + } + } } finally { networkMonitor.setIntervalAndResetTimer(previousCheckInterval); } diff --git a/packages/stream_video/test/src/call/call_reconnect_stability_test.dart b/packages/stream_video/test/src/call/call_reconnect_stability_test.dart new file mode 100644 index 000000000..c0e1655b0 --- /dev/null +++ b/packages/stream_video/test/src/call/call_reconnect_stability_test.dart @@ -0,0 +1,377 @@ +// ignore_for_file: avoid_redundant_argument_values + +import 'package:flutter_test/flutter_test.dart'; +import 'package:internet_connection_checker_plus/internet_connection_checker_plus.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:rxdart/rxdart.dart'; +import 'package:stream_video/src/call/state/call_state_notifier.dart'; +import 'package:stream_video/src/webrtc/peer_connection.dart'; +import 'package:stream_video/stream_video.dart'; + +import '../../test_helpers.dart'; +import 'fixtures/call_test_helpers.dart'; +import 'fixtures/data.dart'; + +class _MockPeerConnection extends Mock implements StreamPeerConnection {} + +void main() { + setUpAll(() { + TestWidgetsFlutterBinding.ensureInitialized(); + registerMockFallbackValues(); + }); + + group('Call reconnect network stability window', () { + late BehaviorSubject internetStatusController; + late MockCoordinatorClient coordinatorClient; + late MockCallSession callSession; + late MockSessionFactory sessionFactory; + late _MockPeerConnection mockPc; + OnReconnectionNeeded? 
capturedCallback; + + setUp(() { + internetStatusController = BehaviorSubject.seeded( + InternetStatus.connected, + ); + coordinatorClient = setupMockCoordinatorClient(); + callSession = setupMockCallSession(); + capturedCallback = null; + + // Build a session factory mock that captures the + // [onReconnectionNeeded] callback supplied by Call so tests can + // trigger reconnect with any strategy directly. + sessionFactory = MockSessionFactory(); + when(() => sessionFactory.sdpEditor).thenReturn(MockSdpEditor()); + when( + () => sessionFactory.makeCallSession( + onSuspendedAudioTrackRecorded: any( + named: 'onSuspendedAudioTrackRecorded', + ), + sessionId: any(named: 'sessionId'), + sessionSeq: any(named: 'sessionSeq'), + credentials: any(named: 'credentials'), + stateManager: any(named: 'stateManager'), + dynascaleManager: any(named: 'dynascaleManager'), + networkMonitor: any(named: 'networkMonitor'), + statsOptions: any(named: 'statsOptions'), + onReconnectionNeeded: any(named: 'onReconnectionNeeded'), + clientPublishOptions: any(named: 'clientPublishOptions'), + streamVideo: any(named: 'streamVideo'), + leftoverTraceRecords: any(named: 'leftoverTraceRecords'), + pcFactory: any(named: 'pcFactory'), + ), + ).thenAnswer((invocation) { + capturedCallback = + invocation.namedArguments[const Symbol('onReconnectionNeeded')] + as OnReconnectionNeeded; + return Future.value(callSession); + }); + + mockPc = _MockPeerConnection(); + when(() => mockPc.type).thenReturn(StreamPeerType.publisher); + }); + + tearDown(() async { + await internetStatusController.close(); + }); + + Call buildCall({CallStateNotifier? 
stateManager}) { + return createTestCall( + stateManager: stateManager, + networkMonitor: setupMockInternetConnection( + statusStream: internetStatusController, + ), + coordinatorClient: coordinatorClient, + sessionFactory: sessionFactory, + ); + } + + test( + 'fast reconnect proceeds without waiting the 3s stability window', + () async { + final call = buildCall(); + + final joinResult = await call.join(); + expect(joinResult.isSuccess, isTrue); + + // Toggle network to trigger fast reconnect via _observeReconnectEvents. + final stopwatch = Stopwatch()..start(); + internetStatusController.add(InternetStatus.disconnected); + await Future.delayed(const Duration(milliseconds: 10)); + internetStatusController.add(InternetStatus.connected); + + // fastReconnect must be invoked well before the 3s stability window + // that the rejoin/migrate path applies. Allow generous slack for + // scheduler jitter in CI but stay safely under 3s. + await Future.delayed(const Duration(milliseconds: 500)); + stopwatch.stop(); + + verify( + () => callSession.fastReconnect( + reconnectDetails: any(named: 'reconnectDetails'), + capabilities: any(named: 'capabilities'), + unifiedSessionId: any(named: 'unifiedSessionId'), + ), + ).called(1); + + expect( + stopwatch.elapsed, + lessThan(const Duration(seconds: 3)), + reason: 'fast strategy must not apply the 3s stability window', + ); + }, + ); + + test( + 'rejoin reconnect does not proceed before the stability window elapses', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Initial join issues exactly one joinCall; the rejoin path will + // issue a second one — but only after the stability window expires. 
+ verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + // Trigger a rejoin while the network is still reported as + // connected. The reconnect path must wait the stability window + // before re-issuing joinCall. + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + await Future.delayed(const Duration(milliseconds: 500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + }, + ); + + test( + 'rejoin reconnect proceeds once the network stays connected for the ' + 'stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Wait past the 3s stability window with the network steady. + await Future.delayed(const Duration(milliseconds: 3500)); + + // Initial join + rejoin = 2 joinCall invocations. 
+ verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: null, + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(2); + }, + timeout: const Timeout(Duration(seconds: 10)), + ); + + test( + 'rejoin reconnect restarts the wait when network drops inside the ' + 'stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall so subsequent verifications only + // observe rejoin-driven calls. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Halfway through the first stability window, drop the network so + // the wait must restart from scratch. + await Future.delayed(const Duration(milliseconds: 1500)); + internetStatusController.add(InternetStatus.disconnected); + + // The rejoin must not have proceeded yet — the first window was + // interrupted by the drop above. + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + + // Restore connectivity and give the wait a full clean window. 
+ await Future.delayed(const Duration(milliseconds: 100)); + internetStatusController.add(InternetStatus.connected); + await Future.delayed(const Duration(milliseconds: 3500)); + + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: null, + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + }, + timeout: const Timeout(Duration(seconds: 15)), + ); + + test( + 'rejoin reconnect gives up when network flicker exhausts the total ' + 'availability budget', + () async { + final customStateManager = CallStateNotifier( + CallState( + preferences: DefaultCallPreferences( + networkAvailabilityTimeout: const Duration(milliseconds: 1500), + ), + currentUserId: SampleCallData.defaultUserInfo.id, + callCid: SampleCallData.defaultCid, + ), + ); + + final call = buildCall(stateManager: customStateManager); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall before triggering rejoin so we can + // assert that no further joinCall is issued. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.rejoin); + + // Let the initial connectivity check pass, then drop the network + // inside the stability window so the wait loop iterates with a + // shrunken remaining budget. Because the network never returns, + // the bounded networkFuture times out and the reconnect fails. 
+ await Future.delayed(const Duration(milliseconds: 500)); + internetStatusController.add(InternetStatus.disconnected); + + // 1500ms budget total → ~1000ms remaining after the drop above. + // Wait long enough for the bounded wait to time out and the + // reconnect-failed state to be published. + await Future.delayed(const Duration(milliseconds: 1500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + + // Once budget is exhausted the reconnect path publishes + // [CallStatusReconnectionFailed], which the Call observer then + // converts into a disconnected status with the matching reason. + final status = call.state.value.status; + expect( + status is CallStatusReconnectionFailed || + (status is CallStatusDisconnected && + status.reason is DisconnectReasonReconnectionFailed), + isTrue, + reason: 'expected reconnect to give up, got status: $status', + ); + + customStateManager.dispose(); + }, + timeout: const Timeout(Duration(seconds: 10)), + ); + + test( + 'migrate reconnect also waits for the stability window', + () async { + final call = buildCall(); + + await call.join(); + expect(capturedCallback, isNotNull); + + // Consume the initial joinCall before triggering migrate. + verify( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ).called(1); + + capturedCallback!(mockPc, SfuReconnectionStrategy.migrate); + + // Within the stability window, no extra joinCall should be issued. 
+ await Future.delayed(const Duration(milliseconds: 500)); + + verifyNever( + () => coordinatorClient.joinCall( + callCid: any(named: 'callCid'), + ringing: any(named: 'ringing'), + create: any(named: 'create'), + migratingFrom: any(named: 'migratingFrom'), + migratingFromList: any(named: 'migratingFromList'), + video: any(named: 'video'), + membersLimit: any(named: 'membersLimit'), + ), + ); + }, + ); + }); +}