Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ioxide.Kestrel/HopDuplexPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable
private readonly TlsSession? _tls; // non-null on a kTLS-terminated connection: inbound is decrypted here
private readonly Pipe _inbound; // recv pump writes; Kestrel reads (Transport.Input)
private readonly Pipe _outbound; // Kestrel writes (Transport.Output); send pump reads
private readonly PipeReader _input; // Kestrel's Input — pins the first read onto the reactor thread

private Task _recvPump = Task.CompletedTask;
private Task _sendPump = Task.CompletedTask;
Expand All @@ -29,7 +30,7 @@ internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable
private static extern int Shutdown(int sockfd, int how);
private const int ShutWr = 1; // SHUT_WR

public PipeReader Input => _inbound.Reader;
public PipeReader Input => _input;
public PipeWriter Output => _outbound.Writer;

public HopDuplexPipe(Connection conn, Reactor reactor, TlsSession? tls = null)
Expand All @@ -54,6 +55,8 @@ public HopDuplexPipe(Connection conn, Reactor reactor, TlsSession? tls = null)
pauseWriterThreshold: 64 * 1024,
resumeWriterThreshold: 32 * 1024,
useSynchronizationContext: false));

_input = new ReactorPinReader(_inbound.Reader, reactor);
}

/// <summary>Launches the recv and send pumps. Must be called on the reactor thread.</summary>
Expand Down
14 changes: 14 additions & 0 deletions ioxide.Kestrel/IReactorFeature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using ioxide;

namespace ioxide.Kestrel;

/// <summary>
/// Exposes the ioxide <see cref="Reactor"/> that owns the current connection. The transport sets this on
/// the connection's feature collection, so an endpoint can reach the connection's reactor (and its
/// ring-native services like PgPool / AssetReader) regardless of which thread Kestrel ran the endpoint on.
/// Prefer the <c>HttpContext.OnReactor(...)</c> helper, which uses this to run work on the reactor.
/// </summary>
public interface IReactorFeature
{
Reactor Reactor { get; }
}
9 changes: 8 additions & 1 deletion ioxide.Kestrel/IoxideConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ internal sealed class IoxideConnectionContext : ConnectionContext,
IConnectionTransportFeature,
IConnectionItemsFeature,
IConnectionLifetimeFeature,
IConnectionEndPointFeature
IConnectionEndPointFeature,
IReactorFeature
{
private readonly HopDuplexPipe _pipe;
private readonly Reactor _reactor;
private readonly CancellationTokenSource _connectionClosedCts = new();
private readonly FeatureCollection _features = new();

Expand All @@ -35,6 +37,7 @@ internal sealed class IoxideConnectionContext : ConnectionContext,
public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint localEndPoint, long id, TlsSession? session = null, string? alpn = null)
{
_pipe = new HopDuplexPipe(connection, reactor, session);
_reactor = reactor;

ConnectionId = $"ioxide-{id:x}";
LocalEndPoint = localEndPoint;
Expand All @@ -47,6 +50,7 @@ public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint
_features.Set<IConnectionItemsFeature>(this);
_features.Set<IConnectionLifetimeFeature>(this);
_features.Set<IConnectionEndPointFeature>(this);
_features.Set<IReactorFeature>(this);

if (session is not null)
{
Expand All @@ -65,6 +69,9 @@ public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint
/// <summary>Launches the transport pumps. Must be called on the reactor thread (from the Handle callback).</summary>
public void StartPumps() => _pipe.Start();

/// <summary>The reactor that owns this connection (<see cref="IReactorFeature"/>).</summary>
public Reactor Reactor => _reactor;

public override string ConnectionId { get; set; }
public override IFeatureCollection Features => _features;
public override IDictionary<object, object?> Items { get; set; }
Expand Down
18 changes: 13 additions & 5 deletions ioxide.Kestrel/IoxideConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,20 @@ public IoxideConnectionListener(IPEndPoint endpoint, IoxideTransportOptions opti
{
Handle = HandleConnectionAsync,
};
if (_tlsOptions is not null)
// Per-reactor startup, on the reactor's own thread: bind the current-reactor seam (so Kestrel
// endpoints can resolve ring-native services), start the TLS service if configured, then run
// the user's hook (PgPool.Start, AssetReader.CreatePool, ...).
var tls = _tlsOptions;
var onReactorStart = options.OnReactorStart;
reactor.OnStart = r =>
{
// Start the per-reactor TLS service on the reactor's own thread (OpenSSL ctx + cert load).
var tls = _tlsOptions;
reactor.OnStart = r => TlsService.Start(r, tls);
}
IoxideReactor.Bind(r);
if (tls is not null)
{
TlsService.Start(r, tls);
}
onReactorStart?.Invoke(r);
};
_reactors[i] = reactor;
_threads[i] = new Thread(reactor.Run)
{
Expand Down
45 changes: 45 additions & 0 deletions ioxide.Kestrel/IoxideHttpExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Microsoft.AspNetCore.Http;
using ioxide;

namespace ioxide.Kestrel;

/// <summary>
/// Helpers for running ring-native work (ioxide.pg / ioxide.file) on the connection's reactor from a Kestrel
/// endpoint, so DB and file I/O stay thread-per-core regardless of which thread Kestrel ran the endpoint on.
/// </summary>
public static class IoxideHttpExtensions
{
/// <summary>The reactor that owns this connection (from <see cref="IReactorFeature"/>).</summary>
public static Reactor GetReactor(this HttpContext context)
=> (context.Features.Get<IReactorFeature>()
?? throw new InvalidOperationException(
"No ioxide reactor on this connection — is the app running on the ioxide transport (UseIoxide())?"))
.Reactor;

/// <summary>
/// Runs <paramref name="work"/> on this connection's reactor and returns its result. When the endpoint is
/// already on that reactor (the common keep-alive case) it runs inline — ioxide's inline resume is
/// preserved. Otherwise (e.g. the first request of a connection, which Kestrel may dispatch to the
/// ThreadPool) the work is marshaled onto the reactor. Either way the ring I/O runs on the reactor.
/// Materialize ring-native results (PgRow, file buffers) into your own objects inside <paramref name="work"/>.
/// </summary>
public static Task<T> OnReactor<T>(this HttpContext context, Func<Reactor, Task<T>> work)
{
Reactor reactor = context.GetReactor();

if (IoxideReactor.TryCurrent() == reactor)
{
return work(reactor); // already on the reactor — run inline
}

var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
reactor.ScheduleOnReactor(_ => _ = RunAsync(reactor, work, tcs), null);
return tcs.Task;
}

private static async Task RunAsync<T>(Reactor reactor, Func<Reactor, Task<T>> work, TaskCompletionSource<T> tcs)
{
try { tcs.SetResult(await work(reactor)); }
catch (Exception e) { tcs.SetException(e); }
}
}
29 changes: 29 additions & 0 deletions ioxide.Kestrel/IoxideReactor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using ioxide;

namespace ioxide.Kestrel;

/// <summary>
/// Access to the reactor running the current request, so a Kestrel endpoint — which runs pinned to the
/// reactor thread under this transport — can reach that reactor's ring-native services, e.g.
/// <c>IoxideReactor.Current.GetService&lt;PgPool&gt;()</c>. Start those services per reactor via
/// <see cref="IoxideTransportOptions.OnReactorStart"/>.
///
/// Only valid on a reactor thread. It relies on awaited continuations resuming inline on the same reactor
/// thread (which the transport guarantees for the request loop) — under a thread-hopping scheduler the
/// slot could point at the wrong reactor, so resolve the service before the first off-reactor await.
/// </summary>
public static class IoxideReactor
{
[ThreadStatic] private static Reactor? _current;

/// <summary>The reactor handling the current request. Throws when read off a reactor thread.</summary>
public static Reactor Current =>
_current ?? throw new InvalidOperationException(
"IoxideReactor.Current is only available on an ioxide reactor thread (inside request handling).");

/// <summary>The current reactor, or null when not on a reactor thread. Used by the transport to pin work.</summary>
internal static Reactor? TryCurrent() => _current;

/// <summary>Bound once per reactor, on the reactor's own thread, from the transport's OnStart.</summary>
internal static void Bind(Reactor reactor) => _current = reactor;
}
7 changes: 7 additions & 0 deletions ioxide.Kestrel/IoxideTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ public sealed class IoxideTransportOptions
/// </summary>
public Func<ServerConfig, ServerConfig>? ConfigureServer { get; set; }

/// <summary>
/// Per-reactor startup hook, run on each reactor's own thread before its loop starts — open ring-native
/// clients here (e.g. <c>PgPool.Start(r, pgOptions)</c>, <c>AssetReader.CreatePool(r, ...)</c>) so DB and
/// file I/O ride that reactor's ring. Endpoints resolve them via <c>IoxideReactor.Current.GetService&lt;T&gt;()</c>.
/// </summary>
public Action<Reactor>? OnReactorStart { get; set; }

/// <summary>
/// TLS termination via kTLS (kernel TLS), done in the transport on the listed ports. When set, the
/// reactor runs the TLS 1.3 handshake on accept, installs kTLS TX, and hands Kestrel a plaintext
Expand Down
73 changes: 73 additions & 0 deletions ioxide.Kestrel/ReactorPinReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using ioxide;

namespace ioxide.Kestrel;

/// <summary>
/// Wraps the inbound <see cref="PipeReader"/> so the FIRST read of a connection resumes on the connection's
/// reactor thread. Kestrel dispatches each new connection to the ThreadPool, and the first read usually
/// completes synchronously (the recv pump pre-buffered the request), which would run the whole first
/// request — application code included — off the reactor. Hopping once here pins Kestrel's parse + request
/// loop to the reactor from request one, so endpoints can use ring-native services (ioxide.pg, ioxide.file)
/// reliably on every request. Warm requests (read parks, resumes via the reactor scheduler) are already
/// on-reactor, so this only costs one hop per connection.
/// </summary>
internal sealed class ReactorPinReader : PipeReader
{
private readonly PipeReader _inner;
private readonly Reactor _reactor;
private bool _pinned;

public ReactorPinReader(PipeReader inner, Reactor reactor)
{
_inner = inner;
_reactor = reactor;
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
=> _pinned ? _inner.ReadAsync(cancellationToken) : FirstReadAsync(cancellationToken);

private async ValueTask<ReadResult> FirstReadAsync(CancellationToken cancellationToken)
{
ReadResult result = await _inner.ReadAsync(cancellationToken).ConfigureAwait(false);

// If we resumed off the reactor (the common first-read case), hop onto this connection's reactor so
// Kestrel's header parse and the application run there.
if (IoxideReactor.TryCurrent() != _reactor)
{
await new ReactorSwitch(_reactor);
}
_pinned = true;
return result;
}

// Force the first read through ReadAsync (which pins) by reporting no synchronous data the first time.
public override bool TryRead(out ReadResult result)
{
if (!_pinned)
{
result = default;
return false;
}
return _inner.TryRead(out result);
}

public override void AdvanceTo(SequencePosition consumed) => _inner.AdvanceTo(consumed);
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _inner.AdvanceTo(consumed, examined);
public override void CancelPendingRead() => _inner.CancelPendingRead();
public override void Complete(Exception? exception = null) => _inner.Complete(exception);
public override ValueTask CompleteAsync(Exception? exception = null) => _inner.CompleteAsync(exception);
}

/// <summary>Awaiter that resumes its continuation on the given reactor's thread (via the reactor post queue).</summary>
internal readonly struct ReactorSwitch : ICriticalNotifyCompletion
{
private readonly Reactor _reactor;
public ReactorSwitch(Reactor reactor) => _reactor = reactor;
public ReactorSwitch GetAwaiter() => this;
public bool IsCompleted => false;
public void OnCompleted(Action continuation) => _reactor.ScheduleOnReactor(static s => ((Action)s!)(), continuation);
public void UnsafeOnCompleted(Action continuation) => _reactor.ScheduleOnReactor(static s => ((Action)s!)(), continuation);
public void GetResult() { }
}
2 changes: 1 addition & 1 deletion ioxide.Kestrel/ioxide.Kestrel.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide.Kestrel</RootNamespace>

<PackageId>ioxide.Kestrel</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>ASP.NET Core Kestrel transport backed by the ioxide io_uring runtime: one reactor (ring) per core, SO_REUSEPORT load-balanced, with Kestrel's HTTP request loop pinned to the reactor thread. Drop-in via UseIoxide().</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
2 changes: 1 addition & 1 deletion ioxide.file/ioxide.file.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide.file</RootNamespace>

<PackageId>ioxide.file</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>File serving for the ioxide io_uring runtime: immutable asset snapshots with baked responses, pooled positional ring reads, atomic reloads.</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
2 changes: 1 addition & 1 deletion ioxide.pg/ioxide.pg.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide.pg</RootNamespace>

<PackageId>ioxide.pg</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>Postgres driver for the ioxide io_uring runtime: pooled ring-native connections per reactor, ring-native connect and handshake, inline completion resume.</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
2 changes: 1 addition & 1 deletion ioxide.redis/ioxide.redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide.redis</RootNamespace>

<PackageId>ioxide.redis</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>Redis client for the ioxide io_uring runtime: pooled ring-native connections per reactor, full RESP2 protocol, a generic command API plus typed helpers (strings, keys, hashes, lists, sets, sorted sets, pub/sub, transactions, scripting), and pipelining. Inline completion resume.</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
2 changes: 1 addition & 1 deletion ioxide.tls/ioxide.tls.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide.tls</RootNamespace>

<PackageId>ioxide.tls</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>TLS for the ioxide io_uring runtime: OpenSSL handshake driven over the ring, then kernel TLS (kTLS) transmit offload - handlers keep writing plaintext through the same connection API. Requires Linux kTLS (tls module) and OpenSSL 3.</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
2 changes: 1 addition & 1 deletion ioxide/ioxide.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ioxide</RootNamespace>

<PackageId>ioxide</PackageId>
<Version>0.0.15</Version>
<Version>0.0.16</Version>
<Authors>MDA2AV</Authors>
<Description>A shared-nothing io_uring runtime for .NET: one ring per reactor thread, inline completions, zero native dependencies. The engine - reactor, connection, and the IRingHost client seam.</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
Loading