Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Playground/Handlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,46 @@ public static async Task Hop(Reactor reactor, Connection conn)
}
}

private static ReadOnlySpan<byte> JsonHeader =>
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\n"u8;

private static int _offReactorSeen;

/// <summary>
/// taskrun - raw, but each request awaits a Task.Run JSON serialization. With the reactor
/// SynchronizationContext installed the continuation comes home to the reactor; without it,
/// it stays on the thread pool. Logs once if the post-await thread is off-reactor.
/// </summary>
public static async Task TaskRun(Reactor reactor, Connection conn)
{
try
{
while (true)
{
RecvSnapshot snapshot = await conn.ReadAsync();
Drain(conn, snapshot);

string json = await Task.Run(() => System.Text.Json.JsonSerializer.Serialize("hello world"));

if (!reactor.OnReactorThread && Interlocked.Exchange(ref _offReactorSeen, 1) == 0)
{
Console.WriteLine("[taskrun] continuation resumed OFF the reactor (no sync context)");
}

conn.Write(JsonHeader);
conn.Write(Encoding.UTF8.GetBytes(json));
await conn.FlushAsync();

if (snapshot.IsClosed) return;
conn.ResetRead();
}
}
finally
{
conn.DecRef();
}
}

/// <summary>
/// pg - each request runs a query through the reactor's pool; a server error becomes a 500.
/// Paths: / → SELECT 42 · /sleep → 100ms query (pool concurrency demo) · /err → server error.
Expand Down
4 changes: 4 additions & 0 deletions Playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ private static int Main()
reactor.Handle = Handlers.Hop;
break;

case "taskrun":
reactor.Handle = Handlers.TaskRun;
break;

default:
reactor.Handle = Handlers.Raw;
break;
Expand Down
4 changes: 3 additions & 1 deletion ioxide.pg/PgConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,10 @@ public void Complete()

public PgResult GetResult(short token) => _core.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _core.OnCompleted(continuation, state, token, flags);
=> _core.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}

private readonly record struct Message(byte Tag, int BodyStart, int BodyLength);
Expand Down
4 changes: 3 additions & 1 deletion ioxide.redis/RedisConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,10 @@ private sealed class Pending : IValueTaskSource<RespValue>

public RespValue GetResult(short token) => _core.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _core.OnCompleted(continuation, state, token, flags);
=> _core.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}

private async ValueTask<RespValue> ReceiveReplyAsync()
Expand Down
5 changes: 4 additions & 1 deletion ioxide/Client/RingOpSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ void IValueTaskSource<int>.OnCompleted(
short token,
ValueTaskSourceOnCompletedFlags flags)
{
_core.OnCompleted(continuation, state, token, flags);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
_core.OnCompleted(continuation, state, token,
flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}
}
7 changes: 6 additions & 1 deletion ioxide/Connection/Connection.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ void IValueTaskSource<RecvSnapshot>.OnCompleted(Action<object?> continuation, ob
return;
}

_readSignal.OnCompleted(continuation, state, _readSignal.Version, flags);
// This source only completes on the owning reactor thread, so the continuation already
// runs where ReactorSynchronizationContext would post it. Strip the scheduling-context
// flag or MRVTSC posts every resume to the mailbox instead of invoking it inline
// (RunContinuationsAsynchronously=false only covers the null-context case).
_readSignal.OnCompleted(continuation, state, _readSignal.Version,
flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}
}
5 changes: 4 additions & 1 deletion ioxide/Connection/Connection.Write.Flush.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, s

return;
}
_flushSignal.OnCompleted(continuation, state, _flushSignal.Version, flags);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
_flushSignal.OnCompleted(continuation, state, _flushSignal.Version,
flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}

#endregion
Expand Down
5 changes: 4 additions & 1 deletion ioxide/Connection/ConnectionPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ void IValueTaskSource<ReadResult>.OnCompleted(
short token,
ValueTaskSourceOnCompletedFlags flags)
{
_core.OnCompleted(continuation, state, token, flags);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
_core.OnCompleted(continuation, state, token,
flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}
}
5 changes: 4 additions & 1 deletion ioxide/Connection/ConnectionPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ void IValueTaskSource<FlushResult>.OnCompleted(
short token,
ValueTaskSourceOnCompletedFlags flags)
{
_core.OnCompleted(continuation, state, token, flags);
// Completes on the reactor thread only - strip the context-post so resumes stay inline
// (see ReactorSynchronizationContext).
_core.OnCompleted(continuation, state, token,
flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}
}
6 changes: 4 additions & 2 deletions ioxide/Connection/ConnectionStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,13 @@ public override void Write(byte[] buffer, int offset, int count) =>
// (ValueTask<int> vs ValueTask) routes each call to the right core.
int IValueTaskSource<int>.GetResult(short token) => _readCore.GetResult(token);
ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) => _readCore.GetStatus(token);
// Both cores complete on the reactor thread only - strip the context-post so resumes stay
// inline (see ReactorSynchronizationContext).
void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_readCore.OnCompleted(continuation, state, token, flags);
_readCore.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);

void IValueTaskSource.GetResult(short token) => _writeCore.GetResult(token);
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _writeCore.GetStatus(token);
void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_writeCore.OnCompleted(continuation, state, token, flags);
_writeCore.OnCompleted(continuation, state, token, flags & ~ValueTaskSourceOnCompletedFlags.UseSchedulingContext);
}
4 changes: 2 additions & 2 deletions ioxide/IoxideRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ public static class IoxideRuntime
// reactor.OnStart = r => PgPool.Start(r, pgOptions); // clients open on the reactor thread
// reactor.Run();
//
// Roadmap: BCL bridge (per-reactor SyncContext) · per-command timeouts · fixed files / send-zc ·
// builder API. (SCRAM, the extended/prepared protocol, Redis, and TLS have shipped.)
// Roadmap: per-command timeouts · fixed files / send-zc · builder API. (SCRAM, the
// extended/prepared protocol, Redis, TLS, and the BCL bridge (per-reactor SyncContext) have shipped.)
}
32 changes: 28 additions & 4 deletions ioxide/Reactor/Reactor.Post.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ public sealed unsafe partial class Reactor
{
private readonly struct PostItem
{
public readonly Action<object?> Callback;
// Action<object?> or SendOrPostCallback - same signature, distinct delegate types.
// Held as Delegate and type-tested at invoke so neither caller pays a conversion alloc.
public readonly Delegate Callback;
public readonly object? State;
public PostItem(Action<object?> callback, object? state)
public PostItem(Delegate callback, object? state)
{
Callback = callback;
State = state;
Expand All @@ -30,7 +32,13 @@ public PostItem(Action<object?> callback, object? state)
/// iteration by <see cref="DrainPostQ"/>); wakes the loop via the eventfd only when called off the
/// reactor, coalescing concurrent producers so at most one wake is outstanding per drain.
/// </summary>
public void ScheduleOnReactor(Action<object?> callback, object? state)
public void ScheduleOnReactor(Action<object?> callback, object? state) => EnqueuePost(callback, state);

// SendOrPostCallback variant for ReactorSynchronizationContext. Distinct name (not an
// overload): lambdas at existing ScheduleOnReactor call sites would otherwise be ambiguous.
internal void SchedulePost(SendOrPostCallback callback, object? state) => EnqueuePost(callback, state);

private void EnqueuePost(Delegate callback, object? state)
{
_postQ.Enqueue(new PostItem(callback, state));

Expand All @@ -52,7 +60,23 @@ private void DrainPostQ()
Volatile.Write(ref _postSignalPending, 0);
while (_postQ.TryDequeue(out PostItem item))
{
item.Callback(item.State);
// A faulted continuation (e.g. an async-void rethrow delivered via Post) must not
// unwind the loop and kill the reactor.
try
{
if (item.Callback is Action<object?> action)
{
action(item.State);
}
else
{
((SendOrPostCallback)item.Callback)(item.State);
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"[r{_id}] posted continuation threw: {ex}");
}
}
}
}
4 changes: 4 additions & 0 deletions ioxide/Reactor/Reactor.Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public void Run()
{
_reactorThreadId = Environment.CurrentManagedThreadId;

// Awaits from reactor code (timers, HttpClient, Task.Run results) resume here instead of
// the thread pool. Thread-lifetime; nothing to uninstall.
SynchronizationContext.SetSynchronizationContext(new ReactorSynchronizationContext(this));

_ring = Ring.Create(_ringEntries);

// One SO_REUSEPORT listener per port; accepts route by listener fd.
Expand Down
51 changes: 51 additions & 0 deletions ioxide/Reactor/ReactorSynchronizationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
namespace ioxide;

/// <summary>
/// Per-reactor <see cref="SynchronizationContext"/>, installed on the loop thread by
/// <see cref="Reactor.Run"/>. Awaits made from reactor code capture it, so continuations that
/// complete off the ring (timers, HttpClient, Task.Run results) are posted back and resume on
/// the reactor instead of the thread pool. Posted work runs in the loop's post-queue drain,
/// before the ring parks. <c>ConfigureAwait(false)</c> opts an await out.
/// <para>
/// Never block the reactor thread on a posted continuation (<c>.Result</c> / <c>.Wait()</c>):
/// the loop cannot drain its own mailbox while blocked inside a handler, so the wait can never
/// complete - a single-thread deadlock, not starvation.
/// </para>
/// </summary>
public sealed class ReactorSynchronizationContext : SynchronizationContext
{
private readonly Reactor _reactor;

internal ReactorSynchronizationContext(Reactor reactor) => _reactor = reactor;

/// <summary>The reactor whose loop thread this context posts to.</summary>
public Reactor Reactor => _reactor;

/// <summary>Always queues, never invokes inline - Post is contractually asynchronous.</summary>
public override void Post(SendOrPostCallback d, object? state) => _reactor.SchedulePost(d, state);

public override void Send(SendOrPostCallback d, object? state)
{
if (_reactor.OnReactorThread)
{
d(state);
return;
}

using var done = new ManualResetEventSlim();
Exception? ex = null;
_reactor.SchedulePost(_ =>
{
try { d(state); }
catch (Exception e) { ex = e; }
finally { done.Set(); }
}, null);
done.Wait();
if (ex is not null)
{
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
}
}

public override SynchronizationContext CreateCopy() => this;
}
Loading