From 20fc3b1acf15001ae920489a3a938ee9cbabb7c1 Mon Sep 17 00:00:00 2001 From: MDA2AV Date: Sun, 21 Jun 2026 21:40:52 +0100 Subject: [PATCH] ioxide.Kestrel: on-reactor services for ring-native DB/file from Kestrel endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lets an endpoint run ioxide.pg / ioxide.file on the connection's reactor, so DB and file I/O stay thread-per-core (Npgsql / blocking file I/O would hop to the ThreadPool instead). - IoxideTransportOptions.OnReactorStart: per-reactor startup hook (open ring-native clients here: PgPool.Start, AssetReader.CreatePool, ...). - IReactorFeature on the ConnectionContext + HttpContext.OnReactor(work) helper: resolves the connection's reactor and runs the work on it — inline when the endpoint is already on that reactor (warm/keep-alive), marshaled via ScheduleOnReactor when not (the first request of a connection, which Kestrel dispatches to the ThreadPool). The ring I/O always runs on the reactor. - IoxideReactor current-reactor seam (ThreadStatic, bound in OnStart) + ReactorPinReader: pins the first read of each connection onto the reactor. - No changes to ioxide / ioxide.pg / ioxide.file / ioxide.tls — only their existing public APIs are used. - Bump packages to 0.0.16. --- ioxide.Kestrel/HopDuplexPipe.cs | 5 +- ioxide.Kestrel/IReactorFeature.cs | 14 +++++ ioxide.Kestrel/IoxideConnectionContext.cs | 9 ++- ioxide.Kestrel/IoxideConnectionListener.cs | 18 ++++-- ioxide.Kestrel/IoxideHttpExtensions.cs | 45 +++++++++++++ ioxide.Kestrel/IoxideReactor.cs | 29 +++++++++ ioxide.Kestrel/IoxideTransportOptions.cs | 7 +++ ioxide.Kestrel/ReactorPinReader.cs | 73 ++++++++++++++++++++++ ioxide.Kestrel/ioxide.Kestrel.csproj | 2 +- ioxide.file/ioxide.file.csproj | 2 +- ioxide.pg/ioxide.pg.csproj | 2 +- ioxide.redis/ioxide.redis.csproj | 2 +- ioxide.tls/ioxide.tls.csproj | 2 +- ioxide/ioxide.csproj | 2 +- 14 files changed, 199 insertions(+), 13 deletions(-) create mode 100644 ioxide.Kestrel/IReactorFeature.cs create mode 100644 ioxide.Kestrel/IoxideHttpExtensions.cs create mode 100644 ioxide.Kestrel/IoxideReactor.cs create mode 100644 ioxide.Kestrel/ReactorPinReader.cs diff --git a/ioxide.Kestrel/HopDuplexPipe.cs b/ioxide.Kestrel/HopDuplexPipe.cs index ac9e4c6..b23d67e 100644 --- a/ioxide.Kestrel/HopDuplexPipe.cs +++ b/ioxide.Kestrel/HopDuplexPipe.cs @@ -20,6 +20,7 @@ internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable private readonly TlsSession? _tls; // non-null on a kTLS-terminated connection: inbound is decrypted here private readonly Pipe _inbound; // recv pump writes; Kestrel reads (Transport.Input) private readonly Pipe _outbound; // Kestrel writes (Transport.Output); send pump reads + private readonly PipeReader _input; // Kestrel's Input — pins the first read onto the reactor thread private Task _recvPump = Task.CompletedTask; private Task _sendPump = Task.CompletedTask; @@ -29,7 +30,7 @@ internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable private static extern int Shutdown(int sockfd, int how); private const int ShutWr = 1; // SHUT_WR - public PipeReader Input => _inbound.Reader; + public PipeReader Input => _input; public PipeWriter Output => _outbound.Writer; public HopDuplexPipe(Connection conn, Reactor reactor, TlsSession? tls = null) @@ -54,6 +55,8 @@ public HopDuplexPipe(Connection conn, Reactor reactor, TlsSession? tls = null) pauseWriterThreshold: 64 * 1024, resumeWriterThreshold: 32 * 1024, useSynchronizationContext: false)); + + _input = new ReactorPinReader(_inbound.Reader, reactor); } /// Launches the recv and send pumps. Must be called on the reactor thread. diff --git a/ioxide.Kestrel/IReactorFeature.cs b/ioxide.Kestrel/IReactorFeature.cs new file mode 100644 index 0000000..2d31a78 --- /dev/null +++ b/ioxide.Kestrel/IReactorFeature.cs @@ -0,0 +1,14 @@ +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Exposes the ioxide that owns the current connection. The transport sets this on +/// the connection's feature collection, so an endpoint can reach the connection's reactor (and its +/// ring-native services like PgPool / AssetReader) regardless of which thread Kestrel ran the endpoint on. +/// Prefer the HttpContext.OnReactor(...) helper, which uses this to run work on the reactor. +/// +public interface IReactorFeature +{ + Reactor Reactor { get; } +} diff --git a/ioxide.Kestrel/IoxideConnectionContext.cs b/ioxide.Kestrel/IoxideConnectionContext.cs index 2faebba..a0ca94e 100644 --- a/ioxide.Kestrel/IoxideConnectionContext.cs +++ b/ioxide.Kestrel/IoxideConnectionContext.cs @@ -20,9 +20,11 @@ internal sealed class IoxideConnectionContext : ConnectionContext, IConnectionTransportFeature, IConnectionItemsFeature, IConnectionLifetimeFeature, - IConnectionEndPointFeature + IConnectionEndPointFeature, + IReactorFeature { private readonly HopDuplexPipe _pipe; + private readonly Reactor _reactor; private readonly CancellationTokenSource _connectionClosedCts = new(); private readonly FeatureCollection _features = new(); @@ -35,6 +37,7 @@ internal sealed class IoxideConnectionContext : ConnectionContext, public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint localEndPoint, long id, TlsSession? session = null, string? alpn = null) { _pipe = new HopDuplexPipe(connection, reactor, session); + _reactor = reactor; ConnectionId = $"ioxide-{id:x}"; LocalEndPoint = localEndPoint; @@ -47,6 +50,7 @@ public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint _features.Set(this); _features.Set(this); _features.Set(this); + _features.Set(this); if (session is not null) { @@ -65,6 +69,9 @@ public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint /// Launches the transport pumps. Must be called on the reactor thread (from the Handle callback). public void StartPumps() => _pipe.Start(); + /// The reactor that owns this connection (). + public Reactor Reactor => _reactor; + public override string ConnectionId { get; set; } public override IFeatureCollection Features => _features; public override IDictionary Items { get; set; } diff --git a/ioxide.Kestrel/IoxideConnectionListener.cs b/ioxide.Kestrel/IoxideConnectionListener.cs index 23d69bc..84dedfc 100644 --- a/ioxide.Kestrel/IoxideConnectionListener.cs +++ b/ioxide.Kestrel/IoxideConnectionListener.cs @@ -57,12 +57,20 @@ public IoxideConnectionListener(IPEndPoint endpoint, IoxideTransportOptions opti { Handle = HandleConnectionAsync, }; - if (_tlsOptions is not null) + // Per-reactor startup, on the reactor's own thread: bind the current-reactor seam (so Kestrel + // endpoints can resolve ring-native services), start the TLS service if configured, then run + // the user's hook (PgPool.Start, AssetReader.CreatePool, ...). + var tls = _tlsOptions; + var onReactorStart = options.OnReactorStart; + reactor.OnStart = r => { - // Start the per-reactor TLS service on the reactor's own thread (OpenSSL ctx + cert load). - var tls = _tlsOptions; - reactor.OnStart = r => TlsService.Start(r, tls); - } + IoxideReactor.Bind(r); + if (tls is not null) + { + TlsService.Start(r, tls); + } + onReactorStart?.Invoke(r); + }; _reactors[i] = reactor; _threads[i] = new Thread(reactor.Run) { diff --git a/ioxide.Kestrel/IoxideHttpExtensions.cs b/ioxide.Kestrel/IoxideHttpExtensions.cs new file mode 100644 index 0000000..f161d89 --- /dev/null +++ b/ioxide.Kestrel/IoxideHttpExtensions.cs @@ -0,0 +1,45 @@ +using Microsoft.AspNetCore.Http; +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Helpers for running ring-native work (ioxide.pg / ioxide.file) on the connection's reactor from a Kestrel +/// endpoint, so DB and file I/O stay thread-per-core regardless of which thread Kestrel ran the endpoint on. +/// +public static class IoxideHttpExtensions +{ + /// The reactor that owns this connection (from ). + public static Reactor GetReactor(this HttpContext context) + => (context.Features.Get() + ?? throw new InvalidOperationException( + "No ioxide reactor on this connection — is the app running on the ioxide transport (UseIoxide())?")) + .Reactor; + + /// + /// Runs on this connection's reactor and returns its result. When the endpoint is + /// already on that reactor (the common keep-alive case) it runs inline — ioxide's inline resume is + /// preserved. Otherwise (e.g. the first request of a connection, which Kestrel may dispatch to the + /// ThreadPool) the work is marshaled onto the reactor. Either way the ring I/O runs on the reactor. + /// Materialize ring-native results (PgRow, file buffers) into your own objects inside . + /// + public static Task OnReactor(this HttpContext context, Func> work) + { + Reactor reactor = context.GetReactor(); + + if (IoxideReactor.TryCurrent() == reactor) + { + return work(reactor); // already on the reactor — run inline + } + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + reactor.ScheduleOnReactor(_ => _ = RunAsync(reactor, work, tcs), null); + return tcs.Task; + } + + private static async Task RunAsync(Reactor reactor, Func> work, TaskCompletionSource tcs) + { + try { tcs.SetResult(await work(reactor)); } + catch (Exception e) { tcs.SetException(e); } + } +} diff --git a/ioxide.Kestrel/IoxideReactor.cs b/ioxide.Kestrel/IoxideReactor.cs new file mode 100644 index 0000000..3fe59a1 --- /dev/null +++ b/ioxide.Kestrel/IoxideReactor.cs @@ -0,0 +1,29 @@ +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Access to the reactor running the current request, so a Kestrel endpoint — which runs pinned to the +/// reactor thread under this transport — can reach that reactor's ring-native services, e.g. +/// IoxideReactor.Current.GetService<PgPool>(). Start those services per reactor via +/// . +/// +/// Only valid on a reactor thread. It relies on awaited continuations resuming inline on the same reactor +/// thread (which the transport guarantees for the request loop) — under a thread-hopping scheduler the +/// slot could point at the wrong reactor, so resolve the service before the first off-reactor await. +/// +public static class IoxideReactor +{ + [ThreadStatic] private static Reactor? _current; + + /// The reactor handling the current request. Throws when read off a reactor thread. + public static Reactor Current => + _current ?? throw new InvalidOperationException( + "IoxideReactor.Current is only available on an ioxide reactor thread (inside request handling)."); + + /// The current reactor, or null when not on a reactor thread. Used by the transport to pin work. + internal static Reactor? TryCurrent() => _current; + + /// Bound once per reactor, on the reactor's own thread, from the transport's OnStart. + internal static void Bind(Reactor reactor) => _current = reactor; +} diff --git a/ioxide.Kestrel/IoxideTransportOptions.cs b/ioxide.Kestrel/IoxideTransportOptions.cs index 46a99fa..1abd19d 100644 --- a/ioxide.Kestrel/IoxideTransportOptions.cs +++ b/ioxide.Kestrel/IoxideTransportOptions.cs @@ -18,6 +18,13 @@ public sealed class IoxideTransportOptions /// public Func? ConfigureServer { get; set; } + /// + /// Per-reactor startup hook, run on each reactor's own thread before its loop starts — open ring-native + /// clients here (e.g. PgPool.Start(r, pgOptions), AssetReader.CreatePool(r, ...)) so DB and + /// file I/O ride that reactor's ring. Endpoints resolve them via IoxideReactor.Current.GetService<T>(). + /// + public Action? OnReactorStart { get; set; } + /// /// TLS termination via kTLS (kernel TLS), done in the transport on the listed ports. When set, the /// reactor runs the TLS 1.3 handshake on accept, installs kTLS TX, and hands Kestrel a plaintext diff --git a/ioxide.Kestrel/ReactorPinReader.cs b/ioxide.Kestrel/ReactorPinReader.cs new file mode 100644 index 0000000..fbda574 --- /dev/null +++ b/ioxide.Kestrel/ReactorPinReader.cs @@ -0,0 +1,73 @@ +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Wraps the inbound so the FIRST read of a connection resumes on the connection's +/// reactor thread. Kestrel dispatches each new connection to the ThreadPool, and the first read usually +/// completes synchronously (the recv pump pre-buffered the request), which would run the whole first +/// request — application code included — off the reactor. Hopping once here pins Kestrel's parse + request +/// loop to the reactor from request one, so endpoints can use ring-native services (ioxide.pg, ioxide.file) +/// reliably on every request. Warm requests (read parks, resumes via the reactor scheduler) are already +/// on-reactor, so this only costs one hop per connection. +/// +internal sealed class ReactorPinReader : PipeReader +{ + private readonly PipeReader _inner; + private readonly Reactor _reactor; + private bool _pinned; + + public ReactorPinReader(PipeReader inner, Reactor reactor) + { + _inner = inner; + _reactor = reactor; + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + => _pinned ? _inner.ReadAsync(cancellationToken) : FirstReadAsync(cancellationToken); + + private async ValueTask FirstReadAsync(CancellationToken cancellationToken) + { + ReadResult result = await _inner.ReadAsync(cancellationToken).ConfigureAwait(false); + + // If we resumed off the reactor (the common first-read case), hop onto this connection's reactor so + // Kestrel's header parse and the application run there. + if (IoxideReactor.TryCurrent() != _reactor) + { + await new ReactorSwitch(_reactor); + } + _pinned = true; + return result; + } + + // Force the first read through ReadAsync (which pins) by reporting no synchronous data the first time. + public override bool TryRead(out ReadResult result) + { + if (!_pinned) + { + result = default; + return false; + } + return _inner.TryRead(out result); + } + + public override void AdvanceTo(SequencePosition consumed) => _inner.AdvanceTo(consumed); + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _inner.AdvanceTo(consumed, examined); + public override void CancelPendingRead() => _inner.CancelPendingRead(); + public override void Complete(Exception? exception = null) => _inner.Complete(exception); + public override ValueTask CompleteAsync(Exception? exception = null) => _inner.CompleteAsync(exception); +} + +/// Awaiter that resumes its continuation on the given reactor's thread (via the reactor post queue). +internal readonly struct ReactorSwitch : ICriticalNotifyCompletion +{ + private readonly Reactor _reactor; + public ReactorSwitch(Reactor reactor) => _reactor = reactor; + public ReactorSwitch GetAwaiter() => this; + public bool IsCompleted => false; + public void OnCompleted(Action continuation) => _reactor.ScheduleOnReactor(static s => ((Action)s!)(), continuation); + public void UnsafeOnCompleted(Action continuation) => _reactor.ScheduleOnReactor(static s => ((Action)s!)(), continuation); + public void GetResult() { } +} diff --git a/ioxide.Kestrel/ioxide.Kestrel.csproj b/ioxide.Kestrel/ioxide.Kestrel.csproj index f9e5564..0a0a3d3 100644 --- a/ioxide.Kestrel/ioxide.Kestrel.csproj +++ b/ioxide.Kestrel/ioxide.Kestrel.csproj @@ -8,7 +8,7 @@ ioxide.Kestrel ioxide.Kestrel - 0.0.15 + 0.0.16 MDA2AV ASP.NET Core Kestrel transport backed by the ioxide io_uring runtime: one reactor (ring) per core, SO_REUSEPORT load-balanced, with Kestrel's HTTP request loop pinned to the reactor thread. Drop-in via UseIoxide(). MIT diff --git a/ioxide.file/ioxide.file.csproj b/ioxide.file/ioxide.file.csproj index 32c6c9f..3da9bca 100644 --- a/ioxide.file/ioxide.file.csproj +++ b/ioxide.file/ioxide.file.csproj @@ -8,7 +8,7 @@ ioxide.file ioxide.file - 0.0.15 + 0.0.16 MDA2AV File serving for the ioxide io_uring runtime: immutable asset snapshots with baked responses, pooled positional ring reads, atomic reloads. MIT diff --git a/ioxide.pg/ioxide.pg.csproj b/ioxide.pg/ioxide.pg.csproj index 046df3f..fa47409 100644 --- a/ioxide.pg/ioxide.pg.csproj +++ b/ioxide.pg/ioxide.pg.csproj @@ -8,7 +8,7 @@ ioxide.pg ioxide.pg - 0.0.15 + 0.0.16 MDA2AV Postgres driver for the ioxide io_uring runtime: pooled ring-native connections per reactor, ring-native connect and handshake, inline completion resume. MIT diff --git a/ioxide.redis/ioxide.redis.csproj b/ioxide.redis/ioxide.redis.csproj index ea1b07f..284adca 100644 --- a/ioxide.redis/ioxide.redis.csproj +++ b/ioxide.redis/ioxide.redis.csproj @@ -8,7 +8,7 @@ ioxide.redis ioxide.redis - 0.0.15 + 0.0.16 MDA2AV Redis client for the ioxide io_uring runtime: pooled ring-native connections per reactor, full RESP2 protocol, a generic command API plus typed helpers (strings, keys, hashes, lists, sets, sorted sets, pub/sub, transactions, scripting), and pipelining. Inline completion resume. MIT diff --git a/ioxide.tls/ioxide.tls.csproj b/ioxide.tls/ioxide.tls.csproj index aa70bcd..e26b836 100644 --- a/ioxide.tls/ioxide.tls.csproj +++ b/ioxide.tls/ioxide.tls.csproj @@ -8,7 +8,7 @@ ioxide.tls ioxide.tls - 0.0.15 + 0.0.16 MDA2AV TLS for the ioxide io_uring runtime: OpenSSL handshake driven over the ring, then kernel TLS (kTLS) transmit offload - handlers keep writing plaintext through the same connection API. Requires Linux kTLS (tls module) and OpenSSL 3. MIT diff --git a/ioxide/ioxide.csproj b/ioxide/ioxide.csproj index 1bfaea5..7d9d674 100644 --- a/ioxide/ioxide.csproj +++ b/ioxide/ioxide.csproj @@ -8,7 +8,7 @@ ioxide ioxide - 0.0.15 + 0.0.16 MDA2AV A shared-nothing io_uring runtime for .NET: one ring per reactor thread, inline completions, zero native dependencies. The engine - reactor, connection, and the IRingHost client seam. MIT