diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 7e3cc2eb..5c898788 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -22,6 +22,7 @@ export default defineConfig({ nav: [ { text: 'Getting Started', link: '/getting-started/' }, + { text: 'When to Use', link: '/when-to-use' }, { text: 'Scenarios', link: '/scenarios' }, { text: 'Client', link: '/client/' }, { text: 'Server', link: '/server/' }, @@ -47,6 +48,16 @@ export default defineConfig({ text: 'Scenarios', items: [ { text: 'Overview', link: '/scenarios' }, + { text: 'When to Use TurboHTTP', link: '/when-to-use' }, + ], + }, + ], + '/when-to-use': [ + { + text: 'Scenarios', + items: [ + { text: 'Overview', link: '/scenarios' }, + { text: 'When to Use TurboHTTP', link: '/when-to-use' }, ], }, ], diff --git a/docs/when-to-use.md b/docs/when-to-use.md new file mode 100644 index 00000000..de96d47b --- /dev/null +++ b/docs/when-to-use.md @@ -0,0 +1,92 @@ +# When to Use TurboHTTP + +TurboHTTP is not a drop-in "faster HttpClient/Kestrel". It is an HTTP stack built on Akka.Streams +whose strengths are **streaming, backpressure, large payloads under concurrency, and actor +integration** — and whose trade-off is per-request overhead on tiny, latency-critical requests. +This page summarizes where each side of the stack wins, based on the benchmark suite +(BenchmarkDotNet, loopback, 2026-06). 
+ +## TL;DR + +| Your workload | Recommendation | +|---|---| +| Many small GETs, lowest possible latency | HttpClient / Kestrel | +| Large request bodies (uploads) under concurrency | **TurboHTTP client** (H2/H3: up to 2–3.5× HttpClient) | +| Upload-heavy server endpoints (HTTP/1.1) | **TurboServer** (+10–34 % vs Kestrel) | +| Streaming, SSE, backpressure end-to-end | **TurboHTTP (both sides)** | +| Actor-based backends (Akka.NET) | **TurboServer** — shares your `ActorSystem` | +| Bulk request pipelines (fire thousands, drain results) | **TurboHTTP client channel API** | + +## As a Client + +### Where it wins + +- **Concurrent uploads over HTTP/2 and HTTP/3.** With many in-flight POSTs, the multiplexed + upload path clearly beats `SocketsHttpHandler`: at 512–4096 concurrent 10 KB uploads the + benchmark shows **+12 % to +58 % (H2)** and **+123 % to +247 % (H3)** throughput, with up to + **84 % fewer allocations** (H2, CL=4096). Tail latency follows: p99 is 40–70 % lower in these + scenarios. +- **HTTP/1.1 uploads at scale** run close to HttpClient (within ~30–40 % at high concurrency) + with bounded memory — the request body pump is backpressured against the socket instead of + buffering whole bodies. +- **Resilience built into the pipeline.** Retries, reconnect with request replay, redirects, + cookies, HTTP caching, and content encoding are stream stages, not handler wrappers — and all + of it is observable through permanent `Servus.Senf` tracing. +- **The channel API** (`client.Requests` / `client.Responses`) turns the client into a + backpressured pipeline: write thousands of requests, drain responses as they complete. Ideal + for crawlers, batch syncs, and fan-out jobs where aggregate throughput matters, not + per-request latency. + +### Where HttpClient is the better tool + +- **Single-request latency on light GETs.** A lone ~3 B GET costs ~150–160 µs vs HttpClient's + ~74 µs; light-GET fan-out at very high concurrency is also slower (H2/H3 light concurrent). 
+- **The channel API has a latency floor** (~1.3–1.6 ms per isolated request) from its + stream-materialization hops — it amortizes over bulk work, not single calls. + +## As a Server + +### Where it wins + +- **HTTP/1.1 upload endpoints.** 1 MB POSTs run **+10 % to +34 %** faster than Kestrel + (sequential and CL=1 concurrent; +10–20 % at CL=64/256 sequential). +- **HTTP/2 / HTTP/3 request handling at parity.** Plaintext/JSON/Fortunes sequential are within + ±5–15 % of Kestrel across protocols; several H2 concurrent scenarios (plaintext, JSON) are + ahead at p95/p99. +- **Streaming responses with real backpressure.** Return an Akka Streams `Source` (SSE, long + downloads) and flow control runs end-to-end — a slow client slows the producer instead of + growing a buffer. +- **Actor integration.** TurboServer reuses your `ActorSystem` from DI; HTTP connections and + domain actors share supervision, dispatchers, and tracing (see [Scenarios](/scenarios)). + +### Where Kestrel is the better tool + +- **Small-response throughput/latency records.** Plaintext/JSON-style endpoints are ~6–16 % + slower at p50 and allocate more per request (managed allocations are roughly 3–4× Kestrel's + 2.7 KB; native/pooled buffers excluded on both sides). +- **Very high fan-out on HTTP/3.** Light-request concurrency over QUIC currently trails Kestrel + significantly (-50 % to -74 %) — a known limitation of the shared pipeline, being worked on. + +## In Combination + +Running TurboHTTP on both ends pays off when the *pipeline* is the product: + +- **Service-to-service with large payloads.** TurboHTTP client → TurboServer keeps uploads + backpressured on both sides; neither end buffers whole bodies, so memory stays flat under + load spikes. +- **End-to-end streaming.** An Akka Streams `Source` on the server feeds an Akka Streams + consumer on the client — one flow-controlled graph across the network, including SSE. 
+- **Gateways and proxies.** Forward-proxy and CONNECT tunneling are supported; combined with + the channel API this makes backpressured relay/aggregation services straightforward. +- **One ActorSystem everywhere.** Client stages, server connections, and your domain actors + share dispatchers, supervision, and `Servus.Senf` tracing categories — a single operational + surface from socket to business logic. + +## Benchmark Context + +Numbers above come from the repo's benchmark suite (`TurboHTTP.Benchmarks`): localhost loopback, +BenchmarkDotNet, HTTP/1.1 + h2c cleartext, HTTP/3 with self-signed TLS, run 2026-06. Loopback +isolates protocol-stack overhead and exaggerates per-request costs relative to real networks — +over WAN latencies, the gaps on light requests shrink while the streaming/backpressure advantages +remain. Memory figures count managed allocations only. Re-run with +`dotnet run -c Release --project TurboHTTP.Benchmarks` to reproduce on your hardware. diff --git a/notes/Bugs/H1-response-rate-monitor-leak.md b/notes/Bugs/H1-response-rate-monitor-leak.md new file mode 100644 index 00000000..33940500 --- /dev/null +++ b/notes/Bugs/H1-response-rate-monitor-leak.md @@ -0,0 +1,75 @@ +--- +status: fixed +component: Protocol/Http11/Server +discovered: '2026-06-12' +fixed: '2026-06-12' +branch: release-next +severity: critical +tags: + - bug + - http11 + - http10 + - server + - data-rate + - connection-reset + - fixed +--- +# H1.x Server Killed Idle Keep-Alive Connections (Response-Rate Entry Leak) — FIXED (2026-06-12) + +## Symptom + +The four zero-result rows in the 2026-06-12 server benchmark report (H1.1 +Plaintext/Fortunes/Upload_Concurrent): BenchmarkDotNet cases failed with +`SocketException 10054: connection forcibly closed by the remote host`. 
GET benchmarks +mostly survived because SocketsHttpHandler silently retries requests that die on a reused +connection before the response starts (BDN still measured 0.77 first-chance exceptions per +op on Plaintext CL=256!); POST uploads are not retryable → the whole benchmark case errored. + +## Root cause + +`Http11ServerStateMachine.EmitBufferedBody` — the standard path for **buffered** response +bodies (i.e. virtually every normal MapGet/MapPost response) — called +`_responseRate.Observe(...)` per chunk but never `_responseRate.Remove(0)` on completion. +Only the *streaming* response path removed the entry. The stale entry's measured rate +decayed toward 0 B/s; once the connection sat idle on keep-alive longer than +`MinResponseDataRateGracePeriod` (default 5 s), the periodic `data-rate-check` timer flagged +a "violation" and set `ShouldComplete` → the server reset a perfectly healthy connection. + +Trace signature: `data rate violation (reqRate=0, respRate=1, paused=False)` at Warning level. + +Same leak in `Http10ServerStateMachine.HandleResponseBodyRead` (streaming completion, +`bytesRead == 0` branch) — relevant for `Connection: keep-alive` H1.0 clients. +H2/H3 are unaffected (per-stream entries removed in `CloseStream`). + +## Why it was intermittent + +Under continuous tight-loop load the Observe calls keep refreshing the rate, so no violation. +BDN's pauses between iterations (and HttpClient's >64 pooled connections rotating in and out +of use) created exactly the idle-past-grace windows. A 1000-round tight-loop repro stayed +green while the same scenario under BDN failed — first-chance exception counting + +Senf Warning tracing exposed it. + +## The fix + +- `EmitBufferedBody`: `_responseRate.Remove(0)` after `writer.CompleteAsync()`. +- `Http10ServerStateMachine`: same removal in the streaming-completion branch. 
+ +## Tests + +- `Http11DataRateSpec.Buffered_response_completion_should_not_flag_idle_keepalive_connection` + (FakeTimeProvider, buffered body via `TurboHttpResponseBodyFeature.Writer`, idle 10 s, two + data-rate-check fires → must NOT set ShouldComplete; failed pre-fix). +- `Http10DataRateSpec.Completed_streaming_response_should_not_flag_idle_connection`. + +## Verification + +- Repro (HttpClient → TurboServer, 500×64 concurrent 1 MB uploads): pre-fix 13 connection + resets + violation traces; post-fix 0/0 across 32 000 uploads. +- BDN `Upload_Concurrent` H1.1 CL=64/256: was NA (errored), post-fix produces results. + +## Lesson + +Rate-monitor entries are per-response state with mandatory removal on every completion path — +buffered, streamed, and failed. When BDN shows a 0/NA row, check the per-case log for +first-chance exceptions; HttpClient retry semantics can hide server connection-kills on GETs +while only POST benchmarks fail. diff --git a/notes/Bugs/H1.1-client-body-pump-backpressure.md b/notes/Bugs/H1.1-client-body-pump-backpressure.md new file mode 100644 index 00000000..07b62b15 --- /dev/null +++ b/notes/Bugs/H1.1-client-body-pump-backpressure.md @@ -0,0 +1,65 @@ +--- +status: fixed +component: Protocol/Http11/Client +discovered: '2026-06-12' +fixed: '2026-06-12' +branch: release-next +severity: high +tags: + - bug + - http11 + - client + - backpressure + - memory + - fixed +--- +# H1.1 Client Body Pump Had No Backpressure — FIXED (2026-06-12) + +## Symptom + +Benchmark: TurboHTTP client allocated **~1 MB per 1 MB POST at CL=512** over HTTP/1.1 +(534 MB total vs HttpClient's 35 MB) and trailed HttpClient by 63% in throughput, while +single-request heavy was fine (93 KB/op). 
Repro (in-process Kestrel + real client): + +| | no backpressure | HWM=2 (fix) | +|---|---|---| +| CL=1 | 770 K B/op, 104 req/s | 660 K B/op, 164 req/s | +| CL=64 | 789 K B/op, 487 req/s | 287 K B/op, 1 016 req/s | +| CL=512 | 1 001 K B/op, 738 req/s | 173 K B/op, 1 262 req/s | + +## Root cause + +`Http11ClientStateMachine.HandleBodyRead` pumped the next chunk immediately after +`FlushAsync()` with no regard for whether the network had consumed the previous one. +For memory-backed content the entire request body was copied into pooled 16–64 KB +chunks instantly and parked in the connection stage's outbound queue. Under N concurrent +uploads the aggregate rented working set (N × body size) far exceeded `ArrayPool` +capacity, so nearly every body byte became a fresh allocation — and the resulting GC +pressure also throttled throughput. + +## The fix + +High-water mark in `Http11ClientStateMachine`: count body chunks emitted but not yet +flushed (`_unflushedBodyChunks`, HWM = 2). At the HWM the pump pauses instead of issuing +the next read; the already-existing `IClientStateMachine.OnOutboundFlushed()` callback +(invoked by `HttpConnectionStageLogic` on every push to the network out-port, previously +ignored by H1.1) decrements and resumes. Counter incremented *before* `FlushAsync` because +a free port flushes synchronously and re-enters `OnOutboundFlushed`. State reset in +`StartBodyDrain`, `BodyReadFailed`, and `Cleanup`. + +HTTP/1.0 client is unaffected (it buffers the whole body into a single +`BufferedBodyWriter` by design — no chunked framing in 1.0). + +## Tests + +- `TurboHTTP.Tests/Protocol/Syntax/Http11/Client/Http11ClientBodyBackpressureSpec.cs` — + counting body stream proves the pump stops issuing reads after the HWM without flush + signals (pre-fix: all 64 chunks pumped) and resumes on `OnOutboundFlushed` to complete + the full 1 MB body intact. 
+ +## Follow-up ideas (not done) + +- Zero-copy fast path for visible-buffer `MemoryStream` content (mirror + `Http2ClientSessionManager` fast path A) would eliminate the remaining per-chunk copy. +- The server response-body path has its own pause/resume; H2/H3 bound in-flight data via + flow-control windows — only H1.1 client lacked a bound. diff --git a/notes/Bugs/H3-frame-buffer-leak.md b/notes/Bugs/H3-frame-buffer-leak.md new file mode 100644 index 00000000..ff714326 --- /dev/null +++ b/notes/Bugs/H3-frame-buffer-leak.md @@ -0,0 +1,61 @@ +--- +status: fixed +component: Protocol/Http3 +discovered: '2026-06-12' +fixed: '2026-06-12' +branch: release-next +severity: high +tags: + - bug + - http3 + - memory + - pooling + - fixed +--- +# H3 Inbound Frame Buffer Leak — FIXED (2026-06-12) + +## Symptom + +Benchmark: TurboServer allocated **1.27 MB managed memory per 1 MB HTTP/3 upload** (Kestrel: 87 KB). +Client side showed the same pathology for H3 response bodies. Unit repro measured 1.04 bytes +allocated per body byte at steady state. + +## Root cause + +`FrameDecoder.DecodeDataFrame`/`DecodeHeadersFrame` (`Protocol/Syntax/Http3/FrameDecoder.cs`) +copy each frame payload into a `MemoryPool.Shared` rental owned by the frame +(`DataFrame`/`HeadersFrame` implement `IDisposable`). Neither consumer ever disposed the frames: + +- Server: `Http3ServerSessionManager.ProcessFrames` — handled frames in a `foreach`/`switch`, no dispose. +- Client: `Http3ClientStateMachine.ProcessFrameData` — same. + +Rentals were never returned → the array pool drained permanently → every subsequent +`Rent` allocated a fresh array → allocations ≈ full body size per request/response. + +## The fix + +1. Both frame loops dispose each frame in a per-frame `finally` after handling + (handling copies what it keeps: body bytes via `QueuedBodyReader.TryEnqueue`, + header strings via QPACK decode). +2. 
**Prerequisite**: `QpackTableSync.TryDecodeOrBlock` used to retain the *caller's* + `ReadOnlyMemory` (aliasing the frame's pooled rental) in `_blockedStreams` — + disposal would have corrupted blocked header blocks on pool reuse. It now stores an + owned copy (`data.ToArray()`; blocked streams are rare and small). + +## Tests + +- `TurboHTTP.Tests/Protocol/Syntax/Http3/Server/SessionManager/Http3DataFrameBufferReleaseSpec.cs` + — allocation-budget spec (thread-local `GC.GetAllocatedBytesForCurrentThread`, warmup + steady + state, asserts < ¼ body size) + body round-trip integrity. Pre-fix: 4.35 MB for 4.19 MB body. +- `TurboHTTP.Tests/Protocol/Syntax/Http3/Client/StateMachine/Http3ResponseFrameBufferReleaseSpec.cs` + — client-side equivalent. Pre-fix: 2.15 MB for 2.10 MB body. +- `TurboHTTP.Tests/Protocol/Syntax/Http3/Qpack/QpackBlockedStreamBufferOwnershipSpec.cs` + — blocked header block must survive caller-buffer scribble. + +## Lesson + +Pooled-rent + copy paths must have an explicit owner with a deterministic dispose point. +The same audit found the H1.1 client pump issue (see [[H1.1-client-body-pump-backpressure]]). +A remaining (separate, optimization-level) issue: H3 still double-copies DATA payloads +(FrameDecoder rental → QueuedBodyReader rental); H2 avoids the first copy by slicing its +working buffer. Aligning H3 with H2 would cut another ~50% of transient copy traffic. 
diff --git a/notes/Bugs/SendAsync-options-race.md b/notes/Bugs/SendAsync-options-race.md new file mode 100644 index 00000000..88e1e5ee --- /dev/null +++ b/notes/Bugs/SendAsync-options-race.md @@ -0,0 +1,51 @@ +--- +status: fixed +component: Client +discovered: '2026-06-12' +fixed: '2026-06-12' +branch: release-next +severity: high +tags: + - bug + - client + - race-condition + - fixed +--- +# SendAsync Mutated request.Options After Enqueue (Dictionary Corruption Race) — FIXED (2026-06-12) + +## Symptom + +Full benchmark run, `KestrelTurboSendAsyncConcurrentBenchmarks` H2 CL=4096: benchmark child +process crashed (exit -1) with + +``` +InvalidOperationException: Operations that change non-concurrent collections must have +exclusive access. ... at RequestEnricher.Enrich (line 88, Options.TryGetValue) +→ MergeHub ProducerFailed → consumer ingress dies +``` + +## Root cause + +`TurboHttpClient.SendAsync` wrote the request into the channel (`Requests.WriteAsync`) and +**afterwards** called `request.SetCancellationToken(cts.Token)` → `request.Options.Set(...)` +on the caller thread. From the moment the request is enqueued, the pipeline's +`RequestEnricher.Enrich` reads and mutates the same `HttpRequestOptions` (a plain +`Dictionary`) on a MergeHub stream thread. Two unsynchronized writers → +dictionary state corruption under high concurrency (CL=4096 reliably hit the window). + +## The fix + +Reordered `SendAsync`: the CTS is created (linked / pooled / fresh) and the cancellation +token stamped into `request.Options` **before** `Requests.WriteAsync`. `CancelAfter` and the +`UnsafeRegister` callback still happen after the write (they don't touch Options). Cleanup +(TryReset/pool/dispose) moved to the outer finally so the early-created CTS is also released +when the channel write throws. + +**Invariant to preserve:** nothing may touch `request.Options` after the channel write — +the options dictionary is single-owner until enqueue, then owned by the stream side. 
+ +## Tests + +- `TurboHttpClientSpec.SendAsync_should_set_cancellation_token_before_enqueueing` — a + capturing `ChannelWriter` asserts the token is present at the moment of `TryWrite` + (deterministic; failed pre-fix). diff --git a/src/TurboHTTP.Tests.Shared/FakeServerOps.cs b/src/TurboHTTP.Tests.Shared/FakeServerOps.cs index 0f2c2000..dc9b8c12 100644 --- a/src/TurboHTTP.Tests.Shared/FakeServerOps.cs +++ b/src/TurboHTTP.Tests.Shared/FakeServerOps.cs @@ -15,11 +15,15 @@ internal sealed class FakeServerOps : IServerStageOperations public List<(string Name, TimeSpan Delay)> ScheduledTimers { get; } = []; public List CancelledTimers { get; } = []; + /// Every OnScheduleTimer call in order, without the de-duplication applied to <see cref="ScheduledTimers"/>. + public List<(string Name, TimeSpan Delay)> ScheduleTimerCalls { get; } = []; + public void OnRequest(IFeatureCollection features) => Requests.Add(features); public void OnOutbound(ITransportOutbound item) => Outbound.Add(item); public void OnScheduleTimer(string name, TimeSpan delay) { + ScheduleTimerCalls.Add((name, delay)); ScheduledTimers.RemoveAll(t => t.Name == name); ScheduledTimers.Add((name, delay)); } diff --git a/src/TurboHTTP.Tests/Client/TurboHttpClientSpec.cs b/src/TurboHTTP.Tests/Client/TurboHttpClientSpec.cs index 8f7e8998..db8cac4a 100644 --- a/src/TurboHTTP.Tests/Client/TurboHttpClientSpec.cs +++ b/src/TurboHTTP.Tests/Client/TurboHttpClientSpec.cs @@ -102,6 +102,63 @@ public async Task SendAsync_should_set_pending_request_tcs_on_request() Assert.NotNull(result); } + /// + /// Once the request is written to the channel, the pipeline's enricher reads and + /// mutates request.Options on a stream thread. SendAsync must therefore not + /// touch the (non-thread-safe) options dictionary after the write — the cancellation + /// token has to be set before enqueueing. 
Regression for a Dictionary corruption + /// observed at CL=4096 ("Operations that change non-concurrent collections must have + /// exclusive access" in RequestEnricher.Enrich). + /// + [Fact(Timeout = 5000)] + public async Task SendAsync_should_set_cancellation_token_before_enqueueing() + { + var inner = Channel.CreateUnbounded(); + var capturing = new TokenCapturingWriter(inner.Writer); + var responses = Channel.CreateUnbounded(); + + var registration = new NamedClientConsumerRegistration(ActorRefs.Nobody, "test", Guid.NewGuid()); + var options = new TurboRequestOptions( + BaseAddress: null, + DefaultRequestHeaders: new HttpRequestMessage().Headers, + DefaultRequestVersion: HttpVersion.Version11, + DefaultVersionPolicy: HttpVersionPolicy.RequestVersionOrLower, + Timeout: TimeSpan.FromSeconds(30), + Credentials: null, + PreAuthenticate: false); + + using var client = (TurboHttpClient)TurboHttpClientCtor.Invoke( + [capturing, responses.Reader, options, registration]); + + var request = new HttpRequestMessage(HttpMethod.Get, "http://example.com/"); + var sendTask = client.SendAsync(request, TestContext.Current.CancellationToken); + + var observed = await inner.Reader.ReadAsync(TestContext.Current.CancellationToken); + Assert.True(capturing.TokenPresentAtWrite, + "SendAsync mutated request.Options after writing the request to the channel — " + + "this races with the pipeline's RequestEnricher on the stream thread."); + + Assert.True(observed.Options.TryGetValue(OptionsKey.Key, out var pending)); + Assert.True(observed.Options.TryGetValue(OptionsKey.VersionKey, out var ver)); + pending.TrySetResult(new HttpResponseMessage(HttpStatusCode.OK) { RequestMessage = observed }, ver); + await sendTask; + } + + private sealed class TokenCapturingWriter(ChannelWriter inner) + : ChannelWriter + { + public bool TokenPresentAtWrite { get; private set; } + + public override bool TryWrite(HttpRequestMessage item) + { + TokenPresentAtWrite = 
item.Options.TryGetValue(OptionsKey.CancellationTokenKey, out _); + return inner.TryWrite(item); + } + + public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default) + => inner.WaitToWriteAsync(cancellationToken); + } + [Fact(Timeout = 5000)] public async Task SendAsync_should_return_response_when_tcs_is_completed() { diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http10/Server/Http10DataRateSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http10/Server/Http10DataRateSpec.cs index 4a8dc561..3f08bd9b 100644 --- a/src/TurboHTTP.Tests/Protocol/Syntax/Http10/Server/Http10DataRateSpec.cs +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http10/Server/Http10DataRateSpec.cs @@ -164,6 +164,32 @@ public void Response_completion_should_remove_rate_tracking() Assert.False(sm.ShouldComplete); } + [Fact(Timeout = 5000)] + public void Completed_streaming_response_should_not_flag_idle_connection() + { + // Regression: streaming completion (bytesRead == 0) left the response-rate entry in + // place; the stale entry decayed to 0 B/s and the next data-rate-check after the + // grace period flagged the healthy idle connection as a violation. 
+ var options = CreateOptionsWithResponseRate(1000, TimeSpan.FromSeconds(1)); + var clock = new FakeTimeProvider(); + var ops = new FakeServerOps(); + var sm = new Http10ServerStateMachine(options, ops, clock); + + var requestData = "GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; + sm.DecodeClientData(TransportData.Rent(MakeBuffer(requestData))); + + sm.OnBodyMessage(new ResponseBodyReadComplete(10)); + sm.OnBodyMessage(new ResponseBodyReadComplete(0)); + + clock.Advance(TimeSpan.FromMilliseconds(600)); + sm.OnTimerFired("data-rate-check"); + clock.Advance(TimeSpan.FromSeconds(10)); + sm.OnTimerFired("data-rate-check"); + + Assert.False(sm.ShouldComplete, + "Idle connection was flagged as a data-rate violation after the response completed."); + } + [Fact(Timeout = 5000)] public void Slow_response_body_violation_sets_should_complete_with_injected_clock() { diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Client/Http11ClientBodyBackpressureSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Client/Http11ClientBodyBackpressureSpec.cs new file mode 100644 index 00000000..ce586e37 --- /dev/null +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Client/Http11ClientBodyBackpressureSpec.cs @@ -0,0 +1,121 @@ +using Servus.Akka.Transport; +using TurboHTTP.Client; +using TurboHTTP.Protocol.Syntax.Http11.Client; +using TurboHTTP.Tests.Shared; + +namespace TurboHTTP.Tests.Protocol.Syntax.Http11.Client; + +/// +/// Regression spec for the H1.1 client request body pump: without a high-water mark the +/// pump copied the entire body into pooled chunks ahead of the socket, so concurrent +/// uploads queued their full bodies in rentals (benchmark: ~1 MB allocated per 1 MB POST +/// at CL=512 vs 93 KB at CL=1). The pump must pause once chunks pile up unflushed and +/// resume on OnOutboundFlushed. 
+/// +public sealed class Http11ClientBodyBackpressureSpec +{ + private const int ChunkSize = 16 * 1024; + private const int BodySize = 1024 * 1024; + + /// Body stream that records how many reads the pump has issued. + private sealed class CountingStream(int length) : Stream + { + private int _position; + + public int ReadsIssued { get; private set; } + public int LastReadSize { get; private set; } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + ReadsIssued++; + var n = Math.Min(buffer.Length, length - _position); + buffer.Span[..n].Fill(0x42); + _position += n; + LastReadSize = n; + return ValueTask.FromResult(n); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => length; + public override long Position { get => _position; set => throw new NotSupportedException(); } + public override void Flush() { } + public override int Read(byte[] buffer, int offset, int count) + => ReadAsync(buffer.AsMemory(offset, count)).Result; + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } + + private static (Http11ClientStateMachine Sm, FakeClientOps Ops, CountingStream Body) CreatePostedRequest() + { + var ops = new FakeClientOps(); + var sm = new Http11ClientStateMachine(ops, new TurboClientOptions + { + RequestBodyChunkSize = ChunkSize, + }); + sm.PreStart(); + + var body = new CountingStream(BodySize); + var content = new StreamContent(body); + content.Headers.ContentLength = BodySize; + var request = new HttpRequestMessage(HttpMethod.Post, "http://example.com/upload") + { + Version = new Version(1, 1), + Content = content, + }; + + sm.OnRequest(request); + return (sm, ops, 
body); + } + + /// Feeds completed reads back into the SM, mimicking the PipeTo message loop. + private static void PumpCompletedReads(Http11ClientStateMachine sm, CountingStream body, ref int fed) + { + while (body.ReadsIssued > fed) + { + fed++; + sm.OnBodyMessage(new Http11ClientStateMachine.BodyReadComplete(body.LastReadSize)); + } + } + + [Fact(Timeout = 5000)] + public void Body_pump_should_pause_when_outbound_is_not_flushed() + { + var (sm, ops, body) = CreatePostedRequest(); + + // Drive the read/complete loop without ever signalling a flush. The pump must + // stop issuing reads after a bounded number of unflushed chunks instead of + // copying the whole 1 MB body into pooled buffers. + var fed = 0; + PumpCompletedReads(sm, body, ref fed); + + var bodyChunks = ops.Outbound.OfType().Count() - 1; + Assert.True(bodyChunks < 8, + $"Pump emitted {bodyChunks} chunks ({BodySize / ChunkSize} total) without any flush signal — no backpressure."); + Assert.True(body.ReadsIssued < 8, + $"Pump issued {body.ReadsIssued} reads without any flush signal — no backpressure."); + } + + [Fact(Timeout = 5000)] + public void Body_pump_should_resume_on_flush_and_complete_body() + { + var (sm, ops, body) = CreatePostedRequest(); + + var fed = 0; + PumpCompletedReads(sm, body, ref fed); + + // Alternate flush signals and read completions until the body is fully sent. 
+ var guard = 0; + while (fed <= BodySize / ChunkSize && guard++ < 10 * BodySize / ChunkSize) + { + sm.OnOutboundFlushed(); + PumpCompletedReads(sm, body, ref fed); + } + + var totalBodyBytes = ops.Outbound.OfType().Skip(1).Sum(d => (long)d.Buffer.Length); + Assert.Equal(BodySize, totalBodyBytes); + Assert.True(sm.CanAcceptRequest, "Request should be dispatchable again after body completion."); + } +} diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Server/Http11DataRateSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Server/Http11DataRateSpec.cs index 97bce107..ae40f087 100644 --- a/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Server/Http11DataRateSpec.cs +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http11/Server/Http11DataRateSpec.cs @@ -188,6 +188,73 @@ public async Task Response_completion_should_remove_rate_tracking() Assert.False(sm.ShouldComplete); } + [Fact(Timeout = 5000)] + public void Rate_timer_should_be_scheduled_once_until_it_fires() + { + // EnsureRateTimer used to re-schedule the Akka timer on every observed chunk — + // one redundant scheduler call per response chunk on the hot path. + var options = CreateOptionsWithResponseRate(240, TimeSpan.FromSeconds(5)); + var ops = new FakeServerOps(); + var sm = new Http11ServerStateMachine(options, new TurboServerOptions().ToHttp2Options(), ops); + + const string requestData = "GET / HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"; + sm.DecodeClientData(TransportData.Rent(MakeBuffer(requestData))); + + var context = CreateResponseContext(); + sm.OnResponse(context); + + // Stream several response chunks; each one observes bytes in the rate monitor. + for (var i = 0; i < 5; i++) + { + sm.OnBodyMessage(new ResponseBodyReadComplete(1024)); + } + + Assert.Equal(1, ops.ScheduleTimerCalls.Count(t => t.Name == "data-rate-check")); + + // Once the timer fires it may be re-armed (entries still active) — exactly once. 
+ sm.OnTimerFired("data-rate-check"); + sm.OnBodyMessage(new ResponseBodyReadComplete(1024)); + sm.OnBodyMessage(new ResponseBodyReadComplete(1024)); + + Assert.Equal(2, ops.ScheduleTimerCalls.Count(t => t.Name == "data-rate-check")); + } + + [Fact(Timeout = 5000)] + public void Buffered_response_completion_should_not_flag_idle_keepalive_connection() + { + // Regression: EmitBufferedBody observed response bytes in the rate monitor but never + // removed the entry on completion. The stale entry decayed to 0 B/s and the next + // data-rate-check after the grace period killed the healthy idle keep-alive connection + // (benchmark: HttpClient got "connection forcibly closed" on concurrent H1.1 uploads). + var options = CreateOptionsWithResponseRate(240, TimeSpan.FromSeconds(1)); + var clock = new FakeTimeProvider(); + var ops = new FakeServerOps(); + var sm = new Http11ServerStateMachine(options, new TurboServerOptions().ToHttp2Options(), ops, clock); + + const string requestData = "GET / HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"; + sm.DecodeClientData(TransportData.Rent(MakeBuffer(requestData))); + + // Buffered response body: written into the feature before OnResponse, emitted + // synchronously via EmitBufferedBody (the standard path for normal responses). + var context = CreateResponseContext(); + var bodyFeature = (TurboHttpResponseBodyFeature)context.Get()!; + var span = bodyFeature.Writer.GetSpan(64); + span[..64].Fill(0x41); + bodyFeature.Writer.Advance(64); + bodyFeature.Writer.Complete(); + sm.OnResponse(context); + + // Connection sits idle on keep-alive well past the grace period; the periodic + // check fires repeatedly (first below-rate check starts the grace window). 
+ clock.Advance(TimeSpan.FromMilliseconds(600)); + sm.OnTimerFired("data-rate-check"); + clock.Advance(TimeSpan.FromSeconds(10)); + sm.OnTimerFired("data-rate-check"); + + Assert.False(sm.ShouldComplete, + "Idle keep-alive connection was flagged as a data-rate violation after a buffered response completed."); + } + [Fact(Timeout = 5000)] public void Slow_response_body_violation_sets_should_complete_with_injected_clock() { diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http2/Server/Streaming/Http2ServerOutboundFrameSplittingSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http2/Server/Streaming/Http2ServerOutboundFrameSplittingSpec.cs index 6c9a601b..efa0aee7 100644 --- a/src/TurboHTTP.Tests/Protocol/Syntax/Http2/Server/Streaming/Http2ServerOutboundFrameSplittingSpec.cs +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http2/Server/Streaming/Http2ServerOutboundFrameSplittingSpec.cs @@ -171,6 +171,9 @@ private static IFeatureCollection SendGetAndWriteBufferedBody( var mem = writer.GetMemory(bodySize); body.CopyTo(mem); writer.Advance(bodySize); + // The bridge always completes the body before emitting the response; the buffered + // fast path only serves completed bodies. 
+ writer.Complete(); return features; } diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Client/StateMachine/Http3ResponseFrameBufferReleaseSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Client/StateMachine/Http3ResponseFrameBufferReleaseSpec.cs new file mode 100644 index 00000000..5e269544 --- /dev/null +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Client/StateMachine/Http3ResponseFrameBufferReleaseSpec.cs @@ -0,0 +1,103 @@ +using Servus.Akka.Transport; +using TurboHTTP.Client; +using TurboHTTP.Protocol.Syntax.Http3; +using TurboHTTP.Protocol.Syntax.Http3.Client; +using TurboHTTP.Protocol.Syntax.Http3.Qpack; +using TurboHTTP.Tests.Shared; + +namespace TurboHTTP.Tests.Protocol.Syntax.Http3.Client.StateMachine; + +/// +/// Regression spec for the HTTP/3 inbound frame buffer leak on the client: decoded +/// HEADERS/DATA frames own a MemoryPool rental that must be disposed after response +/// assembly, otherwise the pool drains and every response body allocates fresh arrays. +/// +public sealed class Http3ResponseFrameBufferReleaseSpec +{ + private const int ChunkSize = 16 * 1024; + private const int ChunksPerResponse = 8; + private const int BodySize = ChunkSize * ChunksPerResponse; + + private readonly FakeClientOps _clientOps = new(); + private readonly QpackTableSync _tableSync = new(0, 4 * 1024, 100, null); + + private Http3ClientStateMachine CreateMachine() + => new(new TurboClientOptions(), _clientOps); + + private TransportBuffer BuildHeadersBuffer() + { + var headersFrame = new HeadersFrame(_tableSync.Encoder.Encode([(":status", "200")])); + var buf = TransportBuffer.Rent(headersFrame.SerializedSize); + var span = buf.FullMemory.Span; + headersFrame.WriteTo(ref span); + buf.Length = headersFrame.SerializedSize; + return buf; + } + + private static TransportBuffer BuildDataBuffer(ReadOnlyMemory chunk) + { + var dataFrame = new DataFrame(chunk); + var buf = TransportBuffer.Rent(dataFrame.SerializedSize); + var span = buf.FullMemory.Span; + 
dataFrame.WriteTo(ref span); + buf.Length = dataFrame.SerializedSize; + return buf; + } + + private long ReceiveResponseAndDrainBody( + Http3ClientStateMachine sm, long streamId, byte[] chunk, byte[] drainBuffer) + { + sm.DecodeServerData(new MultiplexedData(BuildHeadersBuffer(), streamId)); + for (var i = 0; i < ChunksPerResponse; i++) + { + sm.DecodeServerData(new MultiplexedData(BuildDataBuffer(chunk), streamId)); + } + sm.DecodeServerData(new StreamReadCompleted(streamId)); + + var response = _clientOps.Responses[^1]; + var body = response.Content.ReadAsStream(TestContext.Current.CancellationToken); + + long total = 0; + int read; + while ((read = body.Read(drainBuffer)) > 0) + { + total += read; + } + + return total; + } + + [Fact(Timeout = 15000)] + [Trait("RFC", "RFC9114-7.2.1")] + public void Repeated_responses_should_not_allocate_proportional_to_body_size() + { + var sm = CreateMachine(); + var chunk = new byte[ChunkSize]; + Array.Fill(chunk, (byte)0x3E); + var drainBuffer = new byte[64 * 1024]; + long streamId = 0; + + // Warm up pools so the measured phase reflects steady state. 
+ for (var i = 0; i < 3; i++) + { + var total = ReceiveResponseAndDrainBody(sm, streamId, chunk, drainBuffer); + Assert.Equal(BodySize, total); + streamId += 4; + } + + const int measuredResponses = 16; + const long totalBodyBytes = (long)measuredResponses * BodySize; + + var before = GC.GetAllocatedBytesForCurrentThread(); + for (var i = 0; i < measuredResponses; i++) + { + ReceiveResponseAndDrainBody(sm, streamId, chunk, drainBuffer); + streamId += 4; + } + var allocated = GC.GetAllocatedBytesForCurrentThread() - before; + + Assert.True(allocated < totalBodyBytes / 16, + $"Allocated {allocated:N0} bytes for {totalBodyBytes:N0} bytes of response body — " + + "frame pool rentals are not being returned."); + } +} diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Frames/Http3FrameDecoderZeroCopySpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Frames/Http3FrameDecoderZeroCopySpec.cs new file mode 100644 index 00000000..72ed6e27 --- /dev/null +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Frames/Http3FrameDecoderZeroCopySpec.cs @@ -0,0 +1,85 @@ +using TurboHTTP.Protocol.Syntax.Http3; + +namespace TurboHTTP.Tests.Protocol.Syntax.Http3.Frames; + +/// +/// Contract of the zero-copy DecodeAll(ReadOnlyMemory<byte>) overload: payloads of +/// frames fully contained in the input alias the input buffer (no pooled copy); frames +/// assembled from a buffered remainder own their payload and survive input reuse. 
+/// +public sealed class Http3FrameDecoderZeroCopySpec +{ + private static byte[] SerializeDataFrame(byte fill, int size) + { + var payload = new byte[size]; + Array.Fill(payload, fill); + var frame = new DataFrame(payload); + var buf = new byte[frame.SerializedSize]; + var span = buf.AsSpan(); + frame.WriteTo(ref span); + return buf; + } + + [Fact(Timeout = 5000)] + [Trait("RFC", "RFC9114-7.2.1")] + public void Memory_overload_should_return_payload_slices_of_the_input() + { + var decoder = new FrameDecoder(); + var wire = SerializeDataFrame(0x11, 256); + + var frames = decoder.DecodeAll(wire.AsMemory(), out var consumed); + + Assert.Equal(wire.Length, consumed); + var data = Assert.IsType(Assert.Single(frames)); + Assert.Equal(256, data.Data.Length); + Assert.True(data.Data.Span.IndexOfAnyExcept((byte)0x11) < 0, "Payload content mismatch."); + + // Zero-copy contract: mutating the input buffer is visible through the frame. + Array.Fill(wire, (byte)0x99); + Assert.True(data.Data.Span.IndexOfAnyExcept((byte)0x99) < 0, + "DATA payload does not alias the input buffer — an unnecessary copy was made."); + } + + [Fact(Timeout = 5000)] + [Trait("RFC", "RFC9114-7.2.1")] + public void Frame_spanning_two_inputs_should_own_its_payload() + { + var decoder = new FrameDecoder(); + var wire = SerializeDataFrame(0x42, 256); + + var firstHalf = wire[..(wire.Length / 2)]; + var secondHalf = wire[(wire.Length / 2)..]; + + var none = decoder.DecodeAll(firstHalf.AsMemory(), out _); + Assert.Empty(none); + + var frames = decoder.DecodeAll(secondHalf.AsMemory(), out _); + var data = Assert.IsType(Assert.Single(frames)); + + // The split frame must be an owned copy: scribbling over both inputs must not + // corrupt the payload. 
+ Array.Fill(firstHalf, (byte)0xFF); + Array.Fill(secondHalf, (byte)0xFF); + + Assert.Equal(256, data.Data.Length); + Assert.True(data.Data.Span.IndexOfAnyExcept((byte)0x42) < 0, + "Split-frame payload aliases a reused input buffer."); + (data as IDisposable).Dispose(); + } + + [Fact(Timeout = 5000)] + [Trait("RFC", "RFC9114-7.2.1")] + public void Span_overload_should_keep_copy_semantics() + { + var decoder = new FrameDecoder(); + var wire = SerializeDataFrame(0x33, 64); + + var frames = decoder.DecodeAll(wire.AsSpan(), out _); + var data = Assert.IsType(Assert.Single(frames)); + + Array.Fill(wire, (byte)0xEE); + Assert.True(data.Data.Span.IndexOfAnyExcept((byte)0x33) < 0, + "Span-based DecodeAll no longer copies — existing callers rely on copy semantics."); + (data as IDisposable).Dispose(); + } +} diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Qpack/QpackBlockedStreamBufferOwnershipSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Qpack/QpackBlockedStreamBufferOwnershipSpec.cs new file mode 100644 index 00000000..51210584 --- /dev/null +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Qpack/QpackBlockedStreamBufferOwnershipSpec.cs @@ -0,0 +1,44 @@ +using TurboHTTP.Protocol.Syntax.Http3.Qpack; + +namespace TurboHTTP.Tests.Protocol.Syntax.Http3.Qpack; + +/// +/// A blocked header block must not alias the caller's buffer: the HEADERS frame that +/// carried it owns a pooled rental that is disposed (and reused) after frame handling, +/// so the blocked-stream decode path (TryDecodeOrBlock) has to take an owned copy.
+/// +public sealed class QpackBlockedStreamBufferOwnershipSpec +{ + [Fact(Timeout = 5000)] + [Trait("RFC", "RFC9204-2.1.2")] + public void Blocked_header_block_should_survive_caller_buffer_reuse() + { + var sync = new QpackTableSync( + encoderMaxCapacity: 4 * 1024, decoderMaxCapacity: 4 * 1024, + maxBlockedStreams: 10, configuredEncoderLimit: null); + var encoder = sync.Encoder; + + var headers = new List<(string, string)> { ("x-owned", "must-survive") }; + var encoded = encoder.Encode(headers); + + // Simulate the pooled HEADERS frame buffer: the header block lives in a buffer + // the caller reuses after the frame is handled. + var callerBuffer = new byte[encoded.Length]; + encoded.CopyTo(callerBuffer); + + var result = sync.TryDecodeOrBlock(callerBuffer, streamId: 4); + Assert.True(result.IsBlocked); + + // Caller disposes the frame; the pool hands the buffer to someone else who scribbles. + Array.Fill(callerBuffer, (byte)0xFF); + + sync.ProcessEncoderInstructions(encoder.EncoderInstructions.Span); + var resolved = sync.ResolveBlockedStreams(); + + Assert.Single(resolved); + Assert.Equal(4, resolved[0].StreamId); + Assert.Single(resolved[0].Headers); + Assert.Equal("x-owned", resolved[0].Headers[0].Name); + Assert.Equal("must-survive", resolved[0].Headers[0].Value); + } +} diff --git a/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Server/SessionManager/Http3DataFrameBufferReleaseSpec.cs b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Server/SessionManager/Http3DataFrameBufferReleaseSpec.cs new file mode 100644 index 00000000..d2fddeb7 --- /dev/null +++ b/src/TurboHTTP.Tests/Protocol/Syntax/Http3/Server/SessionManager/Http3DataFrameBufferReleaseSpec.cs @@ -0,0 +1,182 @@ +using Microsoft.AspNetCore.Http.Features; +using Servus.Akka.Transport; +using TurboHTTP.Protocol.Syntax.Http3; +using TurboHTTP.Protocol.Syntax.Http3.Qpack; +using TurboHTTP.Protocol.Syntax.Http3.Server; +using TurboHTTP.Server; +using TurboHTTP.Tests.Shared; + +namespace 
TurboHTTP.Tests.Protocol.Syntax.Http3.Server.SessionManager; + +/// +/// Regression spec for the HTTP/3 inbound DATA frame buffer leak: decoded frames own a +/// MemoryPool rental that must be disposed after handling, otherwise the pool drains and +/// every upload allocates its full body size in fresh arrays (benchmark showed 1.27 MB +/// allocated per 1 MB upload vs Kestrel's 87 KB). +/// +public sealed class Http3DataFrameBufferReleaseSpec +{ + private const int ChunkSize = 16 * 1024; + private const int ChunksPerRequest = 32; + private const int BodySize = ChunkSize * ChunksPerRequest; + + private static Http3ConnectionOptions DefaultConnectionOptions() => new() + { + Limits = new ResolvedServerLimits( + MaxRequestBodySize: 30 * 1024 * 1024, + KeepAliveTimeout: TimeSpan.FromSeconds(130), + RequestHeadersTimeout: TimeSpan.FromSeconds(30), + MinRequestBodyDataRate: 240, + MinRequestBodyDataRateGracePeriod: TimeSpan.FromSeconds(5), + MinResponseDataRate: 240, + MinResponseDataRateGracePeriod: TimeSpan.FromSeconds(5)), + MaxConcurrentStreams = 100, + MaxHeaderListSize = 32 * 1024, + MaxHeaderCount = 100, + QpackMaxTableCapacity = 0, + QpackBlockedStreams = 0, + MaxResponseBufferSize = 64 * 1024, + ResponseBodyChunkSize = 16 * 1024, + BodyConsumptionTimeout = TimeSpan.FromSeconds(30), + UseHuffman = true, + }; + + private static byte[] BuildRequestHeaders() + { + var tableSync = new QpackTableSync(0, 0, 0, 0); + var headers = new List<(string, string)> + { + (":method", "POST"), + (":path", "/upload"), + (":scheme", "https"), + (":authority", "localhost"), + }; + var headerBlock = tableSync.Encoder.Encode(headers); + var frame = new HeadersFrame(headerBlock); + var buf = new byte[frame.SerializedSize]; + var span = buf.AsSpan(); + frame.WriteTo(ref span); + return buf; + } + + private static byte[] BuildDataFrameBytes(ReadOnlyMemory content) + { + var frame = new DataFrame(content); + var buf = new byte[frame.SerializedSize]; + var span = buf.AsSpan(); + 
frame.WriteTo(ref span); + return buf; + } + + private static void Feed(Http3ServerSessionManager sm, byte[] wireBytes, long streamId) + { + var buffer = TransportBuffer.Rent(wireBytes.Length); + wireBytes.CopyTo(buffer.FullMemory.Span); + buffer.Length = wireBytes.Length; + sm.DecodeClientData(new MultiplexedData(buffer, streamId)); + } + + private static async Task SendUploadAndDrainBody( + Http3ServerSessionManager sm, FakeServerOps ops, long streamId, + byte[] headerBytes, byte[][] dataFrames, byte[] drainBuffer, byte? expectedPattern) + { + sm.DecodeClientData(new ServerStreamAccepted(StreamTarget.FromId(streamId), + StreamDirection.Bidirectional)); + Feed(sm, headerBytes, streamId); + foreach (var dataFrame in dataFrames) + { + Feed(sm, dataFrame, streamId); + } + sm.DecodeClientData(new StreamReadCompleted(StreamTarget.FromId(streamId))); + + var requestFeature = ops.Requests[^1].Get(); + Assert.NotNull(requestFeature); + var body = requestFeature.Body; + Assert.NotNull(body); + + long total = 0; + int read; + while ((read = await body.ReadAsync(drainBuffer)) > 0) + { + if (expectedPattern is { } pattern) + { + for (var i = 0; i < read; i++) + { + if (drainBuffer[i] != pattern) + { + Assert.Fail($"Body byte at offset {total + i} was 0x{drainBuffer[i]:X2}, expected 0x{pattern:X2}."); + } + } + } + total += read; + } + + return total; + } + + [Fact(Timeout = 15000)] + [Trait("RFC", "RFC9114-7.2.1")] + public async Task Upload_body_should_round_trip_intact() + { + var ops = new FakeServerOps(); + var sm = new Http3ServerSessionManager(DefaultConnectionOptions(), ops); + + var headerBytes = BuildRequestHeaders(); + var chunk = new byte[ChunkSize]; + Array.Fill(chunk, (byte)0xAB); + var dataFrames = new byte[ChunksPerRequest][]; + for (var i = 0; i < ChunksPerRequest; i++) + { + dataFrames[i] = BuildDataFrameBytes(chunk); + } + + var drainBuffer = new byte[64 * 1024]; + var total = await SendUploadAndDrainBody(sm, ops, 0, headerBytes, dataFrames, drainBuffer, 
0xAB); + + Assert.Equal(BodySize, total); + } + + [Fact(Timeout = 15000)] + [Trait("RFC", "RFC9114-7.2.1")] + public async Task Repeated_uploads_should_not_allocate_proportional_to_body_size() + { + var ops = new FakeServerOps(); + var sm = new Http3ServerSessionManager(DefaultConnectionOptions(), ops); + + var headerBytes = BuildRequestHeaders(); + var chunk = new byte[ChunkSize]; + Array.Fill(chunk, (byte)0x5C); + var dataFrames = new byte[ChunksPerRequest][]; + for (var i = 0; i < ChunksPerRequest; i++) + { + dataFrames[i] = BuildDataFrameBytes(chunk); + } + + var drainBuffer = new byte[64 * 1024]; + long streamId = 0; + + // Warm up pools so the measured phase reflects steady state. + for (var i = 0; i < 3; i++) + { + await SendUploadAndDrainBody(sm, ops, streamId, headerBytes, dataFrames, drainBuffer, null); + streamId += 4; + } + + const int measuredRequests = 8; + const long totalBodyBytes = (long)measuredRequests * BodySize; + + var before = GC.GetAllocatedBytesForCurrentThread(); + for (var i = 0; i < measuredRequests; i++) + { + await SendUploadAndDrainBody(sm, ops, streamId, headerBytes, dataFrames, drainBuffer, null); + streamId += 4; + } + var allocated = GC.GetAllocatedBytesForCurrentThread() - before; + + // With pooled DATA frame buffers correctly returned, steady-state allocations are a + // small fraction of the body. Pre-fix the leak forced ~1x the body size in fresh arrays. 
+ Assert.True(allocated < totalBodyBytes / 16, + $"Allocated {allocated:N0} bytes for {totalBodyBytes:N0} bytes of upload body — " + + "DATA frame pool rentals are not being returned."); + } +} diff --git a/src/TurboHTTP.Tests/Server/Context/TurboHttpResponseBodyFeatureSpec.cs b/src/TurboHTTP.Tests/Server/Context/TurboHttpResponseBodyFeatureSpec.cs index f9353a33..271d7a54 100644 --- a/src/TurboHTTP.Tests/Server/Context/TurboHttpResponseBodyFeatureSpec.cs +++ b/src/TurboHTTP.Tests/Server/Context/TurboHttpResponseBodyFeatureSpec.cs @@ -176,6 +176,33 @@ public void WhenHeadersReady_should_not_be_completed_initially() Assert.False(feature.HasStarted); } + [Fact(Timeout = 5000)] + public async Task Completed_single_segment_body_should_be_served_without_copy() + { + // The buffered-body fast path hands out the pipe's own segment instead of a + // ToArray copy; the segment stays valid until the feature is reset. + var ct = TestContext.Current.CancellationToken; + var feature = new TurboHttpResponseBodyFeature(); + + await feature.Writer.WriteAsync("Hello, World!"u8.ToArray(), ct); + await feature.CompleteAsync(); + + Assert.True(feature.TryGetBufferedBody(out var body)); + Assert.Equal("Hello, World!", Encoding.UTF8.GetString(body.Span)); + } + + [Fact(Timeout = 5000)] + public async Task TryGetBufferedBody_should_not_expose_incomplete_body() + { + var ct = TestContext.Current.CancellationToken; + var feature = new TurboHttpResponseBodyFeature(); + + await feature.Writer.WriteAsync("partial"u8.ToArray(), ct); + + Assert.False(feature.TryGetBufferedBody(out _), + "An incomplete buffered body must not be emitted as a finished response."); + } + public void Dispose() { _system.Dispose(); diff --git a/src/TurboHTTP/Client/TurboHttpClient.cs b/src/TurboHTTP/Client/TurboHttpClient.cs index bdb52315..dba0687a 100644 --- a/src/TurboHTTP/Client/TurboHttpClient.cs +++ b/src/TurboHTTP/Client/TurboHttpClient.cs @@ -148,24 +148,13 @@ public async Task SendAsync(HttpRequestMessage 
request, Can ? perRequestTimeout : Timeout; - try + // Everything that mutates request.Options must happen BEFORE the channel write: + // once enqueued, the pipeline's RequestEnricher reads and mutates the options + // dictionary on a stream thread, and HttpRequestOptions is not thread-safe. + var linkedCt = cancellationToken.CanBeCanceled; + CancellationTokenSource? cts = null; + if (effectiveTimeout != System.Threading.Timeout.InfiniteTimeSpan || linkedCt) { - try - { - await Requests.WriteAsync(request, cancellationToken); - } - catch (ChannelClosedException) - { - throw CreateClientDisposedException(); - } - - if (effectiveTimeout == System.Threading.Timeout.InfiniteTimeSpan && !cancellationToken.CanBeCanceled) - { - return await pending.GetValueTask(); - } - - var linkedCt = cancellationToken.CanBeCanceled; - CancellationTokenSource cts; if (linkedCt) { cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); @@ -181,18 +170,35 @@ public async Task SendAsync(HttpRequestMessage request, Can } request.SetCancellationToken(cts.Token); + } + try + { try { - cts.CancelAfter(effectiveTimeout); - await using (cts.Token.UnsafeRegister( - static (state, ct) => ((PendingRequest)state!).TrySetCanceled(ct), - pending)) - { - return await pending.GetValueTask(); - } + await Requests.WriteAsync(request, cancellationToken); } - finally + catch (ChannelClosedException) + { + throw CreateClientDisposedException(); + } + + if (cts is null) + { + return await pending.GetValueTask(); + } + + cts.CancelAfter(effectiveTimeout); + await using (cts.Token.UnsafeRegister( + static (state, ct) => ((PendingRequest)state!).TrySetCanceled(ct), + pending)) + { + return await pending.GetValueTask(); + } + } + finally + { + if (cts is not null) { if (linkedCt || !cts.TryReset()) { @@ -208,9 +214,7 @@ public async Task SendAsync(HttpRequestMessage request, Can cts.Dispose(); } } - } - finally - { + _pendingTcs.TryRemove(pending, out _); PendingRequest.Return(pending); } diff 
--git a/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs index 9d5b5bad..918844c2 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http10/Server/Http10ServerStateMachine.cs @@ -26,6 +26,8 @@ internal sealed class Http10ServerStateMachine : IServerStateMachine private readonly long _maxRequestBodySize; private readonly DataRateMonitor _requestRate; private readonly DataRateMonitor _responseRate; + private readonly List _rateViolations = []; + private bool _rateTimerActive; private readonly TimeProvider _clock; private long Now() => _clock.GetUtcNow().ToUnixTimeMilliseconds(); @@ -229,6 +231,9 @@ private void HandleResponseBodyRead(int bytesRead) else { _activeBodyWriter?.CompleteAsync(); + // Response fully handed to the transport: drop the rate entry so a keep-alive + // connection is not flagged as a violation once the grace period elapses. 
+ _responseRate.Remove(0); } } @@ -240,11 +245,12 @@ public void OnTimerFired(string name) { if (name == DataRateCheck) { - var violations = new List(); - _requestRate.Check(Now(), violations); - _responseRate.Check(Now(), violations); + _rateTimerActive = false; + _rateViolations.Clear(); + _requestRate.Check(Now(), _rateViolations); + _responseRate.Check(Now(), _rateViolations); - if (violations.Count > 0) + if (_rateViolations.Count > 0) { Tracing.For("Protocol").Warning(this, "data rate violation (reqRate={0}, respRate={1})", @@ -255,7 +261,7 @@ public void OnTimerFired(string name) if (_requestRate.Count > 0 || _responseRate.Count > 0) { - _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + EnsureRateTimer(); } } } @@ -329,7 +335,17 @@ public void Cleanup() _activeStreamingReader = null; _deferredFeatures = null; _ops.OnCancelTimer(DataRateCheck); + _rateTimerActive = false; } - private void EnsureRateTimer() => _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + private void EnsureRateTimer() + { + if (_rateTimerActive) + { + return; + } + + _rateTimerActive = true; + _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + } } \ No newline at end of file diff --git a/src/TurboHTTP/Protocol/Syntax/Http11/Client/Http11ClientStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http11/Client/Http11ClientStateMachine.cs index 9c92e033..9fe94ad1 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http11/Client/Http11ClientStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http11/Client/Http11ClientStateMachine.cs @@ -32,6 +32,14 @@ internal sealed class Http11ClientStateMachine : IClientStateMachine private TransportBuffer? _heldBuffer; private int _heldBufferOffset; private bool _draining; + private int _unflushedBodyChunks; + private bool _bodyPumpPaused; + + // High-water mark for body chunks emitted but not yet flushed to the network. 
Without + // this bound the pump copies the entire request body into pooled chunks ahead of the + // socket; under concurrent uploads the aggregate working set exceeds the array pool + // and every body byte becomes a fresh allocation. + private const int MaxUnflushedBodyChunks = 2; internal sealed record BodyReadComplete(int BytesRead); internal sealed record BodyReadFailed(Exception Reason); @@ -257,6 +265,8 @@ public void OnBodyMessage(object msg) _currentWriter?.Dispose(); _currentWriter = null; _currentBodyStream = null; + _unflushedBodyChunks = 0; + _bodyPumpPaused = false; if (_inFlightQueue.Count > 0) { var req = _inFlightQueue.Dequeue(); @@ -269,6 +279,19 @@ public void OnBodyMessage(object msg) public void OnOutboundFlushed() { + if (_unflushedBodyChunks > 0) + { + _unflushedBodyChunks--; + } + + if (_bodyPumpPaused && _currentWriter is not null && _currentBodyStream is not null + && _unflushedBodyChunks < MaxUnflushedBodyChunks) + { + _bodyPumpPaused = false; + Tracing.For("Protocol").Debug(this, "request body pump resumed ({0} unflushed chunks)", + _unflushedBodyChunks); + ReadNextChunk(); + } } public void Cleanup() @@ -286,6 +309,8 @@ public void Cleanup() _currentWriter?.Dispose(); _currentWriter = null; _currentBodyStream = null; + _unflushedBodyChunks = 0; + _bodyPumpPaused = false; _pool.Dispose(); _decoder.Reset(); } @@ -390,6 +415,8 @@ private void DecodeResponse(TransportBuffer buffer, int startOffset = 0) private void StartBodyDrain(Stream bodyStream, long? contentLength, Version httpVersion) { + _unflushedBodyChunks = 0; + _bodyPumpPaused = false; var (writer, _) = _pool.RentWriter( hasBody: true, contentLength, httpVersion, new BodyEncoderOptions { ChunkSize = _options.RequestBodyChunkSize }, @@ -424,10 +451,22 @@ private void HandleBodyRead(int bytesRead) { if (bytesRead > 0) { + // Count before flushing: a free network port flushes synchronously, re-entering + // OnOutboundFlushed and decrementing back to zero before this method continues. 
+ _unflushedBodyChunks++; _currentWriter!.Advance(bytesRead); _currentWriter.FlushAsync(); Tracing.For("Protocol").Trace(this, "request body chunk flushed (bytes={0})", bytesRead); - ReadNextChunk(); + if (_unflushedBodyChunks >= MaxUnflushedBodyChunks) + { + _bodyPumpPaused = true; + Tracing.For("Protocol").Debug(this, "request body pump paused ({0} unflushed chunks)", + _unflushedBodyChunks); + } + else + { + ReadNextChunk(); + } } else { diff --git a/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs index 3b240771..252c3d17 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs @@ -36,6 +36,8 @@ internal sealed class Http11ServerStateMachine : IServerStateMachine private readonly DataRateMonitor _requestRate; private readonly DataRateMonitor _responseRate; + private readonly List _rateViolations = []; + private bool _rateTimerActive; private readonly TimeProvider _clock; private long Now() => _clock.GetUtcNow().ToUnixTimeMilliseconds(); @@ -404,6 +406,9 @@ private void EmitBufferedBody(IFeatureCollection features, ReadOnlyMemory } writer.CompleteAsync(); + // The response is fully handed to the transport: drop the rate entry, or the idle + // keep-alive connection is flagged as a violation once the grace period elapses. 
+ _responseRate.Remove(0); _ops.OnResponseBodyComplete(features); Tracing.For("Protocol").Debug(this, "response body complete (buffered, bytes={0})", body.Length); @@ -492,11 +497,12 @@ public void OnTimerFired(string name) } else if (name == DataRateCheck) { - var violations = new List(); - _requestRate.Check(Now(), violations); - _responseRate.Check(Now(), violations); + _rateTimerActive = false; + _rateViolations.Clear(); + _requestRate.Check(Now(), _rateViolations); + _responseRate.Check(Now(), _rateViolations); - if (violations.Count > 0) + if (_rateViolations.Count > 0) { Tracing.For("Protocol").Warning(this, "data rate violation (reqRate={0}, respRate={1}, paused={2})", @@ -507,7 +513,7 @@ public void OnTimerFired(string name) if (_requestRate.Count > 0 || _responseRate.Count > 0) { - _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + EnsureRateTimer(); } } } @@ -648,7 +654,17 @@ public void Cleanup() _ops.OnCancelTimer(KeepAliveTimer); _ops.OnCancelTimer(BodyConsumptionTimer); _ops.OnCancelTimer(DataRateCheck); + _rateTimerActive = false; } - private void EnsureRateTimer() => _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + private void EnsureRateTimer() + { + if (_rateTimerActive) + { + return; + } + + _rateTimerActive = true; + _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + } } \ No newline at end of file diff --git a/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs b/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs index 7976fd6c..7c352f5c 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http2/Server/Http2ServerSessionManager.cs @@ -53,6 +53,9 @@ internal sealed class Http2ServerSessionManager private bool _continuationEndStream; private readonly DataRateMonitor _requestRate; private readonly DataRateMonitor _responseRate; + private readonly List _rateViolations = []; + private readonly HashSet 
_rateViolationSet = []; + private bool _rateTimerActive; private readonly TimeProvider _clock; private bool _prefaceConsumed; @@ -1048,14 +1051,20 @@ public void EmitGoAway(int lastStreamId, Http2ErrorCode errorCode, string? reaso public void CheckDataRates() { + _rateTimerActive = false; var now = Now(); - var violations = new List(); + _rateViolations.Clear(); - _requestRate.Check(now, violations); - _responseRate.Check(now, violations); + _requestRate.Check(now, _rateViolations); + _responseRate.Check(now, _rateViolations); - var violationSet = new HashSet(violations); - foreach (var streamId in violationSet) + _rateViolationSet.Clear(); + foreach (var violation in _rateViolations) + { + _rateViolationSet.Add(violation); + } + + foreach (var streamId in _rateViolationSet) { Tracing.For("Protocol").Warning(this, "HTTP/2: data rate violation (stream={0})", streamId); EmitRstStream((int)streamId, Http2ErrorCode.EnhanceYourCalm); @@ -1063,9 +1072,18 @@ public void CheckDataRates() if (_requestRate.Count > 0 || _responseRate.Count > 0) { - _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + EnsureRateTimer(); } } - private void EnsureRateTimer() => _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + private void EnsureRateTimer() + { + if (_rateTimerActive) + { + return; + } + + _rateTimerActive = true; + _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + } } \ No newline at end of file diff --git a/src/TurboHTTP/Protocol/Syntax/Http3/Client/Http3ClientStateMachine.cs b/src/TurboHTTP/Protocol/Syntax/Http3/Client/Http3ClientStateMachine.cs index 53dc185f..c7b3e847 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http3/Client/Http3ClientStateMachine.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http3/Client/Http3ClientStateMachine.cs @@ -473,17 +473,30 @@ private void HandleTaggedStreamData(MultiplexedData multiplexed) private void ProcessFrameData(TransportBuffer buffer, long streamId) { + // Decoded frames may slice the input buffer 
(zero-copy), so it must stay alive + // until the frame loop below has handled (and copied) everything. + using var inputBuffer = buffer; try { - var frames = _clientSession.DecodeServerData(buffer, streamId); + var frames = _clientSession.DecodeServerData(inputBuffer, streamId); for (var i = 0; i < frames.Count; i++) { var frame = frames[i]; - var forwarded = ProcessFrame(frame); - if (forwarded is not null) + try { - _clientSession.AssembleResponse(forwarded, streamId); + var forwarded = ProcessFrame(frame); + if (forwarded is not null) + { + _clientSession.AssembleResponse(forwarded, streamId); + } + } + finally + { + // DATA/HEADERS frames own a pooled rental; response assembly copies what + // it keeps (body bytes into the body reader, header strings via QPACK), + // so the rental must go back to the pool here. + (frame as IDisposable)?.Dispose(); } } } diff --git a/src/TurboHTTP/Protocol/Syntax/Http3/Client/StreamManager.cs b/src/TurboHTTP/Protocol/Syntax/Http3/Client/StreamManager.cs index 2ab4a7e1..d2d31e09 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http3/Client/StreamManager.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http3/Client/StreamManager.cs @@ -37,6 +37,8 @@ internal sealed class StreamManager( /// Decodes a TransportBuffer into HTTP/3 frames using a per-stream decoder. /// Each QUIC stream has independent framing, so decoders must not share /// partial-frame remainder state across streams. + /// Decoded frames may slice the input buffer (zero-copy) — the caller owns + /// the buffer and must dispose it only after all returned frames have been handled.
/// public IReadOnlyList DecodeServerData(TransportBuffer buffer, long streamId) { @@ -46,9 +48,7 @@ public IReadOnlyList DecodeServerData(TransportBuffer buffer, long s _streamDecoders[streamId] = decoder; } - var frames = decoder.DecodeAll(buffer.Memory.Span, out _); - buffer.Dispose(); - return frames; + return decoder.DecodeAll(buffer.Memory, out _); } /// diff --git a/src/TurboHTTP/Protocol/Syntax/Http3/FrameDecoder.cs b/src/TurboHTTP/Protocol/Syntax/Http3/FrameDecoder.cs index ddc4e2d2..06b4666c 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http3/FrameDecoder.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http3/FrameDecoder.cs @@ -34,12 +34,17 @@ internal sealed class FrameDecoder : IDisposable /// internally for the next call. /// public DecodeStatus TryDecode(ReadOnlySpan input, out Http3Frame? frame, out int bytesConsumed) + => TryDecodeCore(input, default, sliceInput: false, out frame, out bytesConsumed); + + private DecodeStatus TryDecodeCore(ReadOnlySpan input, ReadOnlyMemory inputMemory, bool sliceInput, + out Http3Frame? frame, out int bytesConsumed) { frame = null; bytesConsumed = 0; // Combine remainder with new input into a pooled working buffer ReadOnlySpan data; + ReadOnlyMemory dataMemory = default; IMemoryOwner? rentedCombined = null; var combinedLength = 0; @@ -50,6 +55,7 @@ public DecodeStatus TryDecode(ReadOnlySpan input, out Http3Frame? frame, o _remainderOwner!.Memory.Span[.._remainderLength].CopyTo(rentedCombined.Memory.Span); input.CopyTo(rentedCombined.Memory.Span[_remainderLength..]); data = rentedCombined.Memory.Span[..combinedLength]; + sliceInput = false; // Dispose old remainder buffer now that its content has been copied out _remainderOwner?.Dispose(); @@ -59,11 +65,12 @@ public DecodeStatus TryDecode(ReadOnlySpan input, out Http3Frame? 
frame, o else { data = input; + dataMemory = inputMemory; } try { - var result = TryDecodeFrame(data, out frame, out var totalConsumed); + var result = TryDecodeFrame(data, dataMemory, sliceInput, out frame, out var totalConsumed); if (result == DecodeStatus.NeedMoreData) { @@ -116,13 +123,28 @@ public DecodeStatus TryDecode(ReadOnlySpan input, out Http3Frame? frame, o /// Any trailing partial frame is buffered for the next call. /// public IReadOnlyList DecodeAll(ReadOnlySpan input, out int bytesConsumed) + => DecodeAllCore(input, default, sliceInput: false, out bytesConsumed); + + /// + /// Zero-copy variant: DATA/HEADERS/PUSH_PROMISE payloads of frames fully contained in + /// are returned as slices of it — the caller must keep the + /// backing buffer alive until all returned frames have been handled. Frames assembled + /// from a buffered remainder still own a pooled copy. + /// + public IReadOnlyList DecodeAll(ReadOnlyMemory input, out int bytesConsumed) + => DecodeAllCore(input.Span, input, sliceInput: true, out bytesConsumed); + + private IReadOnlyList DecodeAllCore(ReadOnlySpan input, ReadOnlyMemory inputMemory, + bool sliceInput, out int bytesConsumed) { _frames.Clear(); bytesConsumed = 0; while (true) { - var status = TryDecode(input[bytesConsumed..], out var frame, out var consumed); + var remainingMemory = sliceInput ? inputMemory[bytesConsumed..] : default; + var status = TryDecodeCore(input[bytesConsumed..], remainingMemory, sliceInput, + out var frame, out var consumed); if (status == DecodeStatus.NeedMoreData) { @@ -163,6 +185,8 @@ public void Reset() private static DecodeStatus TryDecodeFrame( ReadOnlySpan data, + ReadOnlyMemory dataMemory, + bool sliceInput, out Http3Frame? frame, out int totalConsumed) { @@ -198,6 +222,7 @@ private static DecodeStatus TryDecodeFrame( } var payload = data.Slice(headerSize, (int)payloadLength); + var payloadMemory = sliceInput ? 
dataMemory.Slice(headerSize, (int)payloadLength) : default; totalConsumed = frameSize; // Parse frame by type @@ -217,11 +242,11 @@ private static DecodeStatus TryDecodeFrame( frame = frameType switch { - FrameType.Data => DecodeDataFrame(payload), - FrameType.Headers => DecodeHeadersFrame(payload), + FrameType.Data => DecodeDataFrame(payload, payloadMemory, sliceInput), + FrameType.Headers => DecodeHeadersFrame(payload, payloadMemory, sliceInput), FrameType.CancelPush => DecodeCancelPushFrame(payload), FrameType.Settings => DecodeSettingsFrame(payload), - FrameType.PushPromise => DecodePushPromiseFrame(payload), + FrameType.PushPromise => DecodePushPromiseFrame(payload, payloadMemory, sliceInput), FrameType.GoAway => DecodeGoAwayFrame(payload), FrameType.MaxPushId => DecodeMaxPushIdFrame(payload), _ => null // Should not happen given IsDefined check above @@ -230,25 +255,37 @@ private static DecodeStatus TryDecodeFrame( return DecodeStatus.Success; } - private static DataFrame DecodeDataFrame(ReadOnlySpan payload) + private static DataFrame DecodeDataFrame(ReadOnlySpan payload, ReadOnlyMemory payloadMemory, + bool sliceInput) { if (payload.Length == 0) { return new DataFrame(ReadOnlyMemory.Empty); } + if (sliceInput) + { + return new DataFrame(payloadMemory); + } + var owner = MemoryPool.Shared.Rent(payload.Length); payload.CopyTo(owner.Memory.Span); return new DataFrame(owner, payload.Length); } - private static HeadersFrame DecodeHeadersFrame(ReadOnlySpan payload) + private static HeadersFrame DecodeHeadersFrame(ReadOnlySpan payload, ReadOnlyMemory payloadMemory, + bool sliceInput) { if (payload.Length == 0) { return new HeadersFrame(ReadOnlyMemory.Empty); } + if (sliceInput) + { + return new HeadersFrame(payloadMemory); + } + var owner = MemoryPool.Shared.Rent(payload.Length); payload.CopyTo(owner.Memory.Span); return new HeadersFrame(owner, payload.Length); @@ -279,7 +316,8 @@ private static SettingsFrame DecodeSettingsFrame(ReadOnlySpan payload) return new 
SettingsFrame(parameters); } - private static PushPromiseFrame DecodePushPromiseFrame(ReadOnlySpan payload) + private static PushPromiseFrame DecodePushPromiseFrame(ReadOnlySpan payload, + ReadOnlyMemory payloadMemory, bool sliceInput) { var pushId = QuicVarInt.Decode(payload, out var pushIdBytes); var headerBlockSpan = payload[pushIdBytes..]; @@ -289,6 +327,11 @@ private static PushPromiseFrame DecodePushPromiseFrame(ReadOnlySpan payloa return new PushPromiseFrame(pushId, ReadOnlyMemory.Empty); } + if (sliceInput) + { + return new PushPromiseFrame(pushId, payloadMemory[pushIdBytes..]); + } + var owner = MemoryPool.Shared.Rent(headerBlockSpan.Length); headerBlockSpan.CopyTo(owner.Memory.Span); return new PushPromiseFrame(pushId, owner, headerBlockSpan.Length); diff --git a/src/TurboHTTP/Protocol/Syntax/Http3/Qpack/QpackTableSync.cs b/src/TurboHTTP/Protocol/Syntax/Http3/Qpack/QpackTableSync.cs index bc56a44b..8b5c1e3a 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http3/Qpack/QpackTableSync.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http3/Qpack/QpackTableSync.cs @@ -168,7 +168,9 @@ public QpackDecodeResult TryDecodeOrBlock(ReadOnlyMemory data, int streamI $"RFC 9204 §2.1.2 violation: Maximum blocked streams ({_maxBlockedStreams}) exceeded."); } - _blockedStreams.Add(new BlockedStream(streamId, result.RequiredInsertCount, data)); + // Copy the header block: the caller's buffer is a pooled frame rental that is + // disposed (and reused) right after frame handling, long before resolution. 
+ _blockedStreams.Add(new BlockedStream(streamId, result.RequiredInsertCount, data.ToArray())); } return result; diff --git a/src/TurboHTTP/Protocol/Syntax/Http3/Server/Http3ServerSessionManager.cs b/src/TurboHTTP/Protocol/Syntax/Http3/Server/Http3ServerSessionManager.cs index d1b942e2..52d6afe8 100644 --- a/src/TurboHTTP/Protocol/Syntax/Http3/Server/Http3ServerSessionManager.cs +++ b/src/TurboHTTP/Protocol/Syntax/Http3/Server/Http3ServerSessionManager.cs @@ -46,6 +46,9 @@ internal sealed class Http3ServerSessionManager private const int MaxDecoderPoolSize = 256; private readonly DataRateMonitor _requestRate; private readonly DataRateMonitor _responseRate; + private readonly List _rateViolations = []; + private readonly HashSet _rateViolationSet = []; + private bool _rateTimerActive; private readonly TimeProvider _clock; private bool _controlPrefaceSent; @@ -362,14 +365,20 @@ public void Cleanup() public void CheckDataRates() { + _rateTimerActive = false; var now = Now(); - var violations = new List(); + _rateViolations.Clear(); - _requestRate.Check(now, violations); - _responseRate.Check(now, violations); + _requestRate.Check(now, _rateViolations); + _responseRate.Check(now, _rateViolations); - var violationSet = new HashSet(violations); - foreach (var streamId in violationSet) + _rateViolationSet.Clear(); + foreach (var violation in _rateViolations) + { + _rateViolationSet.Add(violation); + } + + foreach (var streamId in _rateViolationSet) { Tracing.For("Protocol").Warning(this, "HTTP/3: data rate violation (stream={0})", streamId); EmitRstStream(streamId, ErrorCode.GeneralProtocolError); @@ -377,7 +386,7 @@ public void CheckDataRates() if (_requestRate.Count > 0 || _responseRate.Count > 0) { - _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + EnsureRateTimer(); } } @@ -426,22 +435,23 @@ private void ProcessFrameData(TransportBuffer buffer, long streamId) var (decoder, state) = streamData; + // Decoded DATA/HEADERS frames slice the input buffer 
(zero-copy), so the buffer + // must stay alive until the frame loop below has handled (and copied) everything. + using var inputBuffer = buffer; + IReadOnlyList frames; try { - frames = decoder.DecodeAll(buffer.Span, out _); + frames = decoder.DecodeAll(inputBuffer.Memory, out _); } catch (Exception ex) when (ex is HttpProtocolException or QpackException or HuffmanException) { - buffer.Dispose(); Tracing.For("Protocol").Warning(this, "HTTP/3 connection framing error on stream {0} - closing connection: {1}", streamId, ex.Message); ShouldComplete = true; return; } - buffer.Dispose(); - foreach (var frame in frames) { try @@ -510,6 +520,13 @@ private void ProcessFrameData(TransportBuffer buffer, long streamId) "HTTP/3 message error on stream {0} - resetting stream: {1}", streamId, ex.Message); EmitRstStream(streamId, ErrorCode.MessageError); } + finally + { + // DATA/HEADERS frames own a pooled rental; handling copies what it keeps + // (body bytes into the body reader, header strings via QPACK decode), so the + // rental must go back to the pool here or every upload allocates fresh arrays. 
+ (frame as IDisposable)?.Dispose(); + } } } @@ -775,5 +792,14 @@ private MultiplexedData BuildControlPreface() return new MultiplexedData(buf, CriticalStreamId.Control); } - private void EnsureRateTimer() => _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + private void EnsureRateTimer() + { + if (_rateTimerActive) + { + return; + } + + _rateTimerActive = true; + _ops.OnScheduleTimer(DataRateCheck, TimeSpan.FromSeconds(1)); + } } diff --git a/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs b/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs index bd09f737..44b98a94 100644 --- a/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs +++ b/src/TurboHTTP/Server/Context/Features/TurboHttpResponseBodyFeature.cs @@ -4,6 +4,7 @@ using Akka.Streams.Dsl; using Microsoft.AspNetCore.Http.Features; using Servus.Akka.Streams.IO; +using static Servus.Senf; namespace TurboHTTP.Server.Context.Features; @@ -43,12 +44,14 @@ internal void Reset() } _bufferWriter.ResetWrittenCount(); - _writer = new ResponsePipeWriter(this); + _writer.Reset(); } + internal bool HasPipe => _pipe is not null; + internal bool TryGetBufferedBody(out ReadOnlyMemory body) { - if (_pipe is null && _bufferWriter.WrittenCount > 0) + if (_pipe is null && _writer.IsCompleted && _bufferWriter.WrittenCount > 0) { body = _bufferWriter.WrittenMemory; return true; @@ -58,6 +61,16 @@ internal bool TryGetBufferedBody(out ReadOnlyMemory body) { if (result.IsCompleted && !result.Buffer.IsEmpty) { + if (result.Buffer.IsSingleSegment) + { + // Hand out the pipe's own segment: the caller copies it into the wire + // buffer synchronously, and the segment stays valid until the reader + // completes on feature reset. Examined-to-end keeps the data readable. 
+ body = result.Buffer.First; + _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End); + return true; + } + body = result.Buffer.ToArray(); _pipe.Reader.AdvanceTo(result.Buffer.End); return true; @@ -77,9 +90,19 @@ internal void UpgradeToPipe() return; } - _pipe = new Pipe(); + Tracing.For("Stage").Debug(this, "response upgraded to pipe (buffered={0}, completed={1})", + _bufferWriter.WrittenCount, _writer.IsCompleted); + + // The initial flush below is not awaited, so the pause threshold must exceed the + // already-buffered content or the pending FlushAsync would be silently discarded. + var buffered = _bufferWriter.WrittenCount; + _pipe = buffered < 64 * 1024 + ? new Pipe() + : new Pipe(new PipeOptions( + pauseWriterThreshold: buffered + 64 * 1024, + resumeWriterThreshold: buffered / 2)); - if (_bufferWriter.WrittenCount > 0) + if (buffered > 0) { var src = _bufferWriter.WrittenSpan; var dest = _pipe.Writer.GetMemory(src.Length); @@ -195,7 +218,8 @@ internal Stream GetResponseStream() private sealed class ResponsePipeWriter : PipeWriter { private readonly TurboHttpResponseBodyFeature _owner; - private readonly TaskCompletionSource _headerCommit = new(TaskCreationOptions.RunContinuationsAsynchronously); + private TaskCompletionSource? _headerCommit; + private bool _headersCommitted; private TurboHttpResponseFeature? _responseFeature; public ResponsePipeWriter(TurboHttpResponseBodyFeature owner) @@ -203,7 +227,34 @@ public ResponsePipeWriter(TurboHttpResponseBodyFeature owner) _owner = owner; } - public Task WhenHeadersReady => _headerCommit.Task; + // Awaited from the stage-actor thread while the app thread commits — a true + // cross-thread boundary, hence the explicit barriers. The TCS is lazy: handlers + // that complete synchronously never touch it. 
+ public Task WhenHeadersReady + { + get + { + if (Volatile.Read(ref _headersCommitted)) + { + return Task.CompletedTask; + } + + var tcs = _headerCommit; + if (tcs is null) + { + var fresh = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + tcs = Interlocked.CompareExchange(ref _headerCommit, fresh, null) ?? fresh; + } + + if (Volatile.Read(ref _headersCommitted)) + { + tcs.TrySetResult(); + } + + return tcs.Task; + } + } + public bool HasStarted { get; private set; } public bool IsCompleted { get; private set; } public long BytesWritten { get; private set; } @@ -216,6 +267,14 @@ internal void Reset() HasStarted = false; IsCompleted = false; BytesWritten = 0; + _headerCommit = null; + _headersCommitted = false; + } + + private void SignalHeadersReady() + { + Volatile.Write(ref _headersCommitted, true); + _headerCommit?.TrySetResult(); } public void CommitHeaders() @@ -224,7 +283,7 @@ public void CommitHeaders() { HasStarted = true; _owner.UpgradeToPipe(); - _headerCommit.TrySetResult(); + SignalHeadersReady(); } } @@ -243,7 +302,7 @@ public async Task CommitHeadersAsync() finally { _owner.UpgradeToPipe(); - _headerCommit.TrySetResult(); + SignalHeadersReady(); } } } @@ -339,7 +398,7 @@ private async ValueTask CommitAndFlushAsync(CancellationToken cance finally { _owner.UpgradeToPipe(); - _headerCommit.TrySetResult(); + SignalHeadersReady(); } return await _owner._pipe!.Writer.FlushAsync(cancellationToken); @@ -359,7 +418,7 @@ private async ValueTask CommitAndWriteAsync(ReadOnlyMemory so finally { _owner.UpgradeToPipe(); - _headerCommit.TrySetResult(); + SignalHeadersReady(); } BytesWritten += source.Length;