diff --git a/Examples.AspNet/Examples.AspNet.csproj b/Examples.AspNet/Examples.AspNet.csproj new file mode 100644 index 0000000..a1c7f27 --- /dev/null +++ b/Examples.AspNet/Examples.AspNet.csproj @@ -0,0 +1,15 @@ + + + + net11.0 + enable + enable + true + Examples.AspNet + + + + + + + diff --git a/Examples.AspNet/Program.cs b/Examples.AspNet/Program.cs new file mode 100644 index 0000000..8b8705c --- /dev/null +++ b/Examples.AspNet/Program.cs @@ -0,0 +1,42 @@ +using ioxide.Kestrel; +using Microsoft.Extensions.Logging; + +// A minimal ASP.NET Core app that runs on either the ioxide io_uring transport or Kestrel's stock +// sockets transport. Pick with the TRANSPORT environment variable (default: ioxide): +// +// TRANSPORT=ioxide dotnet run # ioxide.Kestrel transport (io_uring, one reactor per core) +// TRANSPORT=sockets dotnet run # stock Kestrel sockets transport (the framework default) +// +// Then: curl http://localhost:8080/ and curl http://localhost:8080/plaintext + +var builder = WebApplication.CreateBuilder(args); + +builder.Logging.ClearProviders(); // turn off all logging (no per-request info: lines) + +var transport = (Environment.GetEnvironmentVariable("TRANSPORT") ?? "ioxide").Trim().ToLowerInvariant(); + +builder.WebHost.ConfigureKestrel(o => o.ListenAnyIP(8080)); + +switch (transport) +{ + case "ioxide": + builder.WebHost.UseIoxide(o => o.ReactorCount = 16); // io_uring transport, 16 reactors (one ring per thread) + break; + + case "sockets": + case "kestrel": + // Stock Kestrel sockets transport — the framework default, nothing to wire up. + break; + + default: + Console.Error.WriteLine($"Unknown TRANSPORT '{transport}'. Use 'ioxide' or 'sockets'."); + return; +} + +var app = builder.Build(); + +app.MapGet("/", () => $"Hello from ioxide.Kestrel! transport={transport}"); +app.MapGet("/plaintext", () => "Hello, World!"); + +Console.WriteLine($"[Examples.AspNet] listening on http://localhost:8080 (transport={transport})"); +app.Run(); diff --git a/ioxide.Kestrel/HopDuplexPipe.cs b/ioxide.Kestrel/HopDuplexPipe.cs new file mode 100644 index 0000000..1397a5f --- /dev/null +++ b/ioxide.Kestrel/HopDuplexPipe.cs @@ -0,0 +1,152 @@ +using System.Buffers; +using System.IO.Pipelines; +using ioxide; +using ioxide.utils; + +namespace ioxide.Kestrel; + +/// +/// A Kestrel transport duplex over an ioxide : two BCL s whose +/// reader schedulers route to the reactor thread (via ), plus a +/// recv→inbound pump and an outbound→send pump that run on the reactor. This pins Kestrel's whole request +/// loop to the reactor thread. One copy each way (recv bytes into the inbound pipe; response bytes into +/// the connection slab). +/// +internal sealed class HopDuplexPipe : IDuplexPipe, IAsyncDisposable +{ + private readonly Connection _conn; + private readonly Reactor _reactor; + private readonly Pipe _inbound; // recv pump writes; Kestrel reads (Transport.Input) + private readonly Pipe _outbound; // Kestrel writes (Transport.Output); send pump reads + + private Task _recvPump = Task.CompletedTask; + private Task _sendPump = Task.CompletedTask; + private int _started; + + public PipeReader Input => _inbound.Reader; + public PipeWriter Output => _outbound.Writer; + + public HopDuplexPipe(Connection conn, Reactor reactor) + { + _conn = conn; + _reactor = reactor; + var scheduler = new ReactorPipeScheduler(reactor); + + // Reader schedulers = the reactor: Kestrel's HTTP parse (inbound reader) and the send pump + // (outbound reader) both run on the reactor thread. + _inbound = new Pipe(new PipeOptions( + readerScheduler: scheduler, + writerScheduler: scheduler, + pauseWriterThreshold: 1024 * 1024, + resumeWriterThreshold: 512 * 1024, + useSynchronizationContext: false)); + + _outbound = new Pipe(new PipeOptions( + readerScheduler: scheduler, + writerScheduler: PipeScheduler.ThreadPool, + pauseWriterThreshold: 64 * 1024, + resumeWriterThreshold: 32 * 1024, + useSynchronizationContext: false)); + } + + /// Launches the recv and send pumps. Must be called on the reactor thread. + public void Start() + { + if (Interlocked.Exchange(ref _started, 1) == 1) + { + return; + } + _recvPump = RecvPumpAsync(); + _sendPump = SendPumpAsync(); + } + + // Reactor → inbound pipe. Copies each recv slice into the pipe and flushes; Kestrel reads it. + private async Task RecvPumpAsync() + { + PipeWriter writer = _inbound.Writer; + try + { + while (true) + { + RecvSnapshot snap = await _conn.ReadAsync(); + + while (_conn.TryGetItem(snap, out SpscRecvRing.Item item)) + { + if (item.HasBuffer && item.Len > 0) + { + CopySlice(in item, writer); + } + _conn.ReturnBuffer(in item); + } + _conn.ResetRead(); + + if (snap.IsClosed) + { + break; + } + + FlushResult fr = await writer.FlushAsync(); + if (fr.IsCompleted || fr.IsCanceled) + { + break; + } + } + } + catch { /* swallow client/protocol faults; teardown in finally */ } + finally { await writer.CompleteAsync(); } + } + + private static unsafe void CopySlice(in SpscRecvRing.Item item, PipeWriter writer) + { + Span dst = writer.GetSpan(item.Len); + new ReadOnlySpan(item.Ptr, item.Len).CopyTo(dst); + writer.Advance(item.Len); + } + + // Outbound pipe → connection send. Drains Kestrel's response into the slab and submits one SEND. + private async Task SendPumpAsync() + { + PipeReader reader = _outbound.Reader; + try + { + while (true) + { + ReadResult rr = await reader.ReadAsync(); + ReadOnlySequence buffer = rr.Buffer; + + if (!buffer.IsEmpty) + { + foreach (ReadOnlyMemory segment in buffer) + { + Span dst = _conn.GetSpan(segment.Length); + segment.Span.CopyTo(dst); + _conn.Advance(segment.Length); + } + await _conn.FlushAsync(); + } + + reader.AdvanceTo(buffer.End); + + if (rr.IsCompleted || rr.IsCanceled) + { + break; + } + } + } + catch { /* swallow; teardown in finally */ } + finally { await reader.CompleteAsync(); } + } + + public async ValueTask DisposeAsync() + { + // Kestrel has completed its ends; wake the pumps and unwind. MarkClosed wakes a recv parked in + // conn.ReadAsync — schedule it ON the reactor so the recv continuation (which touches reactor-owned + // recv state) runs there, not on Kestrel's dispose thread. The pipe cancels resume via the pipes' + // reactor reader/writer schedulers, so they're reactor-safe too. + _reactor.ScheduleOnReactor(static c => ((Connection)c!).MarkClosed(), _conn); + _inbound.Writer.CancelPendingFlush(); + _outbound.Reader.CancelPendingRead(); + try { await _recvPump; } catch { } + try { await _sendPump; } catch { } + } +} diff --git a/ioxide.Kestrel/IoxideConnectionContext.cs b/ioxide.Kestrel/IoxideConnectionContext.cs new file mode 100644 index 0000000..9a77cd1 --- /dev/null +++ b/ioxide.Kestrel/IoxideConnectionContext.cs @@ -0,0 +1,93 @@ +using System.IO.Pipelines; +using System.Net; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Connections.Features; +using Microsoft.AspNetCore.Http.Features; +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Adapts a single ioxide to Kestrel's . The +/// transport is a (BCL pipes whose reader schedulers route to the reactor), +/// so Kestrel's read → parse → handle → send loop runs pinned to the reactor thread. +/// +internal sealed class IoxideConnectionContext : ConnectionContext, + IConnectionIdFeature, + IConnectionTransportFeature, + IConnectionItemsFeature, + IConnectionLifetimeFeature, + IConnectionEndPointFeature +{ + private readonly HopDuplexPipe _pipe; + private readonly CancellationTokenSource _connectionClosedCts = new(); + private readonly FeatureCollection _features = new(); + + // Completed when Kestrel is done with the connection (DisposeAsync, or Abort). The ioxide Handle + // callback awaits this and only then DecRefs the connection. + private readonly TaskCompletionSource _completion = new(TaskCreationOptions.RunContinuationsAsynchronously); + + private int _disposed; + + public IoxideConnectionContext(Connection connection, Reactor reactor, EndPoint localEndPoint, long id) + { + _pipe = new HopDuplexPipe(connection, reactor); + + ConnectionId = $"ioxide-{id:x}"; + LocalEndPoint = localEndPoint; + RemoteEndPoint = null; + Items = new ConnectionItems(); + ConnectionClosed = _connectionClosedCts.Token; + + _features.Set(this); + _features.Set(this); + _features.Set(this); + _features.Set(this); + _features.Set(this); + } + + /// Resolves once Kestrel has finished with this connection; the reactor's Handle callback awaits it. + public Task Completion => _completion.Task; + + /// Launches the transport pumps. Must be called on the reactor thread (from the Handle callback). + public void StartPumps() => _pipe.Start(); + + public override string ConnectionId { get; set; } + public override IFeatureCollection Features => _features; + public override IDictionary Items { get; set; } + + public override IDuplexPipe Transport + { + get => _pipe; + set => throw new NotSupportedException("Transport is owned by the ioxide transport adapter."); + } + + public override CancellationToken ConnectionClosed { get; set; } + public override EndPoint? LocalEndPoint { get; set; } + public override EndPoint? RemoteEndPoint { get; set; } + + public override void Abort(ConnectionAbortedException abortReason) + { + // Forced close: signal the lifetime token. The pumps are unwound (reactor-safely) in DisposeAsync, + // which Kestrel calls during connection cleanup after Abort. + try { _connectionClosedCts.Cancel(); } catch { /* ignore */ } + } + + public override async ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref _disposed, 1) == 1) + { + return; + } + + try { _connectionClosedCts.Cancel(); } catch { /* ignore */ } + + // Unwind the recv/send pumps (returns held recv buffers, stops the send loop). + await _pipe.DisposeAsync().ConfigureAwait(false); + + _connectionClosedCts.Dispose(); + + // Release the reactor's Handle callback, which DecRefs the connection (-> recycle). + _completion.TrySetResult(); + } +} diff --git a/ioxide.Kestrel/IoxideConnectionListener.cs b/ioxide.Kestrel/IoxideConnectionListener.cs new file mode 100644 index 0000000..dd49cc4 --- /dev/null +++ b/ioxide.Kestrel/IoxideConnectionListener.cs @@ -0,0 +1,139 @@ +using System.Net; +using System.Threading.Channels; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// Runs a fleet of ioxide reactors (one io_uring ring + thread each, SO_REUSEPORT load-balanced) and +/// bridges ioxide's push model (per-connection Handle callback on the reactor thread) to Kestrel's +/// pull model (). The Handle callback wraps each connection, pushes it onto a +/// channel for Kestrel to dequeue, starts the transport pumps, and then parks until Kestrel disposes the +/// connection — keeping the connection's handler-side ref alive for its whole Kestrel lifetime. +/// +internal sealed class IoxideConnectionListener : IConnectionListener +{ + private readonly ILogger _logger; + private readonly Channel _accepted; + private readonly Reactor[] _reactors; + private readonly Thread[] _threads; + private long _connectionCounter; + private int _stopped; + + public EndPoint EndPoint { get; } + + public IoxideConnectionListener(IPEndPoint endpoint, IoxideTransportOptions options, ILogger logger) + { + EndPoint = endpoint; + _logger = logger; + + _accepted = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false, + AllowSynchronousContinuations = false, + }); + + var reactorCount = Math.Max(1, options.ReactorCount); + + var cfg = new ServerConfig { ReactorCount = reactorCount, Port = (ushort)endpoint.Port }; + if (options.ConfigureServer is not null) + { + cfg = options.ConfigureServer(cfg); + } + cfg = cfg with { Port = (ushort)endpoint.Port, ReactorCount = reactorCount }; + + _reactors = new Reactor[reactorCount]; + _threads = new Thread[reactorCount]; + + for (var i = 0; i < reactorCount; i++) + { + var reactor = new Reactor(i, cfg) + { + Handle = HandleConnectionAsync, + }; + _reactors[i] = reactor; + _threads[i] = new Thread(reactor.Run) + { + Name = $"ioxide-reactor-{i}", + IsBackground = true, + }; + } + + foreach (var thread in _threads) + { + thread.Start(); + } + + _logger.LogInformation("[ioxide] Bound {Endpoint} with {ReactorCount} reactor(s)", endpoint, reactorCount); + } + + // Runs on the reactor thread, fire-and-forget, once per accepted connection. + private async Task HandleConnectionAsync(Reactor reactor, Connection conn) + { + var id = Interlocked.Increment(ref _connectionCounter); + var ctx = new IoxideConnectionContext(conn, reactor, EndPoint, id); + + if (!_accepted.Writer.TryWrite(ctx)) + { + // Listener is shutting down: nobody will dequeue this. Release immediately. + await ctx.DisposeAsync().ConfigureAwait(false); + conn.DecRef(); + return; + } + + // Launch the recv/send pumps on this reactor thread (we're inside the Handle callback). + ctx.StartPumps(); + + // Park until Kestrel disposes the connection, then release the handler-side ref (-> recycle). + await ctx.Completion.ConfigureAwait(false); + conn.DecRef(); + } + + public async ValueTask AcceptAsync(CancellationToken cancellationToken = default) + { + try + { + return await _accepted.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return null; + } + catch (ChannelClosedException) + { + return null; + } + } + + public ValueTask UnbindAsync(CancellationToken cancellationToken = default) + { + if (Interlocked.Exchange(ref _stopped, 1) == 0) + { + _accepted.Writer.TryComplete(); + _logger.LogInformation("[ioxide] Unbound listener on {Endpoint}", EndPoint); + } + return ValueTask.CompletedTask; + } + + public async ValueTask DisposeAsync() + { + _accepted.Writer.TryComplete(); + + await Task.Run(() => + { + foreach (var reactor in _reactors) + { + reactor.Stop(); + } + foreach (var thread in _threads) + { + thread.Join(TimeSpan.FromSeconds(5)); + } + }).ConfigureAwait(false); + + _logger.LogInformation("[ioxide] Stopped {ReactorCount} reactor(s) on {Endpoint}", _reactors.Length, EndPoint); + } +} diff --git a/ioxide.Kestrel/IoxideKestrelExtensions.cs b/ioxide.Kestrel/IoxideKestrelExtensions.cs new file mode 100644 index 0000000..6bbeba3 --- /dev/null +++ b/ioxide.Kestrel/IoxideKestrelExtensions.cs @@ -0,0 +1,30 @@ +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace ioxide.Kestrel; + +/// Hosting extensions for the ioxide Kestrel transport. +public static class IoxideKestrelExtensions +{ + /// + /// Replaces Kestrel's default sockets transport with the ioxide io_uring transport. Call after + /// UseKestrel (or rely on the implicit Kestrel registration that WebApplication.CreateBuilder + /// performs) — this evicts any previously-registered . + /// + public static IWebHostBuilder UseIoxide(this IWebHostBuilder builder, Action? configure = null) + { + return builder.ConfigureServices(services => + { + services.AddOptions(); + if (configure is not null) + { + services.Configure(configure); + } + + services.RemoveAll(); + services.AddSingleton(); + }); + } +} diff --git a/ioxide.Kestrel/IoxideTransportFactory.cs b/ioxide.Kestrel/IoxideTransportFactory.cs new file mode 100644 index 0000000..45c9482 --- /dev/null +++ b/ioxide.Kestrel/IoxideTransportFactory.cs @@ -0,0 +1,34 @@ +using System.Net; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace ioxide.Kestrel; + +/// +/// Kestrel transport that runs each connection on an ioxide reactor fleet. Kestrel calls +/// once per configured endpoint; each call spins up its own set of reactors. +/// +internal sealed class IoxideTransportFactory : IConnectionListenerFactory +{ + private readonly IoxideTransportOptions _options; + private readonly ILoggerFactory _loggerFactory; + + public IoxideTransportFactory(IOptions options, ILoggerFactory loggerFactory) + { + _options = options.Value; + _loggerFactory = loggerFactory; + } + + public ValueTask BindAsync(EndPoint endpoint, CancellationToken cancellationToken = default) + { + if (endpoint is not IPEndPoint ipEndpoint) + { + throw new NotSupportedException($"ioxide transport only supports {nameof(IPEndPoint)} (got {endpoint.GetType().Name})."); + } + + var logger = _loggerFactory.CreateLogger(); + IConnectionListener listener = new IoxideConnectionListener(ipEndpoint, _options, logger); + return ValueTask.FromResult(listener); + } +} diff --git a/ioxide.Kestrel/IoxideTransportOptions.cs b/ioxide.Kestrel/IoxideTransportOptions.cs new file mode 100644 index 0000000..6c9180e --- /dev/null +++ b/ioxide.Kestrel/IoxideTransportOptions.cs @@ -0,0 +1,20 @@ +using ioxide; + +namespace ioxide.Kestrel; + +/// Options for the ioxide Kestrel transport. +public sealed class IoxideTransportOptions +{ + /// + /// Number of ioxide reactors (one io_uring ring + one dedicated thread each), load-balanced by + /// SO_REUSEPORT. Defaults to the processor count. + /// + public int ReactorCount { get; set; } = Environment.ProcessorCount; + + /// + /// Optional hook to customize the ioxide (ring depth, recv buffer ring, + /// write slab, zero-copy send, incremental mode, ...). The listen port and reactor count are always + /// overridden afterwards by the bound endpoint and . + /// + public Func? ConfigureServer { get; set; } +} diff --git a/ioxide.Kestrel/README.md b/ioxide.Kestrel/README.md new file mode 100644 index 0000000..2f6a269 --- /dev/null +++ b/ioxide.Kestrel/README.md @@ -0,0 +1,42 @@ +# ioxide.Kestrel + +An ASP.NET Core **Kestrel transport** backed by the [ioxide](https://github.com/MDA2AV/ioxide) io_uring +runtime. One reactor (io_uring ring) per core, SO_REUSEPORT load-balanced, with Kestrel's entire request +loop pinned to the reactor thread — no ThreadPool hop on the hot path. + +## Usage + +```csharp +var builder = WebApplication.CreateBuilder(args); + +builder.WebHost.UseIoxide(); // replaces Kestrel's default sockets transport + +var app = builder.Build(); +app.MapGet("/", () => "Hello, World!"); +app.Run(); +``` + +Options: + +```csharp +builder.WebHost.UseIoxide(o => +{ + o.ReactorCount = Environment.ProcessorCount; // rings/threads (default: ProcessorCount) + o.ConfigureServer = cfg => cfg with { RingEntries = 8192 }; // tune the underlying ioxide ServerConfig +}); +``` + +## How it works + +Each accepted connection is bridged to Kestrel through a `System.IO.Pipelines` duplex whose **reader +schedulers route continuations onto the owning reactor thread**. A recv pump copies received bytes into +the inbound pipe and a send pump drains Kestrel's response into the connection's send slab, so +`recv → HTTP parse → handler → send` all run on a single ring thread. + +## Requirements + +- Linux with io_uring (kernel 6.x recommended). +- .NET 11. + +> Inline execution note: like any thread-per-core transport, application middleware runs on the reactor +> thread. Blocking work in a handler stalls every connection on that reactor — keep handlers async. diff --git a/ioxide.Kestrel/ReactorPipeScheduler.cs b/ioxide.Kestrel/ReactorPipeScheduler.cs new file mode 100644 index 0000000..fb7e1ff --- /dev/null +++ b/ioxide.Kestrel/ReactorPipeScheduler.cs @@ -0,0 +1,19 @@ +using System.IO.Pipelines; +using ioxide; + +namespace ioxide.Kestrel; + +/// +/// A that runs pipe continuations on a reactor's loop thread, so the BCL +/// pipes in keep Kestrel's read → parse → handle → send loop pinned to the +/// reactor (no ThreadPool hop). +/// +internal sealed class ReactorPipeScheduler : PipeScheduler +{ + private readonly Reactor _reactor; + + public ReactorPipeScheduler(Reactor reactor) => _reactor = reactor; + + public override void Schedule(Action action, object? state) + => _reactor.ScheduleOnReactor(action, state); +} diff --git a/ioxide.Kestrel/ioxide.Kestrel.csproj b/ioxide.Kestrel/ioxide.Kestrel.csproj new file mode 100644 index 0000000..d5f08ad --- /dev/null +++ b/ioxide.Kestrel/ioxide.Kestrel.csproj @@ -0,0 +1,31 @@ + + + + net11.0 + enable + enable + true + ioxide.Kestrel + + ioxide.Kestrel + 0.0.13 + MDA2AV + ASP.NET Core Kestrel transport backed by the ioxide io_uring runtime: one reactor (ring) per core, SO_REUSEPORT load-balanced, with Kestrel's HTTP request loop pinned to the reactor thread. Drop-in via UseIoxide(). + MIT + https://mda2av.github.io/ioxide/ + https://github.com/MDA2AV/ioxide + git + io_uring;kestrel;aspnetcore;transport;linux;ioxide;thread-per-core + README.md + + + + + + + + + + + + diff --git a/ioxide.file/ioxide.file.csproj b/ioxide.file/ioxide.file.csproj index 1b64ad6..b76e443 100644 --- a/ioxide.file/ioxide.file.csproj +++ b/ioxide.file/ioxide.file.csproj @@ -8,7 +8,7 @@ ioxide.file ioxide.file - 0.0.12 + 0.0.13 MDA2AV File serving for the ioxide io_uring runtime: immutable asset snapshots with baked responses, pooled positional ring reads, atomic reloads. MIT diff --git a/ioxide.pg/ioxide.pg.csproj b/ioxide.pg/ioxide.pg.csproj index d69d7b4..aa40215 100644 --- a/ioxide.pg/ioxide.pg.csproj +++ b/ioxide.pg/ioxide.pg.csproj @@ -8,7 +8,7 @@ ioxide.pg ioxide.pg - 0.0.12 + 0.0.13 MDA2AV Postgres driver for the ioxide io_uring runtime: pooled ring-native connections per reactor, ring-native connect and handshake, inline completion resume. MIT diff --git a/ioxide.redis/ioxide.redis.csproj b/ioxide.redis/ioxide.redis.csproj index 1a72ddf..aa975a1 100644 --- a/ioxide.redis/ioxide.redis.csproj +++ b/ioxide.redis/ioxide.redis.csproj @@ -8,7 +8,7 @@ ioxide.redis ioxide.redis - 0.0.12 + 0.0.13 MDA2AV Redis client for the ioxide io_uring runtime: pooled ring-native connections per reactor, full RESP2 protocol, a generic command API plus typed helpers (strings, keys, hashes, lists, sets, sorted sets, pub/sub, transactions, scripting), and pipelining. Inline completion resume. MIT diff --git a/ioxide.slnx b/ioxide.slnx index 100ce27..9ebcb57 100644 --- a/ioxide.slnx +++ b/ioxide.slnx @@ -3,8 +3,10 @@ + + diff --git a/ioxide.tls/ioxide.tls.csproj b/ioxide.tls/ioxide.tls.csproj index 576eba6..ffe2c56 100644 --- a/ioxide.tls/ioxide.tls.csproj +++ b/ioxide.tls/ioxide.tls.csproj @@ -8,7 +8,7 @@ ioxide.tls ioxide.tls - 0.0.12 + 0.0.13 MDA2AV TLS for the ioxide io_uring runtime: OpenSSL handshake driven over the ring, then kernel TLS (kTLS) transmit offload - handlers keep writing plaintext through the same connection API. Requires Linux kTLS (tls module) and OpenSSL 3. MIT diff --git a/ioxide/Reactor/Reactor.Incremental.cs b/ioxide/Reactor/Reactor.Incremental.cs index b3f2fdc..d349d5e 100644 --- a/ioxide/Reactor/Reactor.Incremental.cs +++ b/ioxide/Reactor/Reactor.Incremental.cs @@ -161,6 +161,7 @@ private void LoopIncremental() DrainFlushQ(); DrainRecycleQ(); DrainRemoteOps(); + DrainPostQ(); int rc = Ring.SubmitAndWait(1); if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY) diff --git a/ioxide/Reactor/Reactor.Post.cs b/ioxide/Reactor/Reactor.Post.cs new file mode 100644 index 0000000..48d4940 --- /dev/null +++ b/ioxide/Reactor/Reactor.Post.cs @@ -0,0 +1,58 @@ +using System.Collections.Concurrent; + +namespace ioxide; + +// Reactor-thread scheduling, mirroring reedz's IoUringPipeScheduler: a queue of continuations drained +// each loop iteration, woken via the eventfd (coalesced) only when enqueued from off the reactor. +// Generalizes the existing _remoteOps/WakeFdWrite machinery to carry arbitrary callbacks, so a Kestrel +// transport built on BCL pipes can route their reader/writer continuations onto the reactor thread. +public sealed unsafe partial class Reactor +{ + private readonly struct PostItem + { + public readonly Action Callback; + public readonly object? State; + public PostItem(Action callback, object? state) + { + Callback = callback; + State = state; + } + } + + private readonly ConcurrentQueue _postQ = new(); + private int _postSignalPending; // 0 = no eventfd wake outstanding, 1 = one issued and undrained + + /// True when the caller is executing on this reactor's loop thread. + public bool OnReactorThread => Environment.CurrentManagedThreadId == _reactorThreadId; + + /// + /// Schedule a continuation to run on the reactor thread. Always enqueues (drained each loop + /// iteration by ); wakes the loop via the eventfd only when called off the + /// reactor, coalescing concurrent producers so at most one wake is outstanding per drain. + /// + public void ScheduleOnReactor(Action callback, object? state) + { + _postQ.Enqueue(new PostItem(callback, state)); + + // On the reactor: the loop will drain before it next parks, so no eventfd write needed. + if (Environment.CurrentManagedThreadId == _reactorThreadId) + { + return; + } + + if (Interlocked.Exchange(ref _postSignalPending, 1) == 0) + { + WakeFdWrite(); + } + } + + private void DrainPostQ() + { + // Clear before draining so a producer enqueuing during/after the drain still races to wake. + Volatile.Write(ref _postSignalPending, 0); + while (_postQ.TryDequeue(out PostItem item)) + { + item.Callback(item.State); + } + } +} diff --git a/ioxide/Reactor/Reactor.cs b/ioxide/Reactor/Reactor.cs index bbea211..9739380 100644 --- a/ioxide/Reactor/Reactor.cs +++ b/ioxide/Reactor/Reactor.cs @@ -444,6 +444,7 @@ private void LoopShared() DrainFlushQ(); DrainRecycleQ(); DrainRemoteOps(); + DrainPostQ(); int rc = Ring.SubmitAndWait(1); if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY) diff --git a/ioxide/ioxide.csproj b/ioxide/ioxide.csproj index 5f45f9b..17ab46e 100644 --- a/ioxide/ioxide.csproj +++ b/ioxide/ioxide.csproj @@ -8,7 +8,7 @@ ioxide ioxide - 0.0.12 + 0.0.13 MDA2AV A shared-nothing io_uring runtime for .NET: one ring per reactor thread, inline completions, zero native dependencies. The engine - reactor, connection, and the IRingHost client seam. MIT