Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Examples.AspNet/Examples.AspNet.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net11.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>Examples.AspNet</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../ioxide.Kestrel/ioxide.Kestrel.csproj" />
</ItemGroup>

</Project>
42 changes: 42 additions & 0 deletions Examples.AspNet/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using ioxide.Kestrel;
using Microsoft.Extensions.Logging;

// A minimal ASP.NET Core app that runs on either the ioxide io_uring transport or Kestrel's stock
// sockets transport. Pick with the TRANSPORT environment variable (default: ioxide):
//
// TRANSPORT=ioxide dotnet run # ioxide.Kestrel transport (io_uring, one reactor per core)
// TRANSPORT=sockets dotnet run # stock Kestrel sockets transport (the framework default)
//
// Then: curl http://localhost:8080/ and curl http://localhost:8080/plaintext

var builder = WebApplication.CreateBuilder(args);

builder.Logging.ClearProviders(); // turn off all logging (no per-request info: lines)

var transport = (Environment.GetEnvironmentVariable("TRANSPORT") ?? "ioxide").Trim().ToLowerInvariant();

builder.WebHost.ConfigureKestrel(o => o.ListenAnyIP(8080));

switch (transport)
{
case "ioxide":
builder.WebHost.UseIoxide(o => o.ReactorCount = 16); // io_uring transport, 16 reactors (one ring per thread)
break;

case "sockets":
case "kestrel":
// Stock Kestrel sockets transport — the framework default, nothing to wire up.
break;

default:
Console.Error.WriteLine($"Unknown TRANSPORT '{transport}'. Use 'ioxide' or 'sockets'.");
return;
}

var app = builder.Build();

app.MapGet("/", () => $"Hello from ioxide.Kestrel! transport={transport}");
app.MapGet("/plaintext", () => "Hello, World!");

Console.WriteLine($"[Examples.AspNet] listening on http://localhost:8080 (transport={transport})");
app.Run();
152 changes: 152 additions & 0 deletions ioxide.Kestrel/HopDuplexPipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using System.Buffers;
using System.IO.Pipelines;
using ioxide;
using ioxide.utils;

namespace ioxide.Kestrel;

/// <summary>
/// A Kestrel transport duplex over an ioxide <see cref="Connection"/>: two BCL <see cref="Pipe"/>s whose
/// reader schedulers route to the reactor thread (via <see cref="ReactorPipeScheduler"/>), plus a
/// recv→inbound pump and an outbound→send pump that run on the reactor. This pins Kestrel's whole request
/// loop to the reactor thread. One copy each way (recv bytes into the inbound pipe; response bytes into
/// the connection slab).
/// </summary>
internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable
{
private readonly Connection _conn;
private readonly Reactor _reactor;
private readonly Pipe _inbound; // recv pump writes; Kestrel reads (Transport.Input)
private readonly Pipe _outbound; // Kestrel writes (Transport.Output); send pump reads

private Task _recvPump = Task.CompletedTask;
private Task _sendPump = Task.CompletedTask;
private int _started;

public PipeReader Input => _inbound.Reader;
public PipeWriter Output => _outbound.Writer;

public HopDuplexPipe(Connection conn, Reactor reactor)
{
_conn = conn;
_reactor = reactor;
var scheduler = new ReactorPipeScheduler(reactor);

// Reader schedulers = the reactor: Kestrel's HTTP parse (inbound reader) and the send pump
// (outbound reader) both run on the reactor thread.
_inbound = new Pipe(new PipeOptions(
readerScheduler: scheduler,
writerScheduler: scheduler,
pauseWriterThreshold: 1024 * 1024,
resumeWriterThreshold: 512 * 1024,
useSynchronizationContext: false));

_outbound = new Pipe(new PipeOptions(
readerScheduler: scheduler,
writerScheduler: PipeScheduler.ThreadPool,
pauseWriterThreshold: 64 * 1024,
resumeWriterThreshold: 32 * 1024,
useSynchronizationContext: false));
}

/// <summary>Launches the recv and send pumps. Must be called on the reactor thread.</summary>
public void Start()
{
if (Interlocked.Exchange(ref _started, 1) == 1)
{
return;
}
_recvPump = RecvPumpAsync();
_sendPump = SendPumpAsync();
}

// Reactor → inbound pipe. Copies each recv slice into the pipe and flushes; Kestrel reads it.
private async Task RecvPumpAsync()
{
PipeWriter writer = _inbound.Writer;
try
{
while (true)
{
RecvSnapshot snap = await _conn.ReadAsync();

while (_conn.TryGetItem(snap, out SpscRecvRing.Item item))
{
if (item.HasBuffer && item.Len > 0)
{
CopySlice(in item, writer);
}
_conn.ReturnBuffer(in item);
}
_conn.ResetRead();

if (snap.IsClosed)
{
break;
}

FlushResult fr = await writer.FlushAsync();
if (fr.IsCompleted || fr.IsCanceled)
{
break;
}
}
}
catch { /* swallow client/protocol faults; teardown in finally */ }
finally { await writer.CompleteAsync(); }
}

private static unsafe void CopySlice(in SpscRecvRing.Item item, PipeWriter writer)
{
Span<byte> dst = writer.GetSpan(item.Len);
new ReadOnlySpan<byte>(item.Ptr, item.Len).CopyTo(dst);
writer.Advance(item.Len);
}

// Outbound pipe → connection send. Drains Kestrel's response into the slab and submits one SEND.
private async Task SendPumpAsync()
{
PipeReader reader = _outbound.Reader;
try
{
while (true)
{
ReadResult rr = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = rr.Buffer;

if (!buffer.IsEmpty)
{
foreach (ReadOnlyMemory<byte> segment in buffer)
{
Span<byte> dst = _conn.GetSpan(segment.Length);
segment.Span.CopyTo(dst);
_conn.Advance(segment.Length);
}
await _conn.FlushAsync();
}

reader.AdvanceTo(buffer.End);

if (rr.IsCompleted || rr.IsCanceled)
{
break;
}
}
}
catch { /* swallow; teardown in finally */ }
finally { await reader.CompleteAsync(); }
}

public async ValueTask DisposeAsync()
{
// Kestrel has completed its ends; wake the pumps and unwind. MarkClosed wakes a recv parked in
// conn.ReadAsync — schedule it ON the reactor so the recv continuation (which touches reactor-owned
// recv state) runs there, not on Kestrel's dispose thread. The pipe cancels resume via the pipes'
// reactor reader/writer schedulers, so they're reactor-safe too.
_reactor.ScheduleOnReactor(static c => ((Connection)c!).MarkClosed(), _conn);
_inbound.Writer.CancelPendingFlush();
_outbound.Reader.CancelPendingRead();
try { await _recvPump; } catch { }
try { await _sendPump; } catch { }
}
}
93 changes: 93 additions & 0 deletions ioxide.Kestrel/IoxideConnectionContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System.IO.Pipelines;
using System.Net;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using ioxide;

namespace ioxide.Kestrel;

/// <summary>
/// Adapts a single ioxide <see cref="Connection"/> to Kestrel's <see cref="ConnectionContext"/>. The
/// transport is a <see cref="HopDuplexPipe"/> (BCL pipes whose reader schedulers route to the reactor),
/// so Kestrel's read → parse → handle → send loop runs pinned to the reactor thread.
/// </summary>
internal sealed class IoxideConnectionContext : ConnectionContext,
IConnectionIdFeature,
IConnectionTransportFeature,
IConnectionItemsFeature,
IConnectionLifetimeFeature,
IConnectionEndPointFeature
{
private readonly HopDuplexPipe _pipe;
private readonly CancellationTokenSource _connectionClosedCts = new();
private readonly FeatureCollection _features = new();

// Completed when Kestrel is done with the connection (DisposeAsync, or Abort). The ioxide Handle
// callback awaits this and only then DecRefs the connection.
private readonly TaskCompletionSource _completion = new(TaskCreationOptions.RunContinuationsAsynchronously);

private int _disposed;

public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint localEndPoint, long id)
{
_pipe = new HopDuplexPipe(connection, reactor);

ConnectionId = $"ioxide-{id:x}";
LocalEndPoint = localEndPoint;
RemoteEndPoint = null;
Items = new ConnectionItems();
ConnectionClosed = _connectionClosedCts.Token;

_features.Set<IConnectionIdFeature>(this);
_features.Set<IConnectionTransportFeature>(this);
_features.Set<IConnectionItemsFeature>(this);
_features.Set<IConnectionLifetimeFeature>(this);
_features.Set<IConnectionEndPointFeature>(this);
}

/// <summary>Resolves once Kestrel has finished with this connection; the reactor's Handle callback awaits it.</summary>
public Task Completion => _completion.Task;

/// <summary>Launches the transport pumps. Must be called on the reactor thread (from the Handle callback).</summary>
public void StartPumps() => _pipe.Start();

public override string ConnectionId { get; set; }
public override IFeatureCollection Features => _features;
public override IDictionary<object, object?> Items { get; set; }

public override IDuplexPipe Transport
{
get => _pipe;
set => throw new NotSupportedException("Transport is owned by the ioxide transport adapter.");
}

public override CancellationToken ConnectionClosed { get; set; }
public override EndPoint? LocalEndPoint { get; set; }
public override EndPoint? RemoteEndPoint { get; set; }

public override void Abort(ConnectionAbortedException abortReason)
{
// Forced close: signal the lifetime token. The pumps are unwound (reactor-safely) in DisposeAsync,
// which Kestrel calls during connection cleanup after Abort.
try { _connectionClosedCts.Cancel(); } catch { /* ignore */ }
}

public override async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) == 1)
{
return;
}

try { _connectionClosedCts.Cancel(); } catch { /* ignore */ }

// Unwind the recv/send pumps (returns held recv buffers, stops the send loop).
await _pipe.DisposeAsync().ConfigureAwait(false);

_connectionClosedCts.Dispose();

// Release the reactor's Handle callback, which DecRefs the connection (-> recycle).
_completion.TrySetResult();
}
}
Loading
Loading