Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/Scheduler/CommandSchedulerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Core.Scheduler;
using Nethermind.Core.Timers;
using Nethermind.Logging;
using NSubstitute;
using NUnit.Framework;
using ITimer = Nethermind.Core.Timers.ITimer;

namespace Nethermind.Core.Test.Scheduler;

public class CommandSchedulerTests
{
private ITimerFactory _timerFactory = null!;

[SetUp]
public void SetUp()
{
_timerFactory = TimerFactory.Default;
}

[Test]
public async Task Registered_task_is_executed_on_timer_elapsed()
{
using ManualResetEventSlim executed = new(initialState: false);
ITimer capturedTimer = null!;

ITimerFactory factory = new FunctionalTimerFactory(interval =>
{
ITimer timer = Substitute.For<ITimer>();
capturedTimer = timer;
return timer;
});

await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);

SimpleTask task = new SimpleTask("test-task", TimeSpan.FromSeconds(1), _ =>
{
executed.Set();
return Task.CompletedTask;
});

scheduler.Register(task);
scheduler.Start();

// Simulate the timer firing
capturedTimer.Elapsed += Raise.Event();

executed.Wait(timeout: TimeSpan.FromSeconds(5)).Should().BeTrue("task should execute when timer elapses");
}

[Test]
public async Task Multiple_tasks_each_get_their_own_timer()
{
int timerCount = 0;

ITimerFactory factory = new FunctionalTimerFactory(_ =>
{
Interlocked.Increment(ref timerCount);
return Substitute.For<ITimer>();
});

await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);

scheduler.Register(new SimpleTask("task-1", TimeSpan.FromSeconds(1)));
scheduler.Register(new SimpleTask("task-2", TimeSpan.FromSeconds(2)));
scheduler.Register(new SimpleTask("task-3", TimeSpan.FromSeconds(3)));
scheduler.Start();

timerCount.Should().Be(3, "each registered task should get its own timer");
}

[Test]
public void Cannot_register_after_start()
{
ITimerFactory factory = new FunctionalTimerFactory(_ => Substitute.For<ITimer>());

CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);
scheduler.Start();

try
{
Action act = () => scheduler.Register(new SimpleTask("late-task", TimeSpan.FromSeconds(1)));

act.Should().Throw<InvalidOperationException>()
.WithMessage("*late-task*");
}
finally
{
_ = scheduler.DisposeAsync().AsTask();
}
}

[Test]
public void Cannot_start_twice()
{
ITimerFactory factory = new FunctionalTimerFactory(_ => Substitute.For<ITimer>());

CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);
scheduler.Start();

try
{
Action act = () => scheduler.Start();
act.Should().Throw<InvalidOperationException>();
}
finally
{
_ = scheduler.DisposeAsync().AsTask();
}
}

[Test]
public async Task Task_exception_does_not_crash_scheduler()
{
ITimer capturedTimer = null!;
ITimerFactory factory = new FunctionalTimerFactory(_ =>
{
ITimer timer = Substitute.For<ITimer>();
capturedTimer = timer;
return timer;
});

await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);

using ManualResetEventSlim faulted = new(initialState: false);
SimpleTask task = new SimpleTask("faulting-task", TimeSpan.FromSeconds(1), _ =>
{
faulted.Set();
throw new InvalidOperationException("task failure");
});

scheduler.Register(task);
scheduler.Start();

capturedTimer.Elapsed += Raise.Event();

// The scheduler should absorb the exception and not crash
faulted.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
// Give the error handler a moment to run
await Task.Delay(50);
}

[Test]
public async Task Dispose_stops_all_timers()
{
ITimer timer1 = Substitute.For<ITimer>();
ITimer timer2 = Substitute.For<ITimer>();
int call = 0;

ITimerFactory factory = new FunctionalTimerFactory(_ =>
{
return Interlocked.Increment(ref call) == 1 ? timer1 : timer2;
});

CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);
scheduler.Register(new SimpleTask("a", TimeSpan.FromSeconds(1)));
scheduler.Register(new SimpleTask("b", TimeSpan.FromSeconds(2)));
scheduler.Start();

await scheduler.DisposeAsync();

timer1.Received().Stop();
timer1.Received().Dispose();
timer2.Received().Stop();
timer2.Received().Dispose();
}

[Test]
public async Task Cancelled_token_prevents_task_execution()
{
ITimer capturedTimer = null!;
ITimerFactory factory = new FunctionalTimerFactory(_ =>
{
ITimer timer = Substitute.For<ITimer>();
capturedTimer = timer;
return timer;
});

await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance);

bool executed = false;
SimpleTask task = new SimpleTask("cancel-task", TimeSpan.FromSeconds(1), _ =>
{
executed = true;
return Task.CompletedTask;
});

scheduler.Register(task);

using CancellationTokenSource cts = new();
scheduler.Start(cts.Token);
cts.Cancel();

capturedTimer.Elapsed += Raise.Event();

// Give the timer callback a moment to run (it should bail out immediately)
await Task.Delay(100);
executed.Should().BeFalse("cancelled token should prevent task execution");
}

/// <summary>
/// Minimal <see cref="IScheduledTask"/> for use in tests.
/// </summary>
private sealed class SimpleTask(
string name,
TimeSpan interval,
Func<CancellationToken, Task>? action = null) : IScheduledTask
{
public string Name { get; } = name;
public TimeSpan Interval { get; } = interval;

public Task ExecuteAsync(CancellationToken cancellationToken) =>
action is not null ? action(cancellationToken) : Task.CompletedTask;
}
}
116 changes: 116 additions & 0 deletions src/Nethermind/Nethermind.Core/Scheduler/CommandScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Timers;
using Nethermind.Logging;
using ITimer = Nethermind.Core.Timers.ITimer;

namespace Nethermind.Core.Scheduler;

/// <summary>
/// Manages periodic execution of registered <see cref="IScheduledTask"/> instances.
/// Each task is driven by a dedicated timer created via <see cref="ITimerFactory"/>.
/// </summary>
public sealed class CommandScheduler : ICommandScheduler
{
private readonly ITimerFactory _timerFactory;
private readonly ILogger _logger;
private readonly List<IScheduledTask> _tasks = [];
private readonly List<ITimer> _timers = [];
private CancellationToken _cancellationToken;
private bool _started;

public CommandScheduler(ITimerFactory timerFactory, ILogManager logManager)
{
ArgumentNullException.ThrowIfNull(timerFactory);
ArgumentNullException.ThrowIfNull(logManager);

_timerFactory = timerFactory;
_logger = logManager.GetClassLogger();
}

/// <inheritdoc/>
public void Register(IScheduledTask task)
{
ArgumentNullException.ThrowIfNull(task);

if (_started)
{
throw new InvalidOperationException($"Cannot register task '{task.Name}' after the scheduler has been started.");
}

_tasks.Add(task);
if (_logger.IsDebug) _logger.Debug($"Registered scheduled task '{task.Name}' with interval {task.Interval}.");
}

/// <inheritdoc/>
public void Start(CancellationToken cancellationToken = default)
{
if (_started)
{
throw new InvalidOperationException("The scheduler has already been started.");
}

_started = true;
_cancellationToken = cancellationToken;

foreach (IScheduledTask task in _tasks)
{
ITimer timer = _timerFactory.CreateTimer(task.Interval);
timer.AutoReset = true;
timer.Elapsed += (_, _) => OnTimerElapsed(task);
timer.Start();
_timers.Add(timer);

if (_logger.IsInfo) _logger.Info($"Scheduled task '{task.Name}' started with interval {task.Interval}.");
}
}

private void OnTimerElapsed(IScheduledTask task)
{
if (_cancellationToken.IsCancellationRequested)
{
return;
}

// Run the task on a thread-pool thread to avoid blocking the timer thread.
// RunTaskAsync catches all exceptions, so the resulting task will not fault.
Task runTask = Task.Run(() => RunTaskAsync(task, _cancellationToken), _cancellationToken);
runTask.ContinueWith(
t => { if (_logger.IsError) _logger.Error($"Unexpected failure in scheduled task '{task.Name}'.", t.Exception); },
TaskContinuationOptions.OnlyOnFaulted);
}

private async Task RunTaskAsync(IScheduledTask task, CancellationToken cancellationToken)
{
try
{
await task.ExecuteAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected on shutdown.
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error($"Unhandled exception in scheduled task '{task.Name}'.", ex);
}
}

/// <inheritdoc/>
public ValueTask DisposeAsync()
{
foreach (ITimer timer in _timers)
{
timer.Stop();
timer.Dispose();
}

_timers.Clear();
return ValueTask.CompletedTask;
}
}
26 changes: 26 additions & 0 deletions src/Nethermind/Nethermind.Core/Scheduler/ICommandScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Core.Scheduler;

/// <summary>
/// Schedules and manages periodic execution of <see cref="IScheduledTask"/> instances.
/// </summary>
public interface ICommandScheduler : IAsyncDisposable
{
/// <summary>
/// Registers a task to be executed at its specified interval.
/// </summary>
/// <param name="task">The task to schedule.</param>
void Register(IScheduledTask task);

/// <summary>
/// Starts all registered scheduled tasks.
/// </summary>
/// <param name="cancellationToken">Token to observe for cancellation.</param>
void Start(CancellationToken cancellationToken = default);
}
30 changes: 30 additions & 0 deletions src/Nethermind/Nethermind.Core/Scheduler/IScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Core.Scheduler;

/// <summary>
/// Represents a task that is executed periodically on a fixed interval.
/// </summary>
public interface IScheduledTask
{
/// <summary>
/// Gets the name identifying this scheduled task.
/// </summary>
string Name { get; }

/// <summary>
/// Gets the interval at which the task should be executed.
/// </summary>
TimeSpan Interval { get; }

/// <summary>
/// Executes the task asynchronously.
/// </summary>
/// <param name="cancellationToken">Token to observe for cancellation.</param>
Task ExecuteAsync(CancellationToken cancellationToken);
}