diff --git a/src/Nethermind/Nethermind.Core.Test/Scheduler/CommandSchedulerTests.cs b/src/Nethermind/Nethermind.Core.Test/Scheduler/CommandSchedulerTests.cs new file mode 100644 index 000000000000..5ff45deeca1d --- /dev/null +++ b/src/Nethermind/Nethermind.Core.Test/Scheduler/CommandSchedulerTests.cs @@ -0,0 +1,221 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Nethermind.Core.Scheduler; +using Nethermind.Core.Timers; +using Nethermind.Logging; +using NSubstitute; +using NUnit.Framework; +using ITimer = Nethermind.Core.Timers.ITimer; + +namespace Nethermind.Core.Test.Scheduler; + +public class CommandSchedulerTests +{ + private ITimerFactory _timerFactory = null!; + + [SetUp] + public void SetUp() + { + _timerFactory = TimerFactory.Default; + } + + [Test] + public async Task Registered_task_is_executed_on_timer_elapsed() + { + using ManualResetEventSlim executed = new(initialState: false); + ITimer capturedTimer = null!; + + ITimerFactory factory = new FunctionalTimerFactory(interval => + { + ITimer timer = Substitute.For(); + capturedTimer = timer; + return timer; + }); + + await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + + SimpleTask task = new SimpleTask("test-task", TimeSpan.FromSeconds(1), _ => + { + executed.Set(); + return Task.CompletedTask; + }); + + scheduler.Register(task); + scheduler.Start(); + + // Simulate the timer firing + capturedTimer.Elapsed += Raise.Event(); + + executed.Wait(timeout: TimeSpan.FromSeconds(5)).Should().BeTrue("task should execute when timer elapses"); + } + + [Test] + public async Task Multiple_tasks_each_get_their_own_timer() + { + int timerCount = 0; + + ITimerFactory factory = new FunctionalTimerFactory(_ => + { + Interlocked.Increment(ref timerCount); + return Substitute.For(); + }); + + await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + + scheduler.Register(new SimpleTask("task-1", TimeSpan.FromSeconds(1))); + scheduler.Register(new SimpleTask("task-2", TimeSpan.FromSeconds(2))); + scheduler.Register(new SimpleTask("task-3", TimeSpan.FromSeconds(3))); + scheduler.Start(); + + timerCount.Should().Be(3, "each registered task should get its own timer"); + } + + [Test] + public void Cannot_register_after_start() + { + ITimerFactory factory = new FunctionalTimerFactory(_ => Substitute.For()); + + CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + scheduler.Start(); + + try + { + Action act = () => scheduler.Register(new SimpleTask("late-task", TimeSpan.FromSeconds(1))); + + act.Should().Throw() + .WithMessage("*late-task*"); + } + finally + { + _ = scheduler.DisposeAsync().AsTask(); + } + } + + [Test] + public void Cannot_start_twice() + { + ITimerFactory factory = new FunctionalTimerFactory(_ => Substitute.For()); + + CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + scheduler.Start(); + + try + { + Action act = () => scheduler.Start(); + act.Should().Throw(); + } + finally + { + _ = scheduler.DisposeAsync().AsTask(); + } + } + + [Test] + public async Task Task_exception_does_not_crash_scheduler() + { + ITimer capturedTimer = null!; + ITimerFactory factory = new FunctionalTimerFactory(_ => + { + ITimer timer = Substitute.For(); + capturedTimer = timer; + return timer; + }); + + await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + + using ManualResetEventSlim faulted = new(initialState: false); + SimpleTask task = new SimpleTask("faulting-task", TimeSpan.FromSeconds(1), _ => + { + faulted.Set(); + throw new InvalidOperationException("task failure"); + }); + + scheduler.Register(task); + scheduler.Start(); + + capturedTimer.Elapsed += Raise.Event(); + + // The scheduler should absorb the exception and not crash + faulted.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue(); + // Give the error handler a moment to run + await Task.Delay(50); + } + + [Test] + public async Task Dispose_stops_all_timers() + { + ITimer timer1 = Substitute.For(); + ITimer timer2 = Substitute.For(); + int call = 0; + + ITimerFactory factory = new FunctionalTimerFactory(_ => + { + return Interlocked.Increment(ref call) == 1 ? timer1 : timer2; + }); + + CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + scheduler.Register(new SimpleTask("a", TimeSpan.FromSeconds(1))); + scheduler.Register(new SimpleTask("b", TimeSpan.FromSeconds(2))); + scheduler.Start(); + + await scheduler.DisposeAsync(); + + timer1.Received().Stop(); + timer1.Received().Dispose(); + timer2.Received().Stop(); + timer2.Received().Dispose(); + } + + [Test] + public async Task Cancelled_token_prevents_task_execution() + { + ITimer capturedTimer = null!; + ITimerFactory factory = new FunctionalTimerFactory(_ => + { + ITimer timer = Substitute.For(); + capturedTimer = timer; + return timer; + }); + + await using CommandScheduler scheduler = new CommandScheduler(factory, LimboLogs.Instance); + + bool executed = false; + SimpleTask task = new SimpleTask("cancel-task", TimeSpan.FromSeconds(1), _ => + { + executed = true; + return Task.CompletedTask; + }); + + scheduler.Register(task); + + using CancellationTokenSource cts = new(); + scheduler.Start(cts.Token); + cts.Cancel(); + + capturedTimer.Elapsed += Raise.Event(); + + // Give the timer callback a moment to run (it should bail out immediately) + await Task.Delay(100); + executed.Should().BeFalse("cancelled token should prevent task execution"); + } + + /// + /// Minimal for use in tests. + /// + private sealed class SimpleTask( + string name, + TimeSpan interval, + Func? action = null) : IScheduledTask + { + public string Name { get; } = name; + public TimeSpan Interval { get; } = interval; + + public Task ExecuteAsync(CancellationToken cancellationToken) => + action is not null ? action(cancellationToken) : Task.CompletedTask; + } +} diff --git a/src/Nethermind/Nethermind.Core/Scheduler/CommandScheduler.cs b/src/Nethermind/Nethermind.Core/Scheduler/CommandScheduler.cs new file mode 100644 index 000000000000..b3dc6e2be041 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Scheduler/CommandScheduler.cs @@ -0,0 +1,116 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Core.Timers; +using Nethermind.Logging; +using ITimer = Nethermind.Core.Timers.ITimer; + +namespace Nethermind.Core.Scheduler; + +/// +/// Manages periodic execution of registered instances. +/// Each task is driven by a dedicated timer created via . +/// +public sealed class CommandScheduler : ICommandScheduler +{ + private readonly ITimerFactory _timerFactory; + private readonly ILogger _logger; + private readonly List _tasks = []; + private readonly List _timers = []; + private CancellationToken _cancellationToken; + private bool _started; + + public CommandScheduler(ITimerFactory timerFactory, ILogManager logManager) + { + ArgumentNullException.ThrowIfNull(timerFactory); + ArgumentNullException.ThrowIfNull(logManager); + + _timerFactory = timerFactory; + _logger = logManager.GetClassLogger(); + } + + /// + public void Register(IScheduledTask task) + { + ArgumentNullException.ThrowIfNull(task); + + if (_started) + { + throw new InvalidOperationException($"Cannot register task '{task.Name}' after the scheduler has been started."); + } + + _tasks.Add(task); + if (_logger.IsDebug) _logger.Debug($"Registered scheduled task '{task.Name}' with interval {task.Interval}."); + } + + /// + public void Start(CancellationToken cancellationToken = default) + { + if (_started) + { + throw new InvalidOperationException("The scheduler has already been started."); + } + + _started = true; + _cancellationToken = cancellationToken; + + foreach (IScheduledTask task in _tasks) + { + ITimer timer = _timerFactory.CreateTimer(task.Interval); + timer.AutoReset = true; + timer.Elapsed += (_, _) => OnTimerElapsed(task); + timer.Start(); + _timers.Add(timer); + + if (_logger.IsInfo) _logger.Info($"Scheduled task '{task.Name}' started with interval {task.Interval}."); + } + } + + private void OnTimerElapsed(IScheduledTask task) + { + if (_cancellationToken.IsCancellationRequested) + { + return; + } + + // Run the task on a thread-pool thread to avoid blocking the timer thread. + // RunTaskAsync catches all exceptions, so the resulting task will not fault. + Task runTask = Task.Run(() => RunTaskAsync(task, _cancellationToken), _cancellationToken); + runTask.ContinueWith( + t => { if (_logger.IsError) _logger.Error($"Unexpected failure in scheduled task '{task.Name}'.", t.Exception); }, + TaskContinuationOptions.OnlyOnFaulted); + } + + private async Task RunTaskAsync(IScheduledTask task, CancellationToken cancellationToken) + { + try + { + await task.ExecuteAsync(cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Expected on shutdown. + } + catch (Exception ex) + { + if (_logger.IsError) _logger.Error($"Unhandled exception in scheduled task '{task.Name}'.", ex); + } + } + + /// + public ValueTask DisposeAsync() + { + foreach (ITimer timer in _timers) + { + timer.Stop(); + timer.Dispose(); + } + + _timers.Clear(); + return ValueTask.CompletedTask; + } +} diff --git a/src/Nethermind/Nethermind.Core/Scheduler/ICommandScheduler.cs b/src/Nethermind/Nethermind.Core/Scheduler/ICommandScheduler.cs new file mode 100644 index 000000000000..550eda2e51fe --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Scheduler/ICommandScheduler.cs @@ -0,0 +1,26 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nethermind.Core.Scheduler; + +/// +/// Schedules and manages periodic execution of instances. +/// +public interface ICommandScheduler : IAsyncDisposable +{ + /// + /// Registers a task to be executed at its specified interval. + /// + /// The task to schedule. + void Register(IScheduledTask task); + + /// + /// Starts all registered scheduled tasks. + /// + /// Token to observe for cancellation. + void Start(CancellationToken cancellationToken = default); +} diff --git a/src/Nethermind/Nethermind.Core/Scheduler/IScheduledTask.cs b/src/Nethermind/Nethermind.Core/Scheduler/IScheduledTask.cs new file mode 100644 index 000000000000..34f88baf5306 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Scheduler/IScheduledTask.cs @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nethermind.Core.Scheduler; + +/// +/// Represents a task that is executed periodically on a fixed interval. +/// +public interface IScheduledTask +{ + /// + /// Gets the name identifying this scheduled task. + /// + string Name { get; } + + /// + /// Gets the interval at which the task should be executed. + /// + TimeSpan Interval { get; } + + /// + /// Executes the task asynchronously. + /// + /// Token to observe for cancellation. + Task ExecuteAsync(CancellationToken cancellationToken); +}