Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e483e6a
fix(server): propagate actual bound port for dynamic port allocation
st0o0 May 29, 2026
d4f3caa
feat(server): add ConnectionRoutingFeature for shared pipeline routing
st0o0 May 29, 2026
7c2dc60
feat(server): add SharedBridgeStage with unordered handler dispatch
st0o0 May 29, 2026
d7eaf95
feat(server): add ServerPipelineOwner with MergeHub/PartitionHub shar…
st0o0 May 29, 2026
fd9fce5
feat(server): add ConnectionBridgeStage for per-connection hub bridging
st0o0 May 29, 2026
8c3f243
refactor(server): rewire ConnectionActor, ListenerActor, and TurboSer…
st0o0 May 29, 2026
5c11121
refactor(server): simplify ConnectionBridge to GraphDsl factory
st0o0 May 29, 2026
2c7cf12
feat(server): shared pipeline with GraphDsl bridge + Recover
st0o0 May 29, 2026
25a18a3
feat(server): shared pipeline with fire-and-forget registration
st0o0 May 29, 2026
1c88fc4
feat(server): partition index recycling + connection stage error isol…
st0o0 May 29, 2026
acdc86d
fix(server): absorb network failures on all ConnectionStage ports
st0o0 May 29, 2026
612621b
feat(server): add ResponseDispatcherHub with O(1) key-based routing
st0o0 May 29, 2026
33b0fe5
feat(server): integrate ResponseDispatcherHub into server pipeline
st0o0 May 29, 2026
2690e8e
test(server): add ResponseDispatcherHub unit tests
st0o0 May 29, 2026
24efdb2
chore(server): remove orphaned ServerConnectionConsumer
st0o0 May 29, 2026
fd88a8d
fix(server): buffer pending responses before source registration
st0o0 May 29, 2026
737b43f
fix(server): handle shared ActorSystem lifecycle in StopAsync
st0o0 May 29, 2026
8abeb4b
feat(tests): wire AssemblyFixture and shared ActorSystem into ServerS…
st0o0 May 29, 2026
a026b55
feat(tests): add ActorSystemFixture constructors to all server specs
st0o0 May 29, 2026
0a8a0ab
Revert "feat(tests): add ActorSystemFixture constructors to all serve…
st0o0 May 29, 2026
6914ef0
Revert "feat(tests): wire AssemblyFixture and shared ActorSystem into…
st0o0 May 29, 2026
1289fab
feat(tests): shared TurboServerFixture for integration test pilot
st0o0 May 29, 2026
9691c28
feat(tests): convert 10 more specs to shared TurboServerFixture
st0o0 May 29, 2026
d40c56b
feat(tests): add HTTPS port to shared fixture, convert 2 TLS specs
st0o0 May 29, 2026
7abd809
Revert "feat(tests): add HTTPS port to shared fixture, convert 2 TLS …
st0o0 May 29, 2026
f8fd1cd
perf(tests): enable parallel test collections and increase thread count
st0o0 May 29, 2026
c76c951
fix(tests): move ConnectionCloseReproSpec back to ServerSpecBase
st0o0 May 29, 2026
e77da95
fix(tests): add explicit read timeout to ConnectionCloseReproSpec
st0o0 May 29, 2026
dd1710f
fix(tests): increase timeout for 3 flaky acceptance tests on CI
st0o0 May 29, 2026
dd69672
fix(tests): increase read timeout to 10s for ConnectionCloseReproSpec
st0o0 May 29, 2026
7755c2a
chore: code cleanup
st0o0 May 29, 2026
0339186
chore: update API approval baseline and fix test warnings
st0o0 May 29, 2026
044134a
feat(server): wire up ResponseBodyChunkSize and BodyConsumptionTimeou…
st0o0 May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Servus.Akka/Transport/IListenerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ namespace Servus.Akka.Transport;

public interface IListenerFactory
{
Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task> Bind(ListenerOptions options);
Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task<int>> Bind(ListenerOptions options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Servus.Akka.Transport.Quic.Listener;

public sealed class QuicListenerFactory : IListenerFactory
{
public Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task> Bind(ListenerOptions options)
public Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task<int>> Bind(ListenerOptions options)
{
if (options is not QuicListenerOptions quicOptions)
{
Expand Down
14 changes: 7 additions & 7 deletions src/Servus.Akka/Transport/Quic/Listener/QuicListenerStage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal sealed record QuicAcceptFailed(Exception Error);
internal sealed record QuicListenerBound(QuicListener Listener);

internal sealed class QuicListenerStage
: GraphStageWithMaterializedValue<SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>, Task>
: GraphStageWithMaterializedValue<SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>, Task<int>>
{
private readonly QuicListenerOptions _options;

Expand All @@ -33,24 +33,24 @@ public QuicListenerStage(QuicListenerOptions options)
Shape = new SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>(_out);
}

public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(
public override ILogicAndMaterializedValue<Task<int>> CreateLogicAndMaterializedValue(
Attributes inheritedAttributes)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
return new LogicAndMaterializedValue<Task>(new Logic(this, tcs), tcs.Task);
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
return new LogicAndMaterializedValue<Task<int>>(new Logic(this, tcs), tcs.Task);
}

[ExcludeFromCodeCoverage]
private sealed class Logic : GraphStageLogic
{
private readonly QuicListenerStage _stage;
private readonly TaskCompletionSource _boundSignal;
private readonly TaskCompletionSource<int> _boundSignal;
private readonly Queue<Flow<ITransportOutbound, ITransportInbound, NotUsed>> _pendingConnections = new();
private QuicListener? _listener;
private IActorRef _self = null!;
private CancellationTokenSource? _cts;

public Logic(QuicListenerStage stage, TaskCompletionSource boundSignal) : base(stage.Shape)
public Logic(QuicListenerStage stage, TaskCompletionSource<int> boundSignal) : base(stage.Shape)
{
_stage = stage;
_boundSignal = boundSignal;
Expand Down Expand Up @@ -149,7 +149,7 @@ private void OnReceive((IActorRef sender, object message) args)
{
case QuicListenerBound bound:
_listener = bound.Listener;
_boundSignal.TrySetResult();
_boundSignal.TrySetResult(_listener.LocalEndPoint.Port);
_ = AcceptLoopAsync(_listener, _self, _cts!.Token);
break;
case QuicConnectionAccepted accepted:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Servus.Akka.Transport.Tcp.Listener;

public sealed class TcpListenerFactory : IListenerFactory
{
public Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task> Bind(ListenerOptions options)
public Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task<int>> Bind(ListenerOptions options)
{
if (options is not TcpListenerOptions tcpOptions)
{
Expand Down
15 changes: 8 additions & 7 deletions src/Servus.Akka/Transport/Tcp/Listener/TcpListenerStage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal sealed record TcpConnectionReady(Flow<ITransportOutbound, ITransportInb
internal sealed record TcpConnectionInitFailed(Exception Error);

internal sealed class TcpListenerStage
: GraphStageWithMaterializedValue<SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>, Task>
: GraphStageWithMaterializedValue<SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>, Task<int>>
{
private readonly TcpListenerOptions _options;

Expand All @@ -35,24 +35,24 @@ public TcpListenerStage(TcpListenerOptions options)
Shape = new SourceShape<Flow<ITransportOutbound, ITransportInbound, NotUsed>>(_out);
}

public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(
public override ILogicAndMaterializedValue<Task<int>> CreateLogicAndMaterializedValue(
Attributes inheritedAttributes)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
return new LogicAndMaterializedValue<Task>(new Logic(this, tcs), tcs.Task);
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
return new LogicAndMaterializedValue<Task<int>>(new Logic(this, tcs), tcs.Task);
}

[ExcludeFromCodeCoverage]
private sealed class Logic : GraphStageLogic
{
private readonly TcpListenerStage _stage;
private readonly TaskCompletionSource _boundSignal;
private readonly TaskCompletionSource<int> _boundSignal;
private readonly Queue<Flow<ITransportOutbound, ITransportInbound, NotUsed>> _pendingConnections = new();
private TcpListener? _listener;
private IActorRef _self = null!;
private CancellationTokenSource? _cts;

public Logic(TcpListenerStage stage, TaskCompletionSource boundSignal) : base(stage.Shape)
public Logic(TcpListenerStage stage, TaskCompletionSource<int> boundSignal) : base(stage.Shape)
{
_stage = stage;
_boundSignal = boundSignal;
Expand Down Expand Up @@ -81,7 +81,8 @@ public override void PreStart()
}

_listener.Start(_stage._options.Backlog);
_boundSignal.TrySetResult();
var actualPort = ((IPEndPoint)_listener.LocalEndpoint).Port;
_boundSignal.TrySetResult(actualPort);
_ = AcceptLoopAsync(_listener, _self, _cts.Token);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Servus.Akka/Transport/TransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ namespace Servus.Akka.Transport;

public static class TransportFactory
{
public static Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task> CreateTcpListener(
public static Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task<int>> CreateTcpListener(
TcpListenerOptions options)
=> new TcpListenerFactory().Bind(options);

public static Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task> CreateQuicListener(
public static Source<Flow<ITransportOutbound, ITransportInbound, NotUsed>, Task<int>> CreateQuicListener(
QuicListenerOptions options)
=> new QuicListenerFactory().Bind(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,59 +333,7 @@ namespace TurboHTTP.Server.Context.Features
}
namespace TurboHTTP.Server.Context
{
public interface ITurboFormCollection : System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>>, System.Collections.IEnumerable
{
int Count { get; }
TurboHTTP.Server.Context.ITurboFormFileCollection Files { get; }
Microsoft.Extensions.Primitives.StringValues this[string key] { get; }
System.Collections.Generic.ICollection<string> Keys { get; }
bool ContainsKey(string key);
}
public interface ITurboFormFile
{
string ContentType { get; }
string FileName { get; }
long Length { get; }
string Name { get; }
void CopyTo(System.IO.Stream target);
System.Threading.Tasks.Task CopyToAsync(System.IO.Stream target, System.Threading.CancellationToken cancellationToken = default);
System.IO.Stream OpenReadStream();
}
public interface ITurboFormFileCollection : System.Collections.Generic.IEnumerable<TurboHTTP.Server.Context.ITurboFormFile>, System.Collections.IEnumerable
{
int Count { get; }
TurboHTTP.Server.Context.ITurboFormFile this[int index] { get; }
TurboHTTP.Server.Context.ITurboFormFile? this[string name] { get; }
TurboHTTP.Server.Context.ITurboFormFile? GetFile(string name);
System.Collections.Generic.IReadOnlyList<TurboHTTP.Server.Context.ITurboFormFile> GetFiles(string name);
}
public interface ITurboHeaderDictionary : System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>>, System.Collections.IEnumerable
{
long? ContentLength { get; set; }
int Count { get; }
Microsoft.Extensions.Primitives.StringValues this[string key] { get; set; }
System.Collections.Generic.ICollection<string> Keys { get; }
void Add(string key, Microsoft.Extensions.Primitives.StringValues value);
void Clear();
bool ContainsKey(string key);
bool Remove(string key);
bool TryGetValue(string key, out Microsoft.Extensions.Primitives.StringValues value);
}
public interface ITurboQueryCollection : System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>>, System.Collections.IEnumerable
{
int Count { get; }
Microsoft.Extensions.Primitives.StringValues this[string key] { get; }
System.Collections.Generic.ICollection<string> Keys { get; }
bool ContainsKey(string key);
bool TryGetValue(string key, out Microsoft.Extensions.Primitives.StringValues value);
}
public interface ITurboRequestCookieCollection : System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, string>>, System.Collections.IEnumerable
{
int Count { get; }
string? this[string key] { get; }
System.Collections.Generic.ICollection<string> Keys { get; }
bool ContainsKey(string key);
}
public interface ITurboHeaderDictionary : Microsoft.AspNetCore.Http.IHeaderDictionary, System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>>, System.Collections.Generic.IDictionary<string, Microsoft.Extensions.Primitives.StringValues>, System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>>, System.Collections.IEnumerable { }
}
namespace TurboHTTP.Server
{
Expand Down
2 changes: 1 addition & 1 deletion src/TurboHTTP.AcceptanceTests/H11/ErrorHandlingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public async Task ErrorHandling_should_return_4xx_status_code_400()
Assert.Equal(HttpStatusCode.BadRequest, response.StatusCode);
}

[Fact(Timeout = 5000)]
[Fact(Timeout = 10000)]
[Trait("RFC", "RFC9110-15.5")]
public async Task ErrorHandling_should_return_4xx_status_code_401()
{
Expand Down
2 changes: 1 addition & 1 deletion src/TurboHTTP.AcceptanceTests/H11/RedirectSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public async Task Redirect_should_follow_get_308_to_hello()
Assert.Equal("Hello World", body);
}

[Theory(Timeout = 5000)]
[Theory(Timeout = 10000)]
[InlineData(1)]
[InlineData(3)]
[InlineData(5)]
Expand Down
2 changes: 1 addition & 1 deletion src/TurboHTTP.AcceptanceTests/TLS/ConnectionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public async Task Connection_should_default_to_keep_alive_without_connection_hea
Assert.Equal("default", body);
}

[Fact(Timeout = 5000)]
[Fact(Timeout = 10000)]
[Trait("RFC", "RFC9110-7.8")]
public async Task Connection_101_switching_protocols_must_not_be_reusable_for_http()
{
Expand Down
2 changes: 1 addition & 1 deletion src/TurboHTTP.AcceptanceTests/xunit.runner.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
"parallelizeTestCollections": true,
"parallelizeAssembly": false,
"maxParallelThreads": 4
"maxParallelThreads": 8
}
107 changes: 107 additions & 0 deletions src/TurboHTTP.IntegrationTests.Server/BodyFloodReproSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System.Net;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Servus.Akka.Transport;
using TurboHTTP.IntegrationTests.Server.Shared;
using TurboHTTP.Server;

namespace TurboHTTP.IntegrationTests.Server;

[Collection("ServerStress")]
public sealed class BodyFloodReproSpec : ServerSpecBase
{
private static readonly byte[] Payload = new byte[1 * 1024 * 1024];

protected override void ConfigureServer(WebApplicationBuilder builder, ushort port)
{
builder.Host.UseTurboHttp(options =>
{
options.Bind(new TcpListenerOptions { Host = "127.0.0.1", Port = port });
});
}

protected override void ConfigureEndpoints(WebApplication app)
{
app.MapPost("/echo-size", async ctx =>
{
long count = 0;
var buffer = new byte[64 * 1024];
int read;
while ((read = await ctx.Request.Body.ReadAsync(buffer, CancellationToken)) > 0)
{
count += read;
}

ctx.Response.ContentType = "text/plain";
await ctx.Response.WriteAsync(count.ToString(), CancellationToken);
});
}

[Fact(Timeout = 10000)]
public async Task Post_1mb_body_should_return_correct_size()
{
var content = new ByteArrayContent(Payload);
var response = await Client.PostAsync(
new Uri($"http://127.0.0.1:{Port}/echo-size"),
content,
CancellationToken);

Assert.Equal(HttpStatusCode.OK, response.StatusCode);
var body = await response.Content.ReadAsStringAsync(CancellationToken);
Assert.Equal((1 * 1024 * 1024).ToString(), body);
}

[Fact(Timeout = 120000)]
public async Task Concurrent_1mb_posts_should_all_succeed()
{
var concurrency = 50;
using var handler = new SocketsHttpHandler
{
MaxConnectionsPerServer = concurrency,
};
using var client = new HttpClient(handler) { Timeout = TimeSpan.FromSeconds(60) };

var uri = new Uri($"http://127.0.0.1:{Port}/echo-size");
var errors = new List<string>();
var succeeded = 0;

var expectedSize = (1 * 1024 * 1024).ToString();
var tasks = Enumerable.Range(0, concurrency).Select(async i =>
{
try
{
var content = new ByteArrayContent(Payload);
var response = await client.PostAsync(uri, content, CancellationToken);
if (response.StatusCode != HttpStatusCode.OK)
{
lock (errors) errors.Add($"[{i}] status={response.StatusCode}");
return;
}

var body = await response.Content.ReadAsStringAsync(CancellationToken);
if (body == expectedSize)
{
Interlocked.Increment(ref succeeded);
}
else
{
lock (errors) errors.Add($"[{i}] body size mismatch: expected={expectedSize}, actual={body}");
}
}
catch (Exception ex)
{
lock (errors) errors.Add($"[{i}] {ex.GetType().Name}: {ex.InnerException?.Message ?? ex.Message}");
}
}).ToArray();

await Task.WhenAll(tasks);

var msg = $"{succeeded}/{concurrency} succeeded";
if (errors.Count > 0)
{
msg += $"\nErrors ({errors.Count}):\n" + string.Join("\n", errors.Take(10));
}

Assert.True(succeeded == concurrency, msg);
}
}
Loading
Loading