diff --git a/src/TurboHTTP.IntegrationTests.End2End/H11/WirePipeliningSpec.cs b/src/TurboHTTP.IntegrationTests.End2End/H11/WirePipeliningSpec.cs index 3bd4b11f..260f8106 100644 --- a/src/TurboHTTP.IntegrationTests.End2End/H11/WirePipeliningSpec.cs +++ b/src/TurboHTTP.IntegrationTests.End2End/H11/WirePipeliningSpec.cs @@ -53,7 +53,12 @@ private async Task ReadUntilThreeResponsesAsync(NetworkStream stream, So try { - while (CountOccurrences(sb.ToString(), "HTTP/1.1 200") < 3) + // Read until all three response bodies have arrived, not just their status lines: the + // final response's header and body routinely land in separate TCP segments, so counting + // "HTTP/1.1 200" alone would stop before the last body and race the wire. A genuinely + // dropped or reordered body is still caught — its marker never arrives, the read blocks + // to the receive timeout, and the in-order assertion below then fails. + while (!HasAllThreeBodies(sb.ToString())) { var read = await Task.Run(() => stream.Read(buffer, 0, buffer.Length)); if (read == 0) @@ -70,6 +75,11 @@ private async Task ReadUntilThreeResponsesAsync(NetworkStream stream, So return sb.ToString(); } + private static bool HasAllThreeBodies(string raw) + => raw.Contains("RESP-1", StringComparison.Ordinal) + && raw.Contains("RESP-2", StringComparison.Ordinal) + && raw.Contains("RESP-3", StringComparison.Ordinal); + private static int CountOccurrences(string haystack, string needle) { var count = 0; diff --git a/src/TurboHTTP.Tests/Protocol/Body/QueuedBodyReaderSpec.cs b/src/TurboHTTP.Tests/Protocol/Body/QueuedBodyReaderSpec.cs index 26ac815d..8628602c 100644 --- a/src/TurboHTTP.Tests/Protocol/Body/QueuedBodyReaderSpec.cs +++ b/src/TurboHTTP.Tests/Protocol/Body/QueuedBodyReaderSpec.cs @@ -206,4 +206,63 @@ public async Task ReadAsync_should_succeed_when_data_arrives_before_cancellation Assert.Equal("hello"u8.ToArray(), result.Memory.ToArray()); Assert.False(result.IsCompleted); } + + [Fact(Timeout = 5000)] + public async Task Reset_should_not_return_a_checked_out_rental_to_the_pool() + { + // Regression: on connection teardown/abort, Reset/Dispose ran while a consumer was + // still reading the current chunk and returned its rental to the shared ArrayPool. + // A concurrent stream then re-rented and overwrote it — correct-length, wrong-content + // corruption. The checked-out rental must be returned only by the consumer's AdvanceTo. + var pool = new TrackingArrayPool(); + var reader = new QueuedBodyReader(4, pool); + + reader.TryEnqueue("payload"u8); + var result = await reader.ReadAsync(TestContext.Current.CancellationToken); + Assert.Equal("payload"u8.ToArray(), result.Memory.ToArray()); + + // Teardown while the consumer still holds result.Memory. + reader.Dispose(); + Assert.Equal(0, pool.ReturnedCount); + + // The consumer finishes reading and advances: the rental is returned exactly once. + reader.AdvanceTo(); + Assert.Equal(1, pool.ReturnedCount); + } + + [Fact(Timeout = 5000)] + public void Reset_should_return_queued_but_unread_rentals() + { + // Queued chunks were never handed to the consumer, so Reset must reclaim them + // (no leak) — only the published _current chunk is left for the consumer. + var pool = new TrackingArrayPool(); + var reader = new QueuedBodyReader(4, pool); + + reader.TryEnqueue("a"u8); + reader.TryEnqueue("b"u8); + Assert.Equal(2, pool.RentedCount); + + reader.Dispose(); + Assert.Equal(2, pool.ReturnedCount); + } + + private sealed class TrackingArrayPool : System.Buffers.ArrayPool + { + private readonly System.Buffers.ArrayPool _inner = System.Buffers.ArrayPool.Shared; + + public int RentedCount { get; private set; } + public int ReturnedCount { get; private set; } + + public override byte[] Rent(int minimumLength) + { + RentedCount++; + return _inner.Rent(minimumLength); + } + + public override void Return(byte[] array, bool clearArray = false) + { + ReturnedCount++; + _inner.Return(array, clearArray); + } + } } diff --git a/src/TurboHTTP.Tests/Protocol/ProtocolNegotiatingStateMachineSpec.cs b/src/TurboHTTP.Tests/Protocol/ProtocolNegotiatingStateMachineSpec.cs index 628e4c82..c0aaaef6 100644 --- a/src/TurboHTTP.Tests/Protocol/ProtocolNegotiatingStateMachineSpec.cs +++ b/src/TurboHTTP.Tests/Protocol/ProtocolNegotiatingStateMachineSpec.cs @@ -142,6 +142,34 @@ public void DecodeClientData_should_stay_sniffing_for_insufficient_data() Assert.Empty(ops.ScheduledTimers); } + [Fact(Timeout = 5000)] + [Trait("RFC", "RFC9112-9.3.2")] + public void MaxConcurrentRequests_should_serialize_dispatch_for_negotiated_http11() + { + var ops = new FakeServerOps(); + var sm = new ProtocolNegotiatingStateMachine(new TurboServerOptions(), ops); + + sm.DecodeClientData(MakeConnected()); + sm.DecodeClientData(MakeData("GET / HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"u8.ToArray())); + + // HTTP/1.1 responses are positional on the wire, so the negotiator must forward the inner + // machine's one-at-a-time dispatch limit — otherwise the shared, completion-ordered bridge + // can reorder pipelined responses (RFC 9112 §9.3.2). + Assert.Equal(1, sm.MaxConcurrentRequests); + } + + [Fact(Timeout = 5000)] + public void MaxConcurrentRequests_should_stay_unbounded_for_negotiated_http2() + { + var ops = new FakeServerOps(); + var sm = new ProtocolNegotiatingStateMachine(new TurboServerOptions(), ops); + + sm.DecodeClientData(MakeConnected(SslApplicationProtocol.Http2)); + + // HTTP/2 routes responses to streams by id, so concurrent dispatch must remain unbounded. + Assert.Equal(int.MaxValue, sm.MaxConcurrentRequests); + } + [Fact(Timeout = 5000)] public void Cleanup_should_dispose_buffered_data() { diff --git a/src/TurboHTTP.Tests/Streams/Stages/Server/Http11ServerConnectionStagePipeliningSpec.cs b/src/TurboHTTP.Tests/Streams/Stages/Server/Http11ServerConnectionStagePipeliningSpec.cs new file mode 100644 index 00000000..89ecaef7 --- /dev/null +++ b/src/TurboHTTP.Tests/Streams/Stages/Server/Http11ServerConnectionStagePipeliningSpec.cs @@ -0,0 +1,106 @@ +using System.Collections.Concurrent; +using System.Net; +using System.Text; +using Akka.Actor; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using Microsoft.AspNetCore.Hosting.Server; +using Microsoft.AspNetCore.Http.Features; +using Servus.Akka.Transport; +using TurboHTTP.Server; +using TurboHTTP.Streams; +using TurboHTTP.Streams.Stages.Server; +using TurboHTTP.Tests.Shared; + +namespace TurboHTTP.Tests.Streams.Stages.Server; + +/// +/// RFC 9112 §9.3.2: a server MAY process pipelined requests in parallel but MUST send the +/// corresponding responses in the same order the requests were received. TurboHTTP guarantees +/// this by dispatching pipelined HTTP/1.1 requests to the application handler strictly +/// one-at-a-time, so the shared (completion-ordered) ApplicationBridgeStage can never reorder +/// or corrupt responses on a single H1.1 connection. +/// +public sealed class Http11ServerConnectionStagePipeliningSpec : StreamTestBase +{ + private sealed class FakeApplication(Func handler) + : IHttpApplication + { + public IFeatureCollection CreateContext(IFeatureCollection contextFeatures) => contextFeatures; + public Task ProcessRequestAsync(IFeatureCollection context) => handler(context); + public void DisposeContext(IFeatureCollection context, Exception? exception) { } + } + + private static TransportConnected Connected() + => new(new ConnectionInfo( + new IPEndPoint(IPAddress.Loopback, 80), + new IPEndPoint(IPAddress.Loopback, 50000), + TransportProtocol.Tcp)); + + private static TransportData PipelinedRequests(params string[] paths) + { + var sb = new StringBuilder(); + foreach (var path in paths) + { + sb.Append("GET ").Append(path).Append(" HTTP/1.1\r\nHost: example.com\r\n\r\n"); + } + + var bytes = Encoding.ASCII.GetBytes(sb.ToString()); + var buffer = TransportBuffer.Rent(bytes.Length); + bytes.CopyTo(buffer.FullMemory.Span); + buffer.Length = bytes.Length; + return TransportData.Rent(buffer); + } + + [Fact(Timeout = 15000)] + [Trait("RFC", "RFC9112-9.3.2")] + public void Http11_pipelined_requests_are_dispatched_one_at_a_time() + { + var options = new TurboServerOptions(); + var gates = new ConcurrentDictionary(); + var probe = CreateTestProbe(); + + // Each handler invocation blocks on its own gate. If pipelined requests were dispatched + // concurrently, the probe would observe all three paths before any gate is released. + var app = new FakeApplication(features => + { + var path = features.Get()!.Path; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + gates[path] = tcs; + features.Get()!.StatusCode = 204; + probe.Ref.Tell(path, ActorRefs.NoSender); + return tcs.Task; + }); + + // Production wires the bridge with unbounded parallelism and routes connections through the + // negotiating engine; the connection stage — not the bridge — must enforce H1.1 ordering, so + // the test reproduces that exact path (negotiating engine + int.MaxValue bridge). + var bridge = Flow.FromGraph(new ApplicationBridgeStage( + app, int.MaxValue, options.HandlerTimeout, options.HandlerGracePeriod)); + + var joined = new NegotiatingServerEngine(options).CreateFlow().Join(bridge); + + var (netIn, netOut) = this.SourceProbe() + .Via(joined) + .ToMaterialized(this.SinkProbe(), Keep.Both) + .Run(Materializer); + + netOut.Request(100); + netIn.SendNext(Connected(), TestContext.Current.CancellationToken); + netIn.SendNext(PipelinedRequests("/p/1", "/p/2", "/p/3"), TestContext.Current.CancellationToken); + + var first = probe.ExpectMsg(); + Assert.Equal("/p/1", first); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + gates[first].SetResult(); + + var second = probe.ExpectMsg(); + Assert.Equal("/p/2", second); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + gates[second].SetResult(); + + var third = probe.ExpectMsg(); + Assert.Equal("/p/3", third); + gates[third].SetResult(); + } +} diff --git a/src/TurboHTTP/Protocol/Body/QueuedBodyReader.cs b/src/TurboHTTP/Protocol/Body/QueuedBodyReader.cs index ecc91691..5f6cb8bf 100644 --- a/src/TurboHTTP/Protocol/Body/QueuedBodyReader.cs +++ b/src/TurboHTTP/Protocol/Body/QueuedBodyReader.cs @@ -12,6 +12,7 @@ internal sealed class QueuedBodyReader : IStreamingBodyReader, IValueTaskSource< // never executes on the producing stage thread. private readonly object _sync = new(); + private readonly ArrayPool _pool; private OwnedChunk[] _slots; private readonly int _backpressureThreshold; private int _head; @@ -25,8 +26,9 @@ internal sealed class QueuedBodyReader : IStreamingBodyReader, IValueTaskSource< private readonly int _initialSlotCount; - public QueuedBodyReader(int capacity) + public QueuedBodyReader(int capacity, ArrayPool? pool = null) { + _pool = pool ?? ArrayPool.Shared; _backpressureThreshold = capacity; _initialSlotCount = capacity * 2; _slots = new OwnedChunk[_initialSlotCount]; @@ -61,7 +63,7 @@ public bool IsFull public bool TryEnqueue(ReadOnlySpan data) { - var rental = ArrayPool.Shared.Rent(data.Length); + var rental = _pool.Rent(data.Length); data.CopyTo(rental); var chunk = new OwnedChunk(rental, data.Length); @@ -208,7 +210,7 @@ public void AdvanceTo() { if (_current.Rental is not null) { - ArrayPool.Shared.Return(_current.Rental); + _pool.Return(_current.Rental); } _current = default; @@ -248,18 +250,22 @@ public void Reset() _head = (_head + 1) % _slots.Length; _count--; + // Queued chunks were never handed to the consumer, so their rentals are + // safe to return here. if (chunk.Rental is not null) { - ArrayPool.Shared.Return(chunk.Rental); + _pool.Return(chunk.Rental); } } - if (_current.Rental is not null) - { - ArrayPool.Shared.Return(_current.Rental); - } - - _current = default; + // Do NOT return _current's rental here. Once _current is non-default its Memory + // has been published to the consumer (via the last ReadAsync result) and may + // still be read on another thread. On teardown/abort this lock can run while + // that read is in flight; returning the rental would recycle a buffer still in + // use and let a concurrent stream overwrite it (cross-stream, correct-length + // wrong-content corruption). Ownership stays with the consumer: its AdvanceTo + // returns the rental exactly once. If the consumer abandons it, the array is + // simply GC-reclaimed rather than returned to the pool — a rare, bounded miss. _head = 0; _tail = 0; _count = 0; diff --git a/src/TurboHTTP/Protocol/IServerStateMachine.cs b/src/TurboHTTP/Protocol/IServerStateMachine.cs index b118e10b..7ad6857a 100644 --- a/src/TurboHTTP/Protocol/IServerStateMachine.cs +++ b/src/TurboHTTP/Protocol/IServerStateMachine.cs @@ -10,6 +10,15 @@ internal interface IServerStateMachine bool ShouldPauseNetwork => false; int MaxQueuedRequests { get; } + /// + /// Maximum number of requests that may be dispatched to the application handler concurrently + /// on this connection. HTTP/1.x returns 1 so the connection stage serializes handler dispatch + /// and the shared (completion-ordered) ApplicationBridgeStage can never reorder responses + /// — RFC 9112 §9.3.2 requires pipelined responses in request order. Multiplexed protocols + /// (HTTP/2, HTTP/3) route responses by stream id and leave this unbounded. + /// + int MaxConcurrentRequests => int.MaxValue; + void PreStart(); void OnResponse(IFeatureCollection features); void DecodeClientData(ITransportInbound data); diff --git a/src/TurboHTTP/Protocol/ProtocolNegotiatingStateMachine.cs b/src/TurboHTTP/Protocol/ProtocolNegotiatingStateMachine.cs index b798fc51..249082ef 100644 --- a/src/TurboHTTP/Protocol/ProtocolNegotiatingStateMachine.cs +++ b/src/TurboHTTP/Protocol/ProtocolNegotiatingStateMachine.cs @@ -27,6 +27,11 @@ private enum Phase { WaitingForConnect, Sniffing, Running } public bool ShouldComplete => _phase == Phase.Running && _inner!.ShouldComplete; public int MaxQueuedRequests => _phase == Phase.Running ? _inner!.MaxQueuedRequests : 1; + // Forward the concurrency limit so a negotiated/sniffed HTTP/1.x connection still serializes + // handler dispatch (RFC 9112 §9.3.2). Until a protocol is chosen no request is dispatched, so + // the conservative default of 1 is safe. + public int MaxConcurrentRequests => _phase == Phase.Running ? _inner!.MaxConcurrentRequests : 1; + public ProtocolNegotiatingStateMachine(TurboServerOptions options, IServerStageOperations ops) { _options = options; diff --git a/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs index 918844c2..3456ef00 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs @@ -44,6 +44,9 @@ internal sealed class Http10ServerStateMachine : IServerStateMachine public int MaxQueuedRequests => 1; + // HTTP/1.0 dispatches one request per connection; mirror H1.1 so handler dispatch stays serial. + public int MaxConcurrentRequests => 1; + public Http10ServerStateMachine(Http1ConnectionOptions options, IServerStageOperations ops, TimeProvider? timeProvider = null) { diff --git a/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs index 252c3d17..a62f11d4 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs @@ -63,6 +63,11 @@ internal sealed class Http11ServerStateMachine : IServerStateMachine public bool ShouldPauseNetwork => _activeStreamingReader?.IsFull ?? false; public int MaxQueuedRequests { get; } + // HTTP/1.1 responses are matched to requests by position on the wire, so a pipelined request + // must not be dispatched to the handler until the previous response has been emitted + // (RFC 9112 §9.3.2). One-at-a-time dispatch keeps the shared bridge from reordering responses. + public int MaxConcurrentRequests => 1; + public Http11ServerStateMachine(Http1ConnectionOptions options, Http2ConnectionOptions h2UpgradeOptions, IServerStageOperations ops, TimeProvider? timeProvider = null) { diff --git a/src/TurboHTTP/Protocol/Syntax/Http2/Client/Http2ClientSessionManager.cs b/src/TurboHTTP/Protocol/Syntax/Http2/Client/Http2ClientSessionManager.cs index 63dcc36a..3e33310f 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http2/Client/Http2ClientSessionManager.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http2/Client/Http2ClientSessionManager.cs @@ -34,6 +34,14 @@ internal sealed class Http2ClientSessionManager private readonly Dictionary _streams = new(); private readonly Dictionary _activeBodyStreams = new(); private readonly Dictionary> _activeBodyBuffers = new(); + private readonly Dictionary _activeBodyReadCts = new(); + // Streams whose reusable drain buffer currently has a ReadAsync in flight. The buffer + // must not be returned to the pool until that read completes, or a concurrent stream + // could re-rent and overwrite it mid-read. + private readonly HashSet _drainReadInFlight = new(); + // Streams torn down while a drain read was in flight: buffer/cts disposal is deferred to + // the read-completion handler. + private readonly HashSet _drainBufferOrphaned = new(); private bool _prefaceSent; private bool _awaitingPingAck; @@ -436,6 +444,22 @@ public void Cleanup() CleanupBodyDrain(streamId); } + // The actor is being torn down; deferred read completions may never arrive (or this + // session manager may be reused across a reconnect with reused stream IDs). Drop all + // drain tracking so a stale completion cannot dispose a future stream's buffer. + // Buffers with a still-in-flight read are abandoned to the GC rather than returned to + // the pool — returning a buffer a read is still writing into is the corruption we fix. + foreach (var (_, cts) in _activeBodyReadCts) + { + cts.Dispose(); + } + + _activeBodyReadCts.Clear(); + _activeBodyBuffers.Clear(); + _activeBodyStreams.Clear(); + _drainReadInFlight.Clear(); + _drainBufferOrphaned.Clear(); + ReleaseAllStreamState(); } @@ -785,6 +809,16 @@ public void OnBodyMessage(object msg) break; case StreamBodyReadFailed failed: + _drainReadInFlight.Remove(failed.StreamId); + if (_drainBufferOrphaned.Remove(failed.StreamId)) + { + // The stream was already torn down while this read was in flight (the read + // failed because CleanupBodyDrain cancelled it). The buffer was kept alive + // for the read; release it now. The stream is gone — no RST/close needed. + DisposeDrainResources(failed.StreamId); + break; + } + Tracing.For("Protocol").Warning(this, "HTTP/2: Body drain failed for stream {0}: {1}", failed.StreamId, failed.Reason.Message); EmitFrame(new RstStreamFrame(failed.StreamId, Http2ErrorCode.InternalError)); @@ -796,6 +830,15 @@ public void OnBodyMessage(object msg) private void HandleStreamBodyRead(StreamBodyReadComplete read) { + _drainReadInFlight.Remove(read.StreamId); + if (_drainBufferOrphaned.Remove(read.StreamId)) + { + // The stream was torn down while this read was in flight; the buffer was kept + // alive for the read. Release it now and drop the result — the stream is gone. + DisposeDrainResources(read.StreamId); + return; + } + if (!_streams.TryGetValue(read.StreamId, out var state)) { CleanupBodyDrain(read.StreamId); @@ -929,6 +972,7 @@ private void StartStreamBodyDrain(int streamId, Stream bodyStream, long? content : maxSize; var buffer = MemoryPool.Shared.Rent(Math.Max(bufferSize, 256)); _activeBodyBuffers[streamId] = buffer; + _activeBodyReadCts[streamId] = new CancellationTokenSource(); ReadNextBodyChunk(streamId); } @@ -945,19 +989,51 @@ private void ReadNextBodyChunk(int streamId) state.IsBodyReadPending = true; } - stream.ReadAsync(buffer.Memory).AsTask().PipeTo( + // Mark the reusable buffer as having a read in flight so CleanupBodyDrain defers its + // disposal until the read completes (see Fix B comment in CleanupBodyDrain). + _drainReadInFlight.Add(streamId); + var token = _activeBodyReadCts.TryGetValue(streamId, out var cts) ? cts.Token : CancellationToken.None; + + stream.ReadAsync(buffer.Memory, token).AsTask().PipeTo( _ops.StageActor, success: bytesRead => new StreamBodyReadComplete(streamId, bytesRead), failure: ex => new StreamBodyReadFailed(streamId, ex)); } private void CleanupBodyDrain(int streamId) + { + _activeBodyStreams.Remove(streamId); + + if (_drainReadInFlight.Contains(streamId)) + { + // A ReadAsync into the reusable drain buffer is still in flight. Returning the + // buffer to the pool now would let a concurrent stream re-rent and overwrite it + // mid-read (cross-stream, correct-length/wrong-content corruption). Cancel the + // read and defer buffer/cts disposal to the read-completion handler. + _drainBufferOrphaned.Add(streamId); + if (_activeBodyReadCts.TryGetValue(streamId, out var pendingCts)) + { + pendingCts.Cancel(); + } + + return; + } + + DisposeDrainResources(streamId); + } + + private void DisposeDrainResources(int streamId) { if (_activeBodyBuffers.Remove(streamId, out var buffer)) { buffer.Dispose(); } - _activeBodyStreams.Remove(streamId); + if (_activeBodyReadCts.Remove(streamId, out var cts)) + { + cts.Dispose(); + } + + _drainBufferOrphaned.Remove(streamId); } } \ No newline at end of file diff --git a/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs b/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs index 7c352f5c..5ed62510 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs @@ -48,6 +48,14 @@ internal sealed class Http2ServerSessionManager private readonly Dictionary _activeBodyStreams = new(); private readonly Dictionary> _activeBodyBuffers = new(); + private readonly Dictionary _activeBodyReadCts = new(); + // Streams whose reusable response-drain buffer currently has an async ReadAsync in flight. + // The buffer must not be returned to the pool until that read completes, or a concurrent + // stream could re-rent and overwrite it mid-read (cross-stream, wrong-content corruption). + private readonly HashSet _drainReadInFlight = new(); + // Streams torn down while a drain read was in flight: disposal is deferred to the + // read-completion handler. + private readonly HashSet _drainBufferOrphaned = new(); private int _nextContinuationStreamId; private bool _continuationEndStream; @@ -347,6 +355,16 @@ public void OnBodyMessage(object msg) break; case StreamBodyReadFailed failed: + _drainReadInFlight.Remove(failed.StreamId); + if (_drainBufferOrphaned.Remove(failed.StreamId)) + { + // Stream already torn down while this read was in flight (the read failed + // because CleanupBodyDrain cancelled it). Release the deferred buffer; the + // stream is gone, so no RST is needed. + DisposeDrainResources(failed.StreamId); + break; + } + Tracing.For("Protocol").Warning(this, "HTTP/2: Response body drain failed for stream {0}: {1}", failed.StreamId, failed.Reason.Message); @@ -358,6 +376,15 @@ public void OnBodyMessage(object msg) private void HandleStreamBodyRead(StreamBodyReadComplete read) { + _drainReadInFlight.Remove(read.StreamId); + if (_drainBufferOrphaned.Remove(read.StreamId)) + { + // The stream was torn down while this read was in flight; the buffer was kept + // alive for the read. Release it now and drop the result — the stream is gone. + DisposeDrainResources(read.StreamId); + return; + } + if (!_streams.TryGetValue(read.StreamId, out var state)) { CleanupBodyDrain(read.StreamId); @@ -527,6 +554,29 @@ public void Cleanup() state.AbortBody(); } + // Release response-drain resources. A buffer with a still-in-flight read is abandoned + // to the GC rather than returned to the pool — returning a buffer a read is still + // writing into is exactly the cross-stream corruption we guard against elsewhere. + foreach (var (streamId, buffer) in _activeBodyBuffers) + { + if (!_drainReadInFlight.Contains(streamId)) + { + buffer.Dispose(); + } + } + + foreach (var (_, cts) in _activeBodyReadCts) + { + cts.Cancel(); + cts.Dispose(); + } + + _activeBodyBuffers.Clear(); + _activeBodyStreams.Clear(); + _activeBodyReadCts.Clear(); + _drainReadInFlight.Clear(); + _drainBufferOrphaned.Clear(); + _frameDecoder.Dispose(); foreach (var state in _streams.Values) @@ -972,6 +1022,7 @@ private void StartStreamBodyDrain(int streamId, Stream bodyStream, long? content : maxSize; var buffer = MemoryPool.Shared.Rent(Math.Max(bufferSize, 256)); _activeBodyBuffers[streamId] = buffer; + _activeBodyReadCts[streamId] = new CancellationTokenSource(); ReadNextBodyChunk(streamId); } @@ -988,13 +1039,19 @@ private void ReadNextBodyChunk(int streamId) state.IsBodyReadPending = true; } - var vt = stream.ReadAsync(buffer.Memory); + var token = _activeBodyReadCts.TryGetValue(streamId, out var cts) ? cts.Token : CancellationToken.None; + var vt = stream.ReadAsync(buffer.Memory, token); if (vt.IsCompletedSuccessfully) { + // Completed inline on the actor thread: the buffer was never exposed across a + // message boundary, so no in-flight tracking is needed. HandleStreamBodyRead(new StreamBodyReadComplete(streamId, vt.Result)); return; } + // The read is now genuinely in flight on another thread writing into the reusable + // buffer. Mark it so CleanupBodyDrain defers buffer disposal until it completes. + _drainReadInFlight.Add(streamId); vt.AsTask().PipeTo( _ops.StageActor, success: bytesRead => new StreamBodyReadComplete(streamId, bytesRead), @@ -1002,13 +1059,39 @@ private void ReadNextBodyChunk(int streamId) } private void CleanupBodyDrain(int streamId) + { + _activeBodyStreams.Remove(streamId); + + if (_drainReadInFlight.Contains(streamId)) + { + // A ReadAsync into the reusable drain buffer is still in flight. Returning the + // buffer to the pool now would let a concurrent stream re-rent and overwrite it + // mid-read. Cancel the read and defer buffer/cts disposal to its completion. + _drainBufferOrphaned.Add(streamId); + if (_activeBodyReadCts.TryGetValue(streamId, out var pendingCts)) + { + pendingCts.Cancel(); + } + + return; + } + + DisposeDrainResources(streamId); + } + + private void DisposeDrainResources(int streamId) { if (_activeBodyBuffers.Remove(streamId, out var buffer)) { buffer.Dispose(); } - _activeBodyStreams.Remove(streamId); + if (_activeBodyReadCts.Remove(streamId, out var cts)) + { + cts.Dispose(); + } + + _drainBufferOrphaned.Remove(streamId); } private void EmitFrame(Http2Frame frame) diff --git a/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs b/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs index 44b98a94..85eef78c 100644 --- a/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs +++ b/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs @@ -11,6 +11,10 @@ namespace TurboHTTP.Server.Context.Features; internal sealed class TurboHttpResponseBodyFeature : IHttpResponseBodyFeature { private Pipe? _pipe; + // UpgradeToPipe can be invoked from both the stage-actor thread (ApplicationBridgeStage) + // and the application/handler thread (first response write). Guard pipe creation so at + // most one Pipe is ever constructed — a true cross-thread boundary, hence the lock. + private readonly object _pipeLock = new(); private ArrayBufferWriter _bufferWriter = FeatureCollectionFactory.RentBuffer(); private ResponsePipeWriter _writer; private Stream? _stream; @@ -90,31 +94,45 @@ internal void UpgradeToPipe() return; } - Tracing.For("Stage").Debug(this, "response upgraded to pipe (buffered={0}, completed={1})", - _bufferWriter.WrittenCount, _writer.IsCompleted); + lock (_pipeLock) + { + // Double-checked: another thread may have created the pipe between the fast-path + // read above and acquiring the lock. + if (_pipe is not null) + { + return; + } - // The initial flush below is not awaited, so the pause threshold must exceed the - // already-buffered content or the pending FlushAsync would be silently discarded. - var buffered = _bufferWriter.WrittenCount; - _pipe = buffered < 64 * 1024 - ? new Pipe() - : new Pipe(new PipeOptions( - pauseWriterThreshold: buffered + 64 * 1024, - resumeWriterThreshold: buffered / 2)); + Tracing.For("Stage").Debug(this, "response upgraded to pipe (buffered={0}, completed={1})", + _bufferWriter.WrittenCount, _writer.IsCompleted); - if (buffered > 0) - { - var src = _bufferWriter.WrittenSpan; - var dest = _pipe.Writer.GetMemory(src.Length); - src.CopyTo(dest.Span); - _pipe.Writer.Advance(src.Length); - _pipe.Writer.FlushAsync(); - _bufferWriter.ResetWrittenCount(); - } + // The initial flush below is not awaited, so the pause threshold must exceed the + // already-buffered content or the pending FlushAsync would be silently discarded. + var buffered = _bufferWriter.WrittenCount; + var pipe = buffered < 64 * 1024 + ? new Pipe() + : new Pipe(new PipeOptions( + pauseWriterThreshold: buffered + 64 * 1024, + resumeWriterThreshold: buffered / 2)); - if (_writer.IsCompleted) - { - _pipe.Writer.Complete(); + if (buffered > 0) + { + var src = _bufferWriter.WrittenSpan; + var dest = pipe.Writer.GetMemory(src.Length); + src.CopyTo(dest.Span); + pipe.Writer.Advance(src.Length); + pipe.Writer.FlushAsync(); + _bufferWriter.ResetWrittenCount(); + } + + if (_writer.IsCompleted) + { + pipe.Writer.Complete(); + } + + // Publish the fully-initialized pipe last so concurrent readers never observe a + // partially-set-up instance. + _pipe = pipe; } } diff --git a/src/TurboHTTP/Streams/Stages/Server/HttpConnectionServerStageLogic.cs b/src/TurboHTTP/Streams/Stages/Server/HttpConnectionServerStageLogic.cs index 32daba60..94afcbb3 100644 --- a/src/TurboHTTP/Streams/Stages/Server/HttpConnectionServerStageLogic.cs +++ b/src/TurboHTTP/Streams/Stages/Server/HttpConnectionServerStageLogic.cs @@ -28,6 +28,13 @@ internal sealed class HttpConnectionServerStageLogic : TimerGraphStageLogic private readonly Queue _requestQueue = new(); private readonly Queue _outboundQueue = new(); private bool _completeAfterFlush; + + // Requests pushed to the handler (_outRequest) minus responses received on _inResponse. The + // stage refuses to dispatch more than the state machine's MaxConcurrentRequests concurrently; + // HTTP/1.x reports 1, which serializes pipelined dispatch so the shared, completion-ordered + // ApplicationBridgeStage can never reorder responses (RFC 9112 §9.3.2). Multiplexed protocols + // leave the limit unbounded, so this counter never gates them. + private int _handlerInFlight; private IActorRef _stageActor = ActorRefs.Nobody; private readonly IServiceProvider? _services; private TurboHttpConnectionFeature? _connectionFeature; @@ -88,7 +95,15 @@ public HttpConnectionServerStageLogic( { if (_requestQueue.Count > 0) { - Push(_outRequest, _requestQueue.Dequeue()); + if (CanDispatch) + { + Push(_outRequest, _requestQueue.Dequeue()); + _handlerInFlight++; + } + + // Otherwise the handler is busy: leave the demand outstanding so a completing + // response releases the next queued request via TryPushRequest. OnNetworkPush keeps + // reading the wire ahead independently, so requests still drain off the socket. return; } @@ -102,6 +117,11 @@ public HttpConnectionServerStageLogic( onPush: () => { var response = Grab(_inResponse); + if (_handlerInFlight > 0) + { + _handlerInFlight--; + } + try { _sm.OnResponse(response); @@ -134,6 +154,10 @@ public HttpConnectionServerStageLogic( FeatureCollectionFactory.Return(response); } + // A handler slot just freed: release the next pipelined request (a no-op for + // multiplexed protocols, whose queue is already drained) before pulling the + // following response. + TryPushRequest(); TryPullResponse(); }, onUpstreamFinish: () => @@ -449,11 +473,14 @@ void IServerStageOperations.OnResponseBodyComplete(IFeatureCollection features) FeatureCollectionFactory.Return(features); } + private bool CanDispatch => _handlerInFlight < _sm.MaxConcurrentRequests; + private void TryPushRequest() { - if (_requestQueue.Count > 0 && IsAvailable(_outRequest)) + if (_requestQueue.Count > 0 && IsAvailable(_outRequest) && CanDispatch) { Push(_outRequest, _requestQueue.Dequeue()); + _handlerInFlight++; } }