From ba6d15105ca5fc38799e5a0818bea6eb1d73706e Mon Sep 17 00:00:00 2001 From: Christine Yan Date: Wed, 10 Jun 2026 12:27:12 -0400 Subject: [PATCH 1/3] Guard websocket lifecycle with connection generations Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/OpenClaw.Shared/WebSocketClientBase.cs | 101 ++++++++-- .../WebSocketClientBaseTests.cs | 172 ++++++++++++++++++ 2 files changed, 255 insertions(+), 18 deletions(-) diff --git a/src/OpenClaw.Shared/WebSocketClientBase.cs b/src/OpenClaw.Shared/WebSocketClientBase.cs index 205a0329..e1affe9f 100644 --- a/src/OpenClaw.Shared/WebSocketClientBase.cs +++ b/src/OpenClaw.Shared/WebSocketClientBase.cs @@ -21,6 +21,7 @@ public abstract class WebSocketClientBase : IDisposable private bool _disposed; private int _reconnectAttempts; private int _reconnectLoopActive; + private long _connectionGeneration; private readonly SemaphoreSlim _sendLock = new(1, 1); private static readonly int[] BackoffMs = { 1000, 2000, 4000, 8000, 15000, 30000, 60000 }; @@ -111,29 +112,38 @@ public async Task ConnectAsync() return; } + var connectGeneration = Interlocked.Increment(ref _connectionGeneration); + ClientWebSocket? ws = null; + try { RaiseStatusChanged(ConnectionStatus.Connecting); _logger.Info($"Connecting to {ClientRole}: {GatewayUrlForDisplay}"); - _webSocket = new ClientWebSocket(); - _webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30); + ws = new ClientWebSocket(); + ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(30); + _webSocket = ws; // Set Origin header (convert ws/wss to http/https) var uri = new Uri(_gatewayUrl); var originScheme = uri.Scheme == "wss" ? "https" : "http"; var origin = $"{originScheme}://{uri.Host}:{uri.Port}"; - _webSocket.Options.SetRequestHeader("Origin", origin); + ws.Options.SetRequestHeader("Origin", origin); if (!string.IsNullOrEmpty(_credentials)) { var credentialsToEncode = GatewayUrlHelper.DecodeCredentials(_credentials); - _webSocket.Options.SetRequestHeader( + ws.Options.SetRequestHeader( "Authorization", $"Basic {Convert.ToBase64String(Encoding.UTF8.GetBytes(credentialsToEncode))}"); } - await _webSocket.ConnectAsync(uri, _cts.Token); + await ws.ConnectAsync(uri, _cts.Token); + if (!IsCurrentConnection(ws, connectGeneration)) + { + DisposeStaleSocket(ws); + return; + } // Don't reset _reconnectAttempts here — TCP connect succeeding doesn't mean // auth will succeed. Reset only after the full application-level handshake @@ -141,19 +151,43 @@ public async Task ConnectAsync() _logger.Info($"{ClientRole} connected, waiting for challenge..."); await OnConnectedAsync(); + if (!IsCurrentConnection(ws, connectGeneration)) + { + DisposeStaleSocket(ws); + return; + } - _ = Task.Run(() => ListenForMessagesAsync(), _cts.Token); + _ = Task.Run(() => ListenForMessagesAsync(ws, connectGeneration), _cts.Token); } catch (OperationCanceledException) { + if (ws != null) + { + DisposeStaleSocket(ws); + } _logger.Debug($"{ClientRole} connect canceled (likely shutdown)"); } catch (ObjectDisposedException) { + if (ws != null) + { + DisposeStaleSocket(ws); + } _logger.Debug($"{ClientRole} connect aborted after dispose"); } catch (Exception ex) { + if (ws != null && !IsCurrentConnection(ws, connectGeneration)) + { + DisposeStaleSocket(ws); + _logger.Debug($"{ClientRole} stale connection failure ignored: {ex.Message}"); + return; + } + + if (ws != null) + { + DisposeStaleSocket(ws); + } _logger.Error($"{ClientRole} connection failed", ex); RaiseStatusChanged(ConnectionStatus.Error); @@ -164,7 +198,23 @@ public async Task ConnectAsync() } } - private async Task ListenForMessagesAsync() + private bool IsCurrentConnection(ClientWebSocket ws, long generation) => + !_disposed + && Interlocked.Read(ref _connectionGeneration) == generation + && ReferenceEquals(_webSocket, ws); + + private void DisposeStaleSocket(ClientWebSocket ws) + { + if (ReferenceEquals(_webSocket, ws)) + { + _webSocket = null; + } + + // slopwatch-ignore: SW003 Cleanup is best-effort for superseded sockets. + try { ws.Dispose(); } catch { } + } + + private async Task ListenForMessagesAsync(ClientWebSocket ws, long connectionGeneration) { // Rent a pooled buffer — consistent with the SendRawAsync hot path; avoids a large // (16–64 KB) heap allocation per connection that would otherwise land on the LOH. @@ -173,10 +223,14 @@ private async Task ListenForMessagesAsync() try { - while (_webSocket?.State == WebSocketState.Open && !_cts.Token.IsCancellationRequested) + while (ws.State == WebSocketState.Open && !_cts.Token.IsCancellationRequested) { - var result = await _webSocket.ReceiveAsync( + var result = await ws.ReceiveAsync( new ArraySegment(buffer, 0, ReceiveBufferSize), _cts.Token); + if (!IsCurrentConnection(ws, connectionGeneration)) + { + break; + } if (result.MessageType == WebSocketMessageType.Text) { @@ -211,11 +265,14 @@ private async Task ListenForMessagesAsync() } else if (result.MessageType == WebSocketMessageType.Close) { - var closeStatus = _webSocket.CloseStatus?.ToString() ?? "unknown"; - var closeDesc = _webSocket.CloseStatusDescription ?? "no description"; + var closeStatus = ws.CloseStatus?.ToString() ?? "unknown"; + var closeDesc = ws.CloseStatusDescription ?? "no description"; _logger.Info($"Server closed connection: {closeStatus} - {closeDesc}"); - OnDisconnected(); - RaiseStatusChanged(ConnectionStatus.Disconnected); + if (IsCurrentConnection(ws, connectionGeneration)) + { + OnDisconnected(); + RaiseStatusChanged(ConnectionStatus.Disconnected); + } break; } } @@ -223,8 +280,11 @@ private async Task ListenForMessagesAsync() catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) { _logger.Warn("Connection closed prematurely"); - OnDisconnected(); - RaiseStatusChanged(ConnectionStatus.Disconnected); + if (IsCurrentConnection(ws, connectionGeneration)) + { + OnDisconnected(); + RaiseStatusChanged(ConnectionStatus.Disconnected); + } } // slopwatch-ignore: SW003 Shutdown cancellation or disposal is expected and the caller already preserves the safe state. catch (OperationCanceledException) { } @@ -233,8 +293,11 @@ private async Task ListenForMessagesAsync() catch (Exception ex) { _logger.Error($"{ClientRole} listen error", ex); - OnError(ex); - RaiseStatusChanged(ConnectionStatus.Error); + if (IsCurrentConnection(ws, connectionGeneration)) + { + OnError(ex); + RaiseStatusChanged(ConnectionStatus.Error); + } } finally { @@ -242,7 +305,7 @@ private async Task ListenForMessagesAsync() } // Auto-reconnect if not intentionally disposed - if (!_disposed) + if (IsCurrentConnection(ws, connectionGeneration)) { try { @@ -393,6 +456,8 @@ public void Dispose() OnDisposing(); + Interlocked.Increment(ref _connectionGeneration); + // slopwatch-ignore: SW003 Cleanup is best-effort; failure cannot improve caller state and the original outcome is preserved. try { _cts.Cancel(); } catch { } diff --git a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs index 2a68dc4f..4b8104e8 100644 --- a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs +++ b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs @@ -1,6 +1,10 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Net; +using System.Net.WebSockets; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -277,6 +281,45 @@ public async Task ConnectAsync_WhenAutoReconnectDisabled_DoesNotStartReconnectLo client.Dispose(); } + [Fact] + public async Task ConnectAsync_StaleConnectionDoesNotStartListenerOnNewerSocket() + { + using var server = new LoopbackWebSocketServer(); + await server.StartAsync(); + var client = new BlockingFirstConnectClient(server.WebSocketUrl, "token", _logger); + var statuses = new ConcurrentQueue(); + var unexpectedErrorStatus = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + client.StatusChanged += (_, s) => statuses.Enqueue(s); + client.StatusChanged += (_, s) => + { + if (s == ConnectionStatus.Error) + unexpectedErrorStatus.TrySetResult(); + }; + + var firstConnect = client.ConnectAsync(); + await client.FirstConnectEntered.WaitAsync(TimeSpan.FromSeconds(2)); + + var secondConnect = client.ConnectAsync(); + await client.SecondConnectReturned.WaitAsync(TimeSpan.FromSeconds(2)); + + client.ReleaseFirstConnect(); + await Task.WhenAll(firstConnect, secondConnect).WaitAsync(TimeSpan.FromSeconds(2)); + + // If the stale first ConnectAsync starts a listener after the second + // connection is current, two listeners race on the same ClientWebSocket + // and one reports a listen error. + var unexpected = await Task.WhenAny( + unexpectedErrorStatus.Task, + Task.Delay(TimeSpan.FromMilliseconds(250))); + + Assert.Equal(2, client.OnConnectedCallCount); + Assert.NotSame(unexpectedErrorStatus.Task, unexpected); + Assert.Equal(0, client.OnErrorCallCount); + Assert.DoesNotContain(ConnectionStatus.Error, statuses); + + client.Dispose(); + } + private static async Task WaitForConditionAsync(Func predicate, TimeSpan timeout) { var start = DateTime.UtcNow; @@ -291,6 +334,135 @@ private static async Task WaitForConditionAsync(Func predicate, TimeSpan t } } +internal sealed class BlockingFirstConnectClient : WebSocketClientBase +{ + private readonly TaskCompletionSource _firstConnectEntered = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _releaseFirstConnect = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _secondConnectReturned = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _connectCallbacks; + + public int OnConnectedCallCount => Volatile.Read(ref _connectCallbacks); + public int OnErrorCallCount { get; private set; } + public Task FirstConnectEntered => _firstConnectEntered.Task; + public Task SecondConnectReturned => _secondConnectReturned.Task; + + public BlockingFirstConnectClient(string gatewayUrl, string token, IOpenClawLogger? logger = null) + : base(gatewayUrl, token, logger) + { + } + + protected override int ReceiveBufferSize => 8192; + protected override string ClientRole => "race-test"; + protected override bool ShouldAutoReconnect() => false; + + protected override Task ProcessMessageAsync(string json) => Task.CompletedTask; + + protected override async Task OnConnectedAsync() + { + var count = Interlocked.Increment(ref _connectCallbacks); + if (count == 1) + { + _firstConnectEntered.TrySetResult(); + await _releaseFirstConnect.Task; + return; + } + + _secondConnectReturned.TrySetResult(); + } + + protected override void OnError(Exception ex) => OnErrorCallCount++; + + public void ReleaseFirstConnect() => _releaseFirstConnect.TrySetResult(); +} + +internal sealed class LoopbackWebSocketServer : IDisposable +{ + private readonly HttpListener _listener = new(); + private readonly CancellationTokenSource _cts = new(); + private readonly List _acceptedSockets = new(); + private Task? _acceptLoop; + + public string WebSocketUrl { get; } + + public LoopbackWebSocketServer() + { + var port = GetFreeTcpPort(); + var prefix = $"http://127.0.0.1:{port}/"; + _listener.Prefixes.Add(prefix); + WebSocketUrl = $"ws://127.0.0.1:{port}/"; + } + + public Task StartAsync() + { + _listener.Start(); + _acceptLoop = Task.Run(AcceptLoopAsync); + return Task.CompletedTask; + } + + private async Task AcceptLoopAsync() + { + while (!_cts.Token.IsCancellationRequested) + { + HttpListenerContext context; + try + { + context = await _listener.GetContextAsync(); + } + catch (HttpListenerException) when (_cts.Token.IsCancellationRequested) + { + return; + } + catch (ObjectDisposedException) + { + return; + } + + if (!context.Request.IsWebSocketRequest) + { + context.Response.StatusCode = 400; + context.Response.Close(); + continue; + } + + var wsContext = await context.AcceptWebSocketAsync(subProtocol: null); + lock (_acceptedSockets) + { + _acceptedSockets.Add(wsContext.WebSocket); + } + } + } + + public void Dispose() + { + _cts.Cancel(); + try { _listener.Stop(); } catch { } + try { _listener.Close(); } catch { } + lock (_acceptedSockets) + { + foreach (var socket in _acceptedSockets) + { + try { socket.Dispose(); } catch { } + } + } + try { _acceptLoop?.Wait(TimeSpan.FromSeconds(1)); } catch { } + _cts.Dispose(); + } + + private static int GetFreeTcpPort() + { + var listener = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0); + listener.Start(); + try + { + return ((IPEndPoint)listener.LocalEndpoint).Port; + } + finally + { + listener.Stop(); + } + } +} + public class TestLogger : IOpenClawLogger { public List Logs { get; } = new(); From c73800821bb332ef599e4e5aef026d53db81649e Mon Sep 17 00:00:00 2001 From: Christine Yan Date: Wed, 10 Jun 2026 15:35:36 -0400 Subject: [PATCH 2/3] Guard reconnect backoff against stale websocket generations Bind listener-initiated reconnect loops to the socket generation that requested them so a stale loop cannot dispose or replace a newer successful connection after backoff delay. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/OpenClaw.Shared/WebSocketClientBase.cs | 28 +++-- .../WebSocketClientBaseTests.cs | 100 ++++++++++++++++++ 2 files changed, 120 insertions(+), 8 deletions(-) diff --git a/src/OpenClaw.Shared/WebSocketClientBase.cs b/src/OpenClaw.Shared/WebSocketClientBase.cs index e1235eb0..a59337da 100644 --- a/src/OpenClaw.Shared/WebSocketClientBase.cs +++ b/src/OpenClaw.Shared/WebSocketClientBase.cs @@ -309,7 +309,7 @@ private async Task ListenForMessagesAsync(ClientWebSocket ws, long connectionGen { if (!_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) { - await ReconnectWithBackoffAsync(); + await ReconnectWithBackoffAsync(ws, connectionGeneration); } } // slopwatch-ignore: SW003 Shutdown cancellation or disposal is expected and the caller already preserves the safe state. @@ -317,7 +317,9 @@ private async Task ListenForMessagesAsync(ClientWebSocket ws, long connectionGen } } - protected async Task ReconnectWithBackoffAsync() + protected async Task ReconnectWithBackoffAsync( + ClientWebSocket? expectedSocket = null, + long expectedGeneration = 0) { if (Interlocked.CompareExchange(ref _reconnectLoopActive, 1, 0) != 0) { @@ -326,7 +328,10 @@ protected async Task ReconnectWithBackoffAsync() try { - while (!_disposed && !_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) + while (!_disposed + && !_cts.Token.IsCancellationRequested + && ShouldAutoReconnect() + && IsReconnectOwner(expectedSocket, expectedGeneration)) { var delay = BackoffMs[Math.Min(_reconnectAttempts, BackoffMs.Length - 1)]; // Add 0-25% jitter to prevent thundering herd when multiple clients @@ -339,16 +344,20 @@ protected async Task ReconnectWithBackoffAsync() await Task.Delay(delay, _cts.Token); - if (_cts.Token.IsCancellationRequested || _disposed || !ShouldAutoReconnect()) + if (_cts.Token.IsCancellationRequested + || _disposed + || !ShouldAutoReconnect() + || !IsReconnectOwner(expectedSocket, expectedGeneration)) { break; } // Safely dispose old socket - var oldSocket = _webSocket; - _webSocket = null; - try { oldSocket?.Dispose(); } - catch (Exception ex) { _logger.Debug($"WebSocketClientBase: Dispose of old WebSocket during reconnect threw: {ex.Message}"); } + var oldSocket = expectedSocket ?? _webSocket; + if (oldSocket != null) + { + DisposeStaleSocket(oldSocket); + } await ConnectAsync(); @@ -371,6 +380,9 @@ protected async Task ReconnectWithBackoffAsync() } } + private bool IsReconnectOwner(ClientWebSocket? expectedSocket, long expectedGeneration) => + expectedSocket is null || IsCurrentConnection(expectedSocket, expectedGeneration); + /// Send a text message over the WebSocket. Thread-safe. protected async Task SendRawAsync(string message) { diff --git a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs index 4b8104e8..3deb3b9d 100644 --- a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs +++ b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs @@ -320,6 +320,38 @@ public async Task ConnectAsync_StaleConnectionDoesNotStartListenerOnNewerSocket( client.Dispose(); } + [Fact] + public async Task ReconnectBackoff_DoesNotDisposeNewerConnection_WhenSupersededDuringDelay() + { + using var server = new LoopbackWebSocketServer(); + await server.StartAsync(); + var client = new ReconnectBackoffRaceClient(server.WebSocketUrl, "token", _logger); + var statuses = new ConcurrentQueue(); + client.StatusChanged += (_, s) => statuses.Enqueue(s); + + await client.ConnectAsync(); + await client.FirstConnected.WaitAsync(TimeSpan.FromSeconds(2)); + await WaitForConditionAsync(() => server.AcceptedCount >= 1, TimeSpan.FromSeconds(2)); + + await server.CloseSocketAsync(0); + await WaitForConditionAsync( + () => statuses.Count(s => s == ConnectionStatus.Connecting) >= 2, + TimeSpan.FromSeconds(2)); + + await client.ConnectAsync(); + await client.SecondConnected.WaitAsync(TimeSpan.FromSeconds(2)); + + var staleReconnectWon = await Task.WhenAny( + client.ThirdConnected, + Task.Delay(TimeSpan.FromMilliseconds(1800))); + + Assert.NotSame(client.ThirdConnected, staleReconnectWon); + Assert.Equal(2, client.OnConnectedCallCount); + Assert.Equal(2, server.AcceptedCount); + + client.Dispose(); + } + private static async Task WaitForConditionAsync(Func predicate, TimeSpan timeout) { var start = DateTime.UtcNow; @@ -375,6 +407,47 @@ protected override async Task OnConnectedAsync() public void ReleaseFirstConnect() => _releaseFirstConnect.TrySetResult(); } +internal sealed class ReconnectBackoffRaceClient : WebSocketClientBase +{ + private readonly TaskCompletionSource _firstConnected = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _secondConnected = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _thirdConnected = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _connectCallbacks; + + public int OnConnectedCallCount => Volatile.Read(ref _connectCallbacks); + public Task FirstConnected => _firstConnected.Task; + public Task SecondConnected => _secondConnected.Task; + public Task ThirdConnected => _thirdConnected.Task; + + public ReconnectBackoffRaceClient(string gatewayUrl, string token, IOpenClawLogger? logger = null) + : base(gatewayUrl, token, logger) + { + } + + protected override int ReceiveBufferSize => 8192; + protected override string ClientRole => "reconnect-race-test"; + protected override Task ProcessMessageAsync(string json) => Task.CompletedTask; + + protected override Task OnConnectedAsync() + { + var count = Interlocked.Increment(ref _connectCallbacks); + switch (count) + { + case 1: + _firstConnected.TrySetResult(); + break; + case 2: + _secondConnected.TrySetResult(); + break; + case 3: + _thirdConnected.TrySetResult(); + break; + } + + return Task.CompletedTask; + } +} + internal sealed class LoopbackWebSocketServer : IDisposable { private readonly HttpListener _listener = new(); @@ -383,6 +456,16 @@ internal sealed class LoopbackWebSocketServer : IDisposable private Task? _acceptLoop; public string WebSocketUrl { get; } + public int AcceptedCount + { + get + { + lock (_acceptedSockets) + { + return _acceptedSockets.Count; + } + } + } public LoopbackWebSocketServer() { @@ -432,6 +515,23 @@ private async Task AcceptLoopAsync() } } + public async Task CloseSocketAsync(int index) + { + WebSocket socket; + lock (_acceptedSockets) + { + socket = _acceptedSockets[index]; + } + + if (socket.State == WebSocketState.Open) + { + await socket.CloseOutputAsync( + WebSocketCloseStatus.NormalClosure, + "test close", + CancellationToken.None); + } + } + public void Dispose() { _cts.Cancel(); From 121947c43effaecb94d79878c2266dde03e7c820 Mon Sep 17 00:00:00 2001 From: Christine Yan Date: Wed, 10 Jun 2026 16:08:18 -0400 Subject: [PATCH 3/3] Retry CI after transient sanitizer timeout Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>