From 9fd5fc46cf00a2a1fc9b464013fdfb8991b2317c Mon Sep 17 00:00:00 2001 From: Diogo Martins Date: Sat, 4 Jul 2026 18:18:24 +0100 Subject: [PATCH] Experiment with reactor synchronization context --- Playground/Handlers.cs | 40 +++++++++++++++ Playground/Program.cs | 4 ++ ioxide.pg/PgConnection.cs | 4 +- ioxide.redis/RedisConnection.cs | 4 +- ioxide/Client/RingOpSource.cs | 5 +- ioxide/Connection/Connection.Read.cs | 7 ++- ioxide/Connection/Connection.Write.Flush.cs | 5 +- ioxide/Connection/ConnectionPipeReader.cs | 5 +- ioxide/Connection/ConnectionPipeWriter.cs | 5 +- ioxide/Connection/ConnectionStream.cs | 6 ++- ioxide/IoxideRuntime.cs | 4 +- ioxide/Reactor/Reactor.Post.cs | 32 ++++++++++-- ioxide/Reactor/Reactor.Runner.cs | 4 ++ .../Reactor/ReactorSynchronizationContext.cs | 51 +++++++++++++++++++ 14 files changed, 161 insertions(+), 15 deletions(-) create mode 100644 ioxide/Reactor/ReactorSynchronizationContext.cs diff --git a/Playground/Handlers.cs b/Playground/Handlers.cs index 17aa2bc..9758c28 100644 --- a/Playground/Handlers.cs +++ b/Playground/Handlers.cs @@ -107,6 +107,46 @@ public static async Task Hop(Reactor reactor, Connection conn) } } + private static ReadOnlySpan JsonHeader => + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\n"u8; + + private static int _offReactorSeen; + + /// + /// taskrun - raw, but each request awaits a Task.Run JSON serialization. With the reactor + /// SynchronizationContext installed the continuation comes home to the reactor; without it, + /// it stays on the thread pool. Logs once if the post-await thread is off-reactor. + /// + public static async Task TaskRun(Reactor reactor, Connection conn) + { + try + { + while (true) + { + RecvSnapshot snapshot = await conn.ReadAsync(); + Drain(conn, snapshot); + + string json = await Task.Run(() => System.Text.Json.JsonSerializer.Serialize("hello world")); + + if (!reactor.OnReactorThread && Interlocked.Exchange(ref _offReactorSeen, 1) == 0) + { + Console.WriteLine("[taskrun] continuation resumed OFF the reactor (no sync context)"); + } + + conn.Write(JsonHeader); + conn.Write(Encoding.UTF8.GetBytes(json)); + await conn.FlushAsync(); + + if (snapshot.IsClosed) return; + conn.ResetRead(); + } + } + finally + { + conn.DecRef(); + } + } + /// /// pg - each request runs a query through the reactor's pool; a server error becomes a 500. /// Paths: / → SELECT 42 · /sleep → 100ms query (pool concurrency demo) · /err → server error. diff --git a/Playground/Program.cs b/Playground/Program.cs index ac013df..2641e80 100644 --- a/Playground/Program.cs +++ b/Playground/Program.cs @@ -93,6 +93,10 @@ private static int Main() reactor.Handle = Handlers.Hop; break; + case "taskrun": + reactor.Handle = Handlers.TaskRun; + break; + default: reactor.Handle = Handlers.Raw; break; diff --git a/ioxide.pg/PgConnection.cs b/ioxide.pg/PgConnection.cs index d935bab..2b6ae5c 100644 --- a/ioxide.pg/PgConnection.cs +++ b/ioxide.pg/PgConnection.cs @@ -430,8 +430,10 @@ public void Complete() public PgResult GetResult(short token) => _core.GetResult(token); public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) - => _core.OnCompleted(continuation, state, token, flags); + => _core.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } private readonly record struct Message(byte Tag, int BodyStart, int BodyLength); diff --git a/ioxide.redis/RedisConnection.cs b/ioxide.redis/RedisConnection.cs index a21be4e..7e46a85 100644 --- a/ioxide.redis/RedisConnection.cs +++ b/ioxide.redis/RedisConnection.cs @@ -260,8 +260,10 @@ private sealed class Pending : IValueTaskSource public RespValue GetResult(short token) => _core.GetResult(token); public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) - => _core.OnCompleted(continuation, state, token, flags); + => _core.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } private async ValueTask ReceiveReplyAsync() diff --git a/ioxide/Client/RingOpSource.cs b/ioxide/Client/RingOpSource.cs index db56405..9672705 100644 --- a/ioxide/Client/RingOpSource.cs +++ b/ioxide/Client/RingOpSource.cs @@ -53,6 +53,9 @@ void IValueTaskSource.OnCompleted( short token, ValueTaskSourceOnCompletedFlags flags) { - _core.OnCompleted(continuation, state, token, flags); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). + _core.OnCompleted(continuation, state, token, + flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } } diff --git a/ioxide/Connection/Connection.Read.cs b/ioxide/Connection/Connection.Read.cs index c0f5ab1..7c79023 100644 --- a/ioxide/Connection/Connection.Read.cs +++ b/ioxide/Connection/Connection.Read.cs @@ -191,6 +191,11 @@ void IValueTaskSource.OnCompleted(Action continuation, ob return; } - _readSignal.OnCompleted(continuation, state, _readSignal.Version, flags); + // This source only completes on the owning reactor thread, so the continuation already + // runs where ReactorSynchronizationContext would post it. Strip the scheduling-context + // flag or MRVTSC posts every resume to the mailbox instead of invoking it inline + // (RunContinuationsAsynchronously=false only covers the null-context case). + _readSignal.OnCompleted(continuation, state, _readSignal.Version, + flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } } diff --git a/ioxide/Connection/Connection.Write.Flush.cs b/ioxide/Connection/Connection.Write.Flush.cs index e53f300..19f8ff2 100644 --- a/ioxide/Connection/Connection.Write.Flush.cs +++ b/ioxide/Connection/Connection.Write.Flush.cs @@ -131,7 +131,10 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s return; } - _flushSignal.OnCompleted(continuation, state, _flushSignal.Version, flags); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). + _flushSignal.OnCompleted(continuation, state, _flushSignal.Version, + flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } #endregion diff --git a/ioxide/Connection/ConnectionPipeReader.cs b/ioxide/Connection/ConnectionPipeReader.cs index c3bd41a..3de8ce9 100644 --- a/ioxide/Connection/ConnectionPipeReader.cs +++ b/ioxide/Connection/ConnectionPipeReader.cs @@ -314,6 +314,9 @@ void IValueTaskSource.OnCompleted( short token, ValueTaskSourceOnCompletedFlags flags) { - _core.OnCompleted(continuation, state, token, flags); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). + _core.OnCompleted(continuation, state, token, + flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } } diff --git a/ioxide/Connection/ConnectionPipeWriter.cs b/ioxide/Connection/ConnectionPipeWriter.cs index 777e749..c32f4ab 100644 --- a/ioxide/Connection/ConnectionPipeWriter.cs +++ b/ioxide/Connection/ConnectionPipeWriter.cs @@ -91,6 +91,9 @@ void IValueTaskSource.OnCompleted( short token, ValueTaskSourceOnCompletedFlags flags) { - _core.OnCompleted(continuation, state, token, flags); + // Completes on the reactor thread only - strip the context-post so resumes stay inline + // (see ReactorSynchronizationContext). + _core.OnCompleted(continuation, state, token, + flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } } diff --git a/ioxide/Connection/ConnectionStream.cs b/ioxide/Connection/ConnectionStream.cs index 1814647..dff3fe0 100644 --- a/ioxide/Connection/ConnectionStream.cs +++ b/ioxide/Connection/ConnectionStream.cs @@ -220,11 +220,13 @@ public override void Write(byte[] buffer, int offset, int count) => // (ValueTask vs ValueTask) routes each call to the right core. int IValueTaskSource.GetResult(short token) => _readCore.GetResult(token); ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _readCore.GetStatus(token); + // Both cores complete on the reactor thread only - strip the context-post so resumes stay + // inline (see ReactorSynchronizationContext). void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => - _readCore.OnCompleted(continuation, state, token, flags); + _readCore.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); void IValueTaskSource.GetResult(short token) => _writeCore.GetResult(token); ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _writeCore.GetStatus(token); void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => - _writeCore.OnCompleted(continuation, state, token, flags); + _writeCore.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext); } diff --git a/ioxide/IoxideRuntime.cs b/ioxide/IoxideRuntime.cs index d618d8d..233db56 100644 --- a/ioxide/IoxideRuntime.cs +++ b/ioxide/IoxideRuntime.cs @@ -15,6 +15,6 @@ public static class IoxideRuntime // reactor.OnStart = r => PgPool.Start(r, pgOptions); // clients open on the reactor thread // reactor.Run(); // - // Roadmap: BCL bridge (per-reactor SyncContext) · per-command timeouts · fixed files / send-zc · - // builder API. (SCRAM, the extended/prepared protocol, Redis, and TLS have shipped.) + // Roadmap: per-command timeouts · fixed files / send-zc · builder API. (SCRAM, the + // extended/prepared protocol, Redis, TLS, and the BCL bridge (per-reactor SyncContext) have shipped.) } diff --git a/ioxide/Reactor/Reactor.Post.cs b/ioxide/Reactor/Reactor.Post.cs index 48d4940..1f4fc5e 100644 --- a/ioxide/Reactor/Reactor.Post.cs +++ b/ioxide/Reactor/Reactor.Post.cs @@ -10,9 +10,11 @@ public sealed unsafe partial class Reactor { private readonly struct PostItem { - public readonly Action Callback; + // Action or SendOrPostCallback - same signature, distinct delegate types. + // Held as Delegate and type-tested at invoke so neither caller pays a conversion alloc. + public readonly Delegate Callback; public readonly object? State; - public PostItem(Action callback, object? state) + public PostItem(Delegate callback, object? state) { Callback = callback; State = state; @@ -30,7 +32,13 @@ public PostItem(Action callback, object? state) /// iteration by ); wakes the loop via the eventfd only when called off the /// reactor, coalescing concurrent producers so at most one wake is outstanding per drain. /// - public void ScheduleOnReactor(Action callback, object? state) + public void ScheduleOnReactor(Action callback, object? state) => EnqueuePost(callback, state); + + // SendOrPostCallback variant for ReactorSynchronizationContext. Distinct name (not an + // overload): lambdas at existing ScheduleOnReactor call sites would otherwise be ambiguous. + internal void SchedulePost(SendOrPostCallback callback, object? state) => EnqueuePost(callback, state); + + private void EnqueuePost(Delegate callback, object? state) { _postQ.Enqueue(new PostItem(callback, state)); @@ -52,7 +60,23 @@ private void DrainPostQ() Volatile.Write(ref _postSignalPending, 0); while (_postQ.TryDequeue(out PostItem item)) { - item.Callback(item.State); + // A faulted continuation (e.g. an async-void rethrow delivered via Post) must not + // unwind the loop and kill the reactor. + try + { + if (item.Callback is Action action) + { + action(item.State); + } + else + { + ((SendOrPostCallback)item.Callback)(item.State); + } + } + catch (Exception ex) + { + Console.Error.WriteLine($"[r{_id}] posted continuation threw: {ex}"); + } } } } diff --git a/ioxide/Reactor/Reactor.Runner.cs b/ioxide/Reactor/Reactor.Runner.cs index d452a6f..ac48c09 100644 --- a/ioxide/Reactor/Reactor.Runner.cs +++ b/ioxide/Reactor/Reactor.Runner.cs @@ -21,6 +21,10 @@ public void Run() { _reactorThreadId = Environment.CurrentManagedThreadId; + // Awaits from reactor code (timers, HttpClient, Task.Run results) resume here instead of + // the thread pool. Thread-lifetime; nothing to uninstall. + SynchronizationContext.SetSynchronizationContext(new ReactorSynchronizationContext(this)); + _ring = Ring.Create(_ringEntries); // One SO_REUSEPORT listener per port; accepts route by listener fd. diff --git a/ioxide/Reactor/ReactorSynchronizationContext.cs b/ioxide/Reactor/ReactorSynchronizationContext.cs new file mode 100644 index 0000000..9de4080 --- /dev/null +++ b/ioxide/Reactor/ReactorSynchronizationContext.cs @@ -0,0 +1,51 @@ +namespace ioxide; + +/// +/// Per-reactor , installed on the loop thread by +/// . Awaits made from reactor code capture it, so continuations that +/// complete off the ring (timers, HttpClient, Task.Run results) are posted back and resume on +/// the reactor instead of the thread pool. Posted work runs in the loop's post-queue drain, +/// before the ring parks. ConfigureAwait(false) opts an await out. +/// +/// Never block the reactor thread on a posted continuation (.Result / .Wait()): +/// the loop cannot drain its own mailbox while blocked inside a handler, so the wait can never +/// complete - a single-thread deadlock, not starvation. +/// +/// +public sealed class ReactorSynchronizationContext : SynchronizationContext +{ + private readonly Reactor _reactor; + + internal ReactorSynchronizationContext(Reactor reactor) => _reactor = reactor; + + /// The reactor whose loop thread this context posts to. + public Reactor Reactor => _reactor; + + /// Always queues, never invokes inline - Post is contractually asynchronous. + public override void Post(SendOrPostCallback d, object? state) => _reactor.SchedulePost(d, state); + + public override void Send(SendOrPostCallback d, object? state) + { + if (_reactor.OnReactorThread) + { + d(state); + return; + } + + using var done = new ManualResetEventSlim(); + Exception? ex = null; + _reactor.SchedulePost(_ => + { + try { d(state); } + catch (Exception e) { ex = e; } + finally { done.Set(); } + }, null); + done.Wait(); + if (ex is not null) + { + System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw(); + } + } + + public override SynchronizationContext CreateCopy() => this; +}