Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 82 additions & 15 deletions cadente/Sisk.Cadente/HttpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ namespace Sisk.Cadente;
public sealed class HttpHost : IDisposable {

private readonly IPEndPoint _endpoint;
private readonly Socket _listener;
private Socket _listener;

// cache line padding to reduce false sharing
private volatile bool _disposedValue;
private volatile bool _isListening;

private readonly SocketAsyncEventArgs [] _acceptArgsPool;
private readonly int [] _acceptArgsAvailable;
private int _listenerRestarting = 0;
private const int AcceptPoolSize = 8;
private const int ListenerAcceptRetryDelayMilliseconds = 250;

Expand Down Expand Up @@ -69,7 +70,7 @@ public sealed class HttpHost : IDisposable {
/// <param name="endpoint">The <see cref="IPEndPoint"/> to listen on.</param>
public HttpHost ( IPEndPoint endpoint ) {
_endpoint = endpoint;
_listener = new Socket ( endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp );
_listener = CreateListenerSocket ();

_acceptArgsPool = new SocketAsyncEventArgs [ AcceptPoolSize ];
_acceptArgsAvailable = new int [ AcceptPoolSize ];
Expand Down Expand Up @@ -97,7 +98,12 @@ public void Start () {
return;
ObjectDisposedException.ThrowIf ( _disposedValue, this );

ConfigureListenerSocket ();
try {
_listener.Dispose ();
}
catch { }

_listener = CreateListenerSocket ();
_listener.Bind ( _endpoint );
_listener.Listen ( backlog: 4096 ); // Alto para burst de conexões
_isListening = true;
Expand All @@ -109,30 +115,37 @@ public void Start () {
}

[MethodImpl ( MethodImplOptions.AggressiveInlining )]
private void ConfigureListenerSocket () {
_listener.NoDelay = true;
_listener.LingerState = new LingerOption ( false, 0 );
private Socket CreateListenerSocket () {
var listener = new Socket ( _endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp );

listener.NoDelay = true;
listener.LingerState = new LingerOption ( false, 0 );

// Buffers grandes para o listener reduzem syscalls
_listener.ReceiveBufferSize = 128 * 1024;
_listener.SendBufferSize = 128 * 1024;
listener.ReceiveBufferSize = 128 * 1024;
listener.SendBufferSize = 128 * 1024;

if (_listener.AddressFamily == AddressFamily.InterNetworkV6 && _endpoint.Address.Equals ( IPAddress.IPv6Any )) {
_listener.DualMode = true;
if (listener.AddressFamily == AddressFamily.InterNetworkV6 && _endpoint.Address.Equals ( IPAddress.IPv6Any )) {
listener.DualMode = true;
}

_listener.SetSocketOption ( SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true );
_listener.SetSocketOption ( SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true );
_listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 3 );
_listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 300 );
_listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3 );
listener.SetSocketOption ( SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true );
listener.SetSocketOption ( SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true );
listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 3 );
listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 300 );
listener.SetSocketOption ( SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3 );

return listener;
}

[MethodImpl ( MethodImplOptions.AggressiveInlining )]
private void StartAccept ( int poolIndex ) {
if (!_isListening)
return;

if (Volatile.Read ( ref _listenerRestarting ) == 1)
return;

while (_isListening) {
var args = _acceptArgsPool [ poolIndex ];
args.AcceptSocket = null;
Expand All @@ -145,11 +158,17 @@ private void StartAccept ( int poolIndex ) {
return;
}
catch (SocketException) {
if (Volatile.Read ( ref _listenerRestarting ) == 1)
return;

QueueStartAccept ( poolIndex, ListenerAcceptRetryDelayMilliseconds );
return;
}

int rearmDelayMs = ProcessAcceptInline ( args, poolIndex );
if (rearmDelayMs < 0)
return;

if (rearmDelayMs > 0) {
QueueStartAccept ( poolIndex, rearmDelayMs );
return;
Expand All @@ -160,6 +179,9 @@ private void StartAccept ( int poolIndex ) {
private void OnAcceptCompleted ( object? sender, SocketAsyncEventArgs e ) {
int poolIndex = (int) e.UserToken!;
int rearmDelayMs = ProcessAcceptInline ( e, poolIndex );
if (rearmDelayMs < 0)
return;

if (rearmDelayMs > 0)
QueueStartAccept ( poolIndex, rearmDelayMs );
else
Expand All @@ -173,6 +195,11 @@ private int ProcessAcceptInline ( SocketAsyncEventArgs e, int poolIndex ) {
e.AcceptSocket?.Dispose ();
e.AcceptSocket = null;

if (IsListenerFatalError ( socketError )) {
TriggerListenerRebuild ();
return -1;
}

return IsConnectionAcceptNoise ( socketError )
? 0
: ListenerAcceptRetryDelayMilliseconds;
Expand Down Expand Up @@ -215,6 +242,46 @@ or SocketError.ConnectionReset
or SocketError.ConnectionAborted
or SocketError.NetworkReset;

private static bool IsListenerFatalError ( SocketError socketError ) =>
socketError is SocketError.InvalidArgument
or SocketError.NotSocket
or SocketError.Shutdown
or SocketError.OperationAborted
or SocketError.Interrupted;

private void TriggerListenerRebuild () {
if (Interlocked.CompareExchange ( ref _listenerRestarting, 1, 0 ) != 0)
return;

_ = RebuildListenerAsync ();
}

private async Task RebuildListenerAsync () {
try { _listener.Close (); } catch { }
try { _listener.Dispose (); } catch { }

while (_isListening && !_disposedValue) {
await Task.Delay ( ListenerAcceptRetryDelayMilliseconds ).ConfigureAwait ( false );

try {
Socket newListener = CreateListenerSocket ();
newListener.Bind ( _endpoint );
newListener.Listen ( backlog: 4096 );
_listener = newListener;
break;
}
catch (SocketException) {
}
Comment on lines +273 to +274
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Dispose failed rebuilt listener before retry loop

When CreateListenerSocket()/Bind/Listen throws during RebuildListenerAsync, the catch (SocketException) block swallows the exception and immediately retries without disposing the just-created newListener. In a persistent failure (e.g., endpoint temporarily unavailable), this leaks one socket handle every 250ms, which can exhaust file descriptors and prevent the host from recovering.

Useful? React with 👍 / 👎.

}

Interlocked.Exchange ( ref _listenerRestarting, 0 );

if (_isListening && !_disposedValue) {
for (int i = 0; i < AcceptPoolSize; i++)
StartAccept ( i );
}
}

[MethodImpl ( MethodImplOptions.AggressiveOptimization )]
internal async Task ProcessConnectionCoreAsync ( Socket client ) {
// Early exit se não há handler
Expand Down
Loading