From 5f620a169f6bccf24bad17a9b23af840fc01ab8d Mon Sep 17 00:00:00 2001 From: Chris Masters Date: Sun, 11 Jan 2026 13:07:38 +0200 Subject: [PATCH 1/5] Add AdvanceWallClockTime to TopologyTestDriver for testing PROCESSING_TIME punctuations This change adds support for advancing wall-clock time in TopologyTestDriver, enabling proper unit testing of processors that use PROCESSING_TIME punctuations. Previously, there was no way to control wall-clock time in tests, making it impossible to reliably test buffering, windowing, and other time-sensitive operations that use PROCESSING_TIME. Changes: - Add IWallClockTimeProvider interface for time abstraction - Add MockWallClockTimeProvider for controllable time in tests - Add AdvanceWallClockTime method to TopologyTestDriver - Modify StreamTask to use injected time provider when available - Fix PROCESSING_TIME scheduling to correctly fire after interval (was using startTime = now + interval, changed to startTime = now) This mirrors the Java Kafka Streams TopologyTestDriver.advanceWallClockTime API. --- .DS_Store | Bin 8196 -> 8196 bytes core/Mock/ClusterInMemoryTopologyDriver.cs | 10 + core/Mock/IBehaviorTopologyTestDriver.cs | 7 + core/Mock/IWallClockTimeProvider.cs | 21 ++ core/Mock/MockWallClockTimeProvider.cs | 27 +++ core/Mock/TaskSynchronousTopologyDriver.cs | 23 +- core/Mock/TopologyTestDriver.cs | 39 +++ core/Processors/StreamTask.cs | 35 ++- .../TestDriver/AdvanceWallClockTimeTests.cs | 229 ++++++++++++++++++ 9 files changed, 384 insertions(+), 7 deletions(-) create mode 100644 core/Mock/IWallClockTimeProvider.cs create mode 100644 core/Mock/MockWallClockTimeProvider.cs create mode 100644 test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs diff --git a/.DS_Store b/.DS_Store index 102ecc34f0cda526696b23f79eaa5e0546c11f34..321215ef4470b309e10e3e4f4ab989d095ae1878 100644 GIT binary patch delta 40 wcmZp1XmOa}&nUhzU^hRb_+}mfSH{h|1%22iHe_vPm-xoAIbC!C)5L~!01&bb*#H0l delta 95 zcmZp1XmOa}&nU4mU^hRb#AY4=S4KwF$y|aOW(;}^E)4k$B@Dp~sX(%XA(J7GA)TQZ sC|(LAQyGdpbMljua`KZH7#IYA*b#{5ZFUn3XWPs!@r`BkWD#~|03*f~>i_@% diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs index 0ed9118b..ec4140a7 100644 --- a/core/Mock/ClusterInMemoryTopologyDriver.cs +++ b/core/Mock/ClusterInMemoryTopologyDriver.cs @@ -244,6 +244,16 @@ public void TriggerCommit() //((StreamThread)threadTopology)?.Manager.CommitAll(); } + public void AdvanceWallClockTime(TimeSpan advance) + { + // Wall-clock time advancement is not supported in the cluster in-memory driver + // because it uses real Kafka with threads that use actual system time. + // Use TaskSynchronousTopologyDriver instead for testing PROCESSING_TIME punctuations. + throw new NotSupportedException( + "AdvanceWallClockTime is not supported in ClusterInMemoryTopologyDriver. " + + "Use the synchronous topology driver (default mode) instead for testing PROCESSING_TIME punctuations."); + } + #endregion } } diff --git a/core/Mock/IBehaviorTopologyTestDriver.cs b/core/Mock/IBehaviorTopologyTestDriver.cs index 68958831..b93c7b27 100644 --- a/core/Mock/IBehaviorTopologyTestDriver.cs +++ b/core/Mock/IBehaviorTopologyTestDriver.cs @@ -15,5 +15,12 @@ internal interface IBehaviorTopologyTestDriver : IDisposable TestMultiInputTopic CreateMultiInputTopic(string[] topics, ISerDes keySerdes = null, ISerDes valueSerdes = null); IStateStore GetStateStore(string name); void TriggerCommit(); + + /// + /// Advance wall-clock time by the specified duration, triggering any PROCESSING_TIME punctuations + /// that are scheduled to fire during the advanced time period. + /// + /// The amount of time to advance the wall clock + void AdvanceWallClockTime(TimeSpan advance); } } diff --git a/core/Mock/IWallClockTimeProvider.cs b/core/Mock/IWallClockTimeProvider.cs new file mode 100644 index 00000000..6c8dd5e3 --- /dev/null +++ b/core/Mock/IWallClockTimeProvider.cs @@ -0,0 +1,21 @@ +using System; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// Interface for providing wall-clock time. Used to allow time mocking in tests. + /// + internal interface IWallClockTimeProvider + { + /// + /// Gets the current wall-clock time in milliseconds since epoch. + /// + long GetWallClockTime(); + + /// + /// Advances the wall-clock time by the specified duration. + /// + /// The amount of time to advance + void Advance(TimeSpan advance); + } +} \ No newline at end of file diff --git a/core/Mock/MockWallClockTimeProvider.cs b/core/Mock/MockWallClockTimeProvider.cs new file mode 100644 index 00000000..d1ccd69b --- /dev/null +++ b/core/Mock/MockWallClockTimeProvider.cs @@ -0,0 +1,27 @@ +using System; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// A mock wall-clock time provider for testing that starts at the current time + /// and allows manual advancement. + /// + internal class MockWallClockTimeProvider : IWallClockTimeProvider + { + private long _currentTimeMs; + + public MockWallClockTimeProvider() + { + // Initialize to current time + _currentTimeMs = DateTime.Now.GetMilliseconds(); + } + + public long GetWallClockTime() => _currentTimeMs; + + public void Advance(TimeSpan advance) + { + _currentTimeMs += (long)advance.TotalMilliseconds; + } + } +} \ No newline at end of file diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs index 94f7f976..d57e829f 100644 --- a/core/Mock/TaskSynchronousTopologyDriver.cs +++ b/core/Mock/TaskSynchronousTopologyDriver.cs @@ -27,10 +27,10 @@ internal sealed class TaskSynchronousTopologyDriver : IBehaviorTopologyTestDrive private readonly IDictionary tasks = new Dictionary(); private readonly GlobalStateUpdateTask globalTask; private readonly IDictionary externalProcessorTopologies = - new Dictionary(); + new Dictionary(); private readonly GlobalProcessorContext globalProcessorContext; private readonly StreamMetricsRegistry metricsRegistry; - + private readonly IDictionary> partitionsByTaskId = new Dictionary>(); @@ -38,6 +38,7 @@ internal sealed class TaskSynchronousTopologyDriver : IBehaviorTopologyTestDrive private readonly bool hasGlobalTopology = false; private ITopicManager internalTopicManager; private readonly IConsumer repartitionConsumerForwarder; + private readonly MockWallClockTimeProvider wallClockTimeProvider; public bool IsRunning { get; private set; } @@ -113,6 +114,9 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to repartitionConsumerForwarder = this.supplier.GetConsumer(topicConfiguration.ToConsumerConfig("consumer-repartition-forwarder"), null); + + // Initialize mock wall clock time provider for controlling time in tests + wallClockTimeProvider = new MockWallClockTimeProvider(); } internal StreamTask GetTask(string topicName) @@ -134,7 +138,8 @@ internal StreamTask GetTask(string topicName) supplier, producer, new MockChangelogRegister(), - metricsRegistry); + metricsRegistry, + wallClockTimeProvider); task.InitializeStateStores(); task.InitializeTopology(); task.RestorationIfNeeded(); @@ -319,6 +324,18 @@ public void TriggerCommit() globalTask?.FlushState(); } + public void AdvanceWallClockTime(TimeSpan advance) + { + // Advance the mock wall clock + wallClockTimeProvider.Advance(advance); + + // Trigger PROCESSING_TIME punctuations for all tasks + foreach (var task in tasks.Values) + { + task.PunctuateSystemTime(); + } + } + #endregion } } \ No newline at end of file diff --git a/core/Mock/TopologyTestDriver.cs b/core/Mock/TopologyTestDriver.cs index 69a61af0..7321fca5 100644 --- a/core/Mock/TopologyTestDriver.cs +++ b/core/Mock/TopologyTestDriver.cs @@ -405,5 +405,44 @@ public IReadOnlyWindowStore GetWindowStore(string name) public bool IsError => behavior.IsError; #endregion + + #region Time Advancement + + /// + /// Advance wall-clock time by the specified duration, triggering any + /// punctuations that are scheduled to fire during the advanced time period. + /// + /// This is analogous to + /// Java Kafka Streams TopologyTestDriver.advanceWallClockTime. + /// + /// + /// This method is only supported when using (the default mode). + /// Using this method with will throw . + /// + /// + /// The amount of time to advance the wall clock + /// When used with + /// + /// + /// using (var driver = new TopologyTestDriver(topology, config)) + /// { + /// var inputTopic = driver.CreateInputTopic<string, string>("input"); + /// var outputTopic = driver.CreateOutputTopic<string, string>("output"); + /// + /// inputTopic.PipeInput("key", "value"); + /// + /// // Advance time to trigger PROCESSING_TIME punctuations + /// driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(500)); + /// + /// var result = outputTopic.ReadKeyValue(); + /// } + /// + /// + public void AdvanceWallClockTime(TimeSpan advance) + { + behavior.AdvanceWallClockTime(advance); + } + + #endregion } } \ No newline at end of file diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 615ef609..7b5bee23 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -10,6 +10,7 @@ using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Internal; +using Streamiz.Kafka.Net.Mock; using Streamiz.Kafka.Net.Processors.Internal; using Streamiz.Kafka.Net.Processors.Public; using Streamiz.Kafka.Net.Stream.Internal; @@ -32,6 +33,7 @@ internal class StreamTask : AbstractTask private long idleStartTime; private bool transactionInFlight; private readonly string threadId; + private readonly IWallClockTimeProvider wallClockTimeProvider; private Sensor closeTaskSensor; private Sensor activeBufferedRecordSensor; @@ -47,11 +49,24 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit ProcessorTopology processorTopology, IConsumer consumer, IStreamConfig configuration, IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, StreamMetricsRegistry streamMetricsRegistry) + : this(threadId, id, partitions, processorTopology, consumer, configuration, kafkaSupplier, + producer, changelogRegister, streamMetricsRegistry, null) + { + } + + /// + /// Constructor that accepts an optional wall clock time provider for testing PROCESSING_TIME punctuations. + /// + public StreamTask(string threadId, TaskId id, IEnumerable partitions, + ProcessorTopology processorTopology, IConsumer consumer, IStreamConfig configuration, + IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, + StreamMetricsRegistry streamMetricsRegistry, IWallClockTimeProvider wallClockTimeProvider) : base(id, partitions, processorTopology, consumer, configuration, changelogRegister) { this.threadId = threadId; this.kafkaSupplier = kafkaSupplier; this.streamMetricsRegistry = streamMetricsRegistry; + this.wallClockTimeProvider = wallClockTimeProvider; consumedOffsets = new Dictionary(); maxTaskIdleMs = configuration.MaxTaskIdleMs; idleStartTime = -1; @@ -82,12 +97,21 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit } partitionGrouper = new PartitionGrouper(partitionsQueue); - + RegisterSensors(); } #region Private + /// + /// Gets the current wall clock time, using the injected time provider if available, + /// otherwise falling back to DateTime.Now. + /// + private long GetWallClockTime() + { + return wallClockTimeProvider?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); + } + private void RegisterSensors() { closeTaskSensor = ThreadMetrics.ClosedTaskSensor(threadId, streamMetricsRegistry); @@ -394,8 +418,11 @@ public override TaskScheduled RegisterScheduleTask(TimeSpan interval, Punctuatio // align punctuation to 0L, punctuate as soon as we have data return ScheduleTask(0L, interval, punctuationType, punctuator); case PunctuationType.PROCESSING_TIME: - // align punctuation to now, punctuate after interval has elapsed - return ScheduleTask(DateTime.Now.GetMilliseconds() + (long)interval.TotalMilliseconds, interval, punctuationType, punctuator); + // Set lastTime to (now - interval) so that the first punctuation fires after 'interval' has elapsed. + // This is because CanExecute checks: now - lastTime >= interval + // With lastTime = now - interval, the first check becomes: now - (now - interval) >= interval = interval >= interval = true + // But we want it to fire AFTER interval, so we use now as the starting point + return ScheduleTask(GetWallClockTime(), interval, punctuationType, punctuator); default: return null; } @@ -467,7 +494,7 @@ public void CompleteRestoration() public bool PunctuateSystemTime() { - long systemTime = DateTime.Now.GetMilliseconds(); + long systemTime = GetWallClockTime(); bool punctuated = false; diff --git a/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs b/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs new file mode 100644 index 00000000..9c508fdc --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs @@ -0,0 +1,229 @@ +using System; +using System.Collections.Generic; +using NUnit.Framework; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.Processors.Public; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.Stream; + +namespace Streamiz.Kafka.Net.Tests.TestDriver +{ + /// + /// Tests for the AdvanceWallClockTime functionality which allows testing + /// PROCESSING_TIME punctuations in the TopologyTestDriver. + /// + public class AdvanceWallClockTimeTests + { + private class ProcessingTimePunctuationProcessor : IProcessor + { + public static List PunctuationTimes { get; } = new(); + public static int PunctuationIntervalMs { get; set; } = 100; + + public void Init(ProcessorContext context) + { + context.Schedule( + TimeSpan.FromMilliseconds(PunctuationIntervalMs), + PunctuationType.PROCESSING_TIME, + timestamp => PunctuationTimes.Add(timestamp)); + } + + public void Process(Record record) + { + } + + public void Close() + { + } + } + + [SetUp] + public void Setup() + { + ProcessingTimePunctuationProcessor.PunctuationTimes.Clear(); + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + } + + [Test] + public void Should_Trigger_Processing_Time_Punctuation_After_Advancing_Time() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-advance-wall-clock-time" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + + // Act - pipe a record to initialize the processor and schedule the punctuation + inputTopic.PipeInput("key1", "value1"); + + // Assert - no punctuation yet (time hasn't advanced enough) + Assert.AreEqual(0, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "No punctuation should have fired yet"); + + // Act - advance time past the punctuation interval + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(110)); + + // Assert - punctuation should have fired + Assert.AreEqual(1, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should have fired once after advancing time"); + + // Act - advance time again + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + // Assert - another punctuation + Assert.AreEqual(2, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should have fired twice"); + } + } + + [Test] + public void Should_Not_Trigger_Punctuation_Before_Interval_Elapsed() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-no-early-punctuation" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 500; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + + // Initialize the processor + inputTopic.PipeInput("key", "value"); + + // Advance time, but not past the interval + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(400)); + + // Assert - no punctuation should have fired + Assert.AreEqual(0, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should not fire before interval elapsed"); + } + } + + [Test] + public void Should_Trigger_Multiple_Punctuations_For_Multiple_Intervals() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-multiple-punctuations" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + inputTopic.PipeInput("key", "value"); + + // Advance time by 5 intervals worth + for (int i = 0; i < 5; i++) + { + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(110)); + } + + // Assert - should have multiple punctuations + Assert.AreEqual(5, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Should have 5 punctuations for 5 time advances"); + } + } + + [Test] + public void Should_Throw_NotSupportedException_For_ClusterInMemory_Mode() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-cluster-mode" + }; + + var builder = new StreamBuilder(); + builder.Stream("input") + .To("output"); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY)) + { + // Act & Assert + Assert.Throws(() => + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100))); + } + } + + [Test] + public void Should_Use_Mock_Time_For_Scheduling_New_Punctuations() + { + // This test verifies that after advancing time, new punctuations + // scheduled use the mock time, not real wall clock time. + + var config = new StreamConfig + { + ApplicationId = "test-mock-time-scheduling" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 200; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + inputTopic.PipeInput("key", "value"); + + // Advance time and capture punctuation times + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(210)); + Assert.AreEqual(1, ProcessingTimePunctuationProcessor.PunctuationTimes.Count); + var firstPunctuationTime = ProcessingTimePunctuationProcessor.PunctuationTimes[0]; + + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(210)); + Assert.AreEqual(2, ProcessingTimePunctuationProcessor.PunctuationTimes.Count); + var secondPunctuationTime = ProcessingTimePunctuationProcessor.PunctuationTimes[1]; + + // The difference between punctuation times should be approximately + // the interval (within the time we advanced) + var timeDifference = secondPunctuationTime - firstPunctuationTime; + Assert.GreaterOrEqual(timeDifference, 200, + "Time difference between punctuations should be at least the interval"); + } + } + } +} \ No newline at end of file From dd8897f4bc503788fe67b44ab8a55df268b7de5d Mon Sep 17 00:00:00 2001 From: Chris Masters Date: Mon, 12 Jan 2026 22:06:46 +0200 Subject: [PATCH 2/5] Expose WallClockTime through ProcessorContext and improve multi-stage topology support Building on the AdvanceWallClockTime feature, this change: 1. Exposes WallClockTime as a property on ProcessorContext, allowing processors and transformers to access mock-aware wall clock time. This enables custom processors that need wall clock time to work correctly with TopologyTestDriver. 2. Improves AdvanceWallClockTime to properly handle multi-stage topologies with repartitions. The implementation now iterates until no more progress is made, ensuring records flow through all stages including downstream buffers that may have their own PROCESSING_TIME punctuations. Changes: - Add GetWallClockTime() abstract method to AbstractTask - Add WallClockTime property to ProcessorContext and ProcessorContext - Implement GetWallClockTime in StreamTask (uses injected provider) and ExternalStreamTask (uses real time) - Enhance TaskSynchronousTopologyDriver.AdvanceWallClockTime to handle internal communication topics and iterate until stable --- core/Mock/Sync/SyncPipeBuilder.cs | 13 +- core/Mock/Sync/SyncProducer.cs | 11 ++ core/Mock/TaskSynchronousTopologyDriver.cs | 114 +++++++++++++++-- core/ProcessorContext.cs | 10 ++ core/Processors/AbstractTask.cs | 7 ++ .../ExternalProcessorTopologyExecutor.cs | 7 +- core/Processors/Public/ProcessorContext.cs | 8 ++ core/Processors/SinkProcessor.cs | 1 - core/Processors/StreamTask.cs | 10 +- .../Private/TopologyBuildOrderBugTests.cs | 116 ++++++++++++++++++ 10 files changed, 278 insertions(+), 19 deletions(-) create mode 100644 test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs diff --git a/core/Mock/Sync/SyncPipeBuilder.cs b/core/Mock/Sync/SyncPipeBuilder.cs index c4840dad..ad1b66a0 100644 --- a/core/Mock/Sync/SyncPipeBuilder.cs +++ b/core/Mock/Sync/SyncPipeBuilder.cs @@ -16,10 +16,12 @@ internal class SyncPipeBuilder : IPipeBuilder private class StreamTaskPublisher : ISyncPublisher { private readonly StreamTask task; + private readonly IWallClockTimeProvider wallClockTimeProvider; - public StreamTaskPublisher(StreamTask task) + public StreamTaskPublisher(StreamTask task, IWallClockTimeProvider wallClockTimeProvider = null) { this.task = task; + this.wallClockTimeProvider = wallClockTimeProvider; } private int offset = 0; @@ -34,11 +36,12 @@ public void PublishRecord(string topic, byte[] key, byte[] value, DateTime times public void Flush() { - long now = DateTime.Now.GetMilliseconds(); + // Use mock wall clock time if available, otherwise fall back to real time + long now = wallClockTimeProvider?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); TaskManager.CurrentTask = task; while (task.CanProcess(now)) task.Process(); - + task.PunctuateStreamTime(); task.PunctuateSystemTime(); TaskManager.CurrentTask = null; @@ -115,9 +118,9 @@ public void Close() private readonly ISyncPublisher publisher; private readonly SyncProducer mockProducer; - public SyncPipeBuilder(StreamTask task) + public SyncPipeBuilder(StreamTask task, IWallClockTimeProvider wallClockTimeProvider = null) { - publisher = new StreamTaskPublisher(task); + publisher = new StreamTaskPublisher(task, wallClockTimeProvider); } public SyncPipeBuilder(GlobalStateUpdateTask globalTask) diff --git a/core/Mock/Sync/SyncProducer.cs b/core/Mock/Sync/SyncProducer.cs index e3640427..a935e4b6 100644 --- a/core/Mock/Sync/SyncProducer.cs +++ b/core/Mock/Sync/SyncProducer.cs @@ -163,6 +163,17 @@ public IEnumerable> GetHistory(string topicName) return topics[topicName].ToArray(); } + public IDictionary GetAllTopicCounts() + { + lock (_lock) + { + var result = new Dictionary(); + foreach (var kvp in topics) + result[kvp.Key] = kvp.Value.Count; + return result; + } + } + public void CommitTransaction() { throw new NotImplementedException(); diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs index d57e829f..4d13311e 100644 --- a/core/Mock/TaskSynchronousTopologyDriver.cs +++ b/core/Mock/TaskSynchronousTopologyDriver.cs @@ -129,6 +129,13 @@ internal StreamTask GetTask(string topicName) { if (builder.GetSourceTopics().Contains(topicName)) { + // Make sure partitionsByTaskId has this task's partitions + if (!partitionsByTaskId.ContainsKey(id)) + { + var part = new TopicPartition(topicName, 0); + partitionsByTaskId.Add(id, new List {part}); + } + task = new StreamTask("thread-0", id, partitionsByTaskId[id], @@ -168,20 +175,34 @@ private void InitializeInternalTopicManager() } private void ForwardRepartitionTopic(string topic) + { + ForwardTopicRecordsIfAny(topic); + } + + /// + /// Forwards any pending records from an internal topic to downstream tasks. + /// Returns true if any records were forwarded, false otherwise. + /// + private bool ForwardTopicRecordsIfAny(string topic) { var records = new List>(); + repartitionConsumerForwarder.Subscribe(topic); + ConsumeResult record = null; do { record = repartitionConsumerForwarder.Consume(); if (record != null) + { records.Add(record); + } } while (record != null); - if (records.Any()) { + var task = GetTask(topic); + var pipe = CreateBuilder(topic).Input(topic, configuration); foreach(var r in records) pipe.Pipe(r.Message.Key, r.Message.Value, r.Message.Timestamp.UnixTimestampMs.FromMilliseconds(), r.Message.Headers); @@ -189,15 +210,33 @@ record = repartitionConsumerForwarder.Consume(); pipe.Dispose(); repartitionConsumerForwarder.Commit(records.Last()); + repartitionConsumerForwarder.Unsubscribe(); + return true; } repartitionConsumerForwarder.Unsubscribe(); + return false; + } + + /// + /// Gets all topics that are used for internal communication within the topology. + /// These are topics that are both written to (sink) and read from (source) within the topology. + /// + private HashSet GetAllInternalCommunicationTopics() + { + var topicGroups = builder.MakeInternalTopicGroups(); + var allSinks = topicGroups.Values.SelectMany(t => t.SinkTopics).ToHashSet(); + var allSources = topicGroups.Values.SelectMany(t => t.SourceTopics).ToHashSet(); + + // Internal topics are those that are both written and read within the topology + allSinks.IntersectWith(allSources); + return allSinks; } private SyncPipeBuilder CreateBuilder(string topicName) { var task = GetTask(topicName); - + if (task == null) { if(hasGlobalTopology && builder.GetGlobalTopics().Contains(topicName)) @@ -206,7 +245,7 @@ private SyncPipeBuilder CreateBuilder(string topicName) return new SyncPipeBuilder(externalProcessorTopologies[topicName]); } - return new SyncPipeBuilder(task); + return new SyncPipeBuilder(task, wallClockTimeProvider); } #region IBehaviorTopologyTestDriver @@ -326,14 +365,73 @@ public void TriggerCommit() public void AdvanceWallClockTime(TimeSpan advance) { - // Advance the mock wall clock + // Advance the mock wall clock once at the beginning wallClockTimeProvider.Advance(advance); - // Trigger PROCESSING_TIME punctuations for all tasks - foreach (var task in tasks.Values) + // Get all topics used for internal communication within the topology + var internalTopics = GetAllInternalCommunicationTopics(); + + // Process until no more progress (like Java's completeAllProcessableWork) + // We advance time incrementally inside the loop to allow downstream buffers to flush. + // This is needed because buffers schedule punctuations relative to current time, + // so buffers created during forwarding need additional time to flush. + bool madeProgress; + int iterations = 0; + const int maxIterations = 100; // Prevent infinite loops + do { - task.PunctuateSystemTime(); - } + madeProgress = false; + iterations++; + + // 1. FIRST forward records from all internal topics to ensure buffers have data + // Keep forwarding until no more records found in any topic + bool forwardedAny = false; + bool forwardedThisPass; + do + { + forwardedThisPass = false; + foreach (var topic in internalTopics) + { + if (ForwardTopicRecordsIfAny(topic)) + { + madeProgress = true; + forwardedThisPass = true; + forwardedAny = true; + } + } + } while (forwardedThisPass); + + // 2. THEN trigger PROCESSING_TIME punctuations for all tasks (including newly created ones) + bool punctuatedAny = false; + foreach (var task in tasks.Values) + { + if (task.PunctuateSystemTime()) + { + madeProgress = true; + punctuatedAny = true; + } + } + + // 3. If we forwarded records but no punctuations fired, we need to advance + // time significantly so downstream buffers can flush. Use the full advance + // amount since buffer punctuations may be scheduled far apart. + // If punctuations fired or no records were forwarded, use a smaller increment. + if (madeProgress) + { + if (forwardedAny && !punctuatedAny) + { + // Records went to downstream buffers but their punctuations didn't fire yet + // Advance by full amount to ensure downstream punctuations can fire + wallClockTimeProvider.Advance(advance); + } + else + { + // Normal progress - advance by small amount + wallClockTimeProvider.Advance(TimeSpan.FromMilliseconds(advance.TotalMilliseconds / 10)); + } + } + + } while (madeProgress && iterations < maxIterations); } #endregion diff --git a/core/ProcessorContext.cs b/core/ProcessorContext.cs index c846878a..799e326e 100644 --- a/core/ProcessorContext.cs +++ b/core/ProcessorContext.cs @@ -73,6 +73,16 @@ public class ProcessorContext /// public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}"; + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns system time (DateTime.Now). + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. + /// Use this instead of DateTimeOffset.UtcNow when you need time that works + /// correctly in both production and test environments. + /// + public virtual long WallClockTime => Task?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); + internal ProcessorContext() { diff --git a/core/Processors/AbstractTask.cs b/core/Processors/AbstractTask.cs index 2dddefdc..d8190996 100644 --- a/core/Processors/AbstractTask.cs +++ b/core/Processors/AbstractTask.cs @@ -96,6 +96,13 @@ public bool IsPersistent public abstract void Suspend(); public abstract void MayWriteCheckpoint(bool force = false); public abstract TaskScheduled RegisterScheduleTask(TimeSpan interval, PunctuationType punctuationType, Action punctuator); + + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns DateTime.Now. In tests using TopologyTestDriver, + /// this returns the mock wall clock time that can be controlled via AdvanceWallClockTime. + /// + public abstract long GetWallClockTime(); #endregion protected void TransitTo(TaskState newState) diff --git a/core/Processors/ExternalProcessorTopologyExecutor.cs b/core/Processors/ExternalProcessorTopologyExecutor.cs index 27f9b6d6..e597bdb6 100644 --- a/core/Processors/ExternalProcessorTopologyExecutor.cs +++ b/core/Processors/ExternalProcessorTopologyExecutor.cs @@ -101,7 +101,12 @@ public override bool InitializeStateStores() { throw new NotImplementedException(); } - + + public override long GetWallClockTime() + { + return DateTime.Now.GetMilliseconds(); + } + #endregion } diff --git a/core/Processors/Public/ProcessorContext.cs b/core/Processors/Public/ProcessorContext.cs index 2a83f636..daa33ce6 100644 --- a/core/Processors/Public/ProcessorContext.cs +++ b/core/Processors/Public/ProcessorContext.cs @@ -37,6 +37,14 @@ internal ProcessorContext(ProcessorContext context) internal override IProcessor CurrentProcessor => context.CurrentProcessor; internal override IRecordContext RecordContext => context.RecordContext; + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns system time (DateTime.Now). + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. + /// + public override long WallClockTime => context.WallClockTime; + #endregion private void CheckConfiguration() diff --git a/core/Processors/SinkProcessor.cs b/core/Processors/SinkProcessor.cs index 6596ba6d..a78f7b4d 100644 --- a/core/Processors/SinkProcessor.cs +++ b/core/Processors/SinkProcessor.cs @@ -54,7 +54,6 @@ public override void Init(ProcessorContext context) public override void Process(K key, V value) { LogProcessingKeyValue(key, value); - if (KeySerDes == null || ValueSerDes == null) { var s = KeySerDes == null ? "key" : "value"; diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 7b5bee23..92d517f7 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -101,17 +101,19 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit RegisterSensors(); } - #region Private - /// /// Gets the current wall clock time, using the injected time provider if available, /// otherwise falling back to DateTime.Now. + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. /// - private long GetWallClockTime() + public override long GetWallClockTime() { return wallClockTimeProvider?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); } + #region Private + private void RegisterSensors() { closeTaskSensor = ThreadMetrics.ClosedTaskSensor(threadId, streamMetricsRegistry); @@ -147,7 +149,7 @@ private TaskScheduled ScheduleTask(long startTime, TimeSpan interval, Punctuatio var taskScheduled = new TaskScheduled( startTime, interval, - punctuator, + punctuator, Context.CurrentProcessor); switch (punctuationType) diff --git a/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs new file mode 100644 index 00000000..9e30c5e7 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs @@ -0,0 +1,116 @@ +using System; +using NUnit.Framework; +using Streamiz.Kafka.Net.Crosscutting; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.Stream; +using Streamiz.Kafka.Net.Table; + +namespace Streamiz.Kafka.Net.Tests.Private +{ + /// + /// Tests to verify that records flow correctly through topologies + /// that use Aggregate → ToStream → Repartition when AdvanceWallClockTime is used. + /// + [TestFixture] + public class TopologyBuildOrderBugTests + { + /// + /// Basic test: Aggregate → ToStream → Repartition should forward records. + /// + [Test] + public void Aggregate_ToStream_Repartition_Should_Forward_Records() + { + var builder = new StreamBuilder(); + + builder.Stream("input") + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create() + .With()) + .ToStream() + .Repartition() + .To("output"); + + var config = new StreamConfig + { + ApplicationId = "test-topology-build-order-bug" + }; + + using var driver = new TopologyTestDriver(builder.Build(), config); + + var input = driver.CreateInputTopic("input"); + var output = driver.CreateOutputTopic("output"); + + // Send a record + input.PipeInput("key1", 100); + + // Advance time to trigger any buffered/punctuation-based processing + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + // Read results - this should have our aggregated value + var results = output.ReadKeyValuesToMap(); + + Assert.That(results.ContainsKey("key1"), Is.True, + "Records should flow through Aggregate → ToStream → Repartition → To."); + Assert.That(results["key1"], Is.EqualTo(100)); + } + + /// + /// Test with multiple repartitions. + /// + [Test] + public void Multiple_Repartitions_Should_Forward_Records() + { + var builder = new StreamBuilder(); + + builder.Stream("input") + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create() + .With()) + .ToStream() + .Repartition() // First repartition + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create("second-agg-store") + .With()) + .ToStream() + .Repartition() // Second repartition + .To("output"); + + var config = new StreamConfig + { + ApplicationId = "test-multiple-repartitions" + }; + + using var driver = new TopologyTestDriver(builder.Build(), config); + + var input = driver.CreateInputTopic("input"); + var output = driver.CreateOutputTopic("output"); + + input.PipeInput("key1", 100); + + // Need multiple time advances to flush through multiple stages + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + var results = output.ReadKeyValuesToMap(); + + Assert.That(results.ContainsKey("key1"), Is.True, + "Records should flow through multiple Aggregate → ToStream → Repartition stages."); + Assert.That(results["key1"], Is.EqualTo(100)); + } + } +} From 75a62d88df2018e5ac3b85006a0162798c0b1cd3 Mon Sep 17 00:00:00 2001 From: Chris Masters Date: Tue, 13 Jan 2026 09:56:36 +0200 Subject: [PATCH 3/5] GetWallClockTime() to property WallClockTime and default wall clock provider implementation --- core/Mock/IWallClockTimeProvider.cs | 3 ++- core/Mock/MockWallClockTimeProvider.cs | 2 +- core/Mock/Sync/SyncPipeBuilder.cs | 12 ++++----- core/Mock/SystemWallClockTimeProvider.cs | 27 +++++++++++++++++++ core/Mock/TaskSynchronousTopologyDriver.cs | 2 +- core/ProcessorContext.cs | 2 +- core/Processors/AbstractTask.cs | 2 +- .../ExternalProcessorTopologyExecutor.cs | 5 +--- core/Processors/StreamTask.cs | 16 +++++------ 9 files changed, 45 insertions(+), 26 deletions(-) create mode 100644 core/Mock/SystemWallClockTimeProvider.cs diff --git a/core/Mock/IWallClockTimeProvider.cs b/core/Mock/IWallClockTimeProvider.cs index 6c8dd5e3..4940d3c6 100644 --- a/core/Mock/IWallClockTimeProvider.cs +++ b/core/Mock/IWallClockTimeProvider.cs @@ -10,10 +10,11 @@ internal interface IWallClockTimeProvider /// /// Gets the current wall-clock time in milliseconds since epoch. /// - long GetWallClockTime(); + long WallClockTime { get; } /// /// Advances the wall-clock time by the specified duration. + /// Only applicable for mock implementations; production implementations may ignore this. /// /// The amount of time to advance void Advance(TimeSpan advance); diff --git a/core/Mock/MockWallClockTimeProvider.cs b/core/Mock/MockWallClockTimeProvider.cs index d1ccd69b..da508518 100644 --- a/core/Mock/MockWallClockTimeProvider.cs +++ b/core/Mock/MockWallClockTimeProvider.cs @@ -17,7 +17,7 @@ public MockWallClockTimeProvider() _currentTimeMs = DateTime.Now.GetMilliseconds(); } - public long GetWallClockTime() => _currentTimeMs; + public long WallClockTime => _currentTimeMs; public void Advance(TimeSpan advance) { diff --git a/core/Mock/Sync/SyncPipeBuilder.cs b/core/Mock/Sync/SyncPipeBuilder.cs index ad1b66a0..8d5706e6 100644 --- a/core/Mock/Sync/SyncPipeBuilder.cs +++ b/core/Mock/Sync/SyncPipeBuilder.cs @@ -16,12 +16,10 @@ internal class SyncPipeBuilder : IPipeBuilder private class StreamTaskPublisher : ISyncPublisher { private readonly StreamTask task; - private readonly IWallClockTimeProvider wallClockTimeProvider; - public StreamTaskPublisher(StreamTask task, IWallClockTimeProvider wallClockTimeProvider = null) + public StreamTaskPublisher(StreamTask task) { this.task = task; - this.wallClockTimeProvider = wallClockTimeProvider; } private int offset = 0; @@ -36,8 +34,8 @@ public void PublishRecord(string topic, byte[] key, byte[] value, DateTime times public void Flush() { - // Use mock wall clock time if available, otherwise fall back to real time - long now = wallClockTimeProvider?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); + // Use the task's wall clock time which is mock-aware + long now = task.WallClockTime; TaskManager.CurrentTask = task; while (task.CanProcess(now)) task.Process(); @@ -118,9 +116,9 @@ public void Close() private readonly ISyncPublisher publisher; private readonly SyncProducer mockProducer; - public SyncPipeBuilder(StreamTask task, IWallClockTimeProvider wallClockTimeProvider = null) + public SyncPipeBuilder(StreamTask task) { - publisher = new StreamTaskPublisher(task, wallClockTimeProvider); + publisher = new StreamTaskPublisher(task); } public SyncPipeBuilder(GlobalStateUpdateTask globalTask) diff --git a/core/Mock/SystemWallClockTimeProvider.cs b/core/Mock/SystemWallClockTimeProvider.cs new file mode 100644 index 00000000..740230be --- /dev/null +++ b/core/Mock/SystemWallClockTimeProvider.cs @@ -0,0 +1,27 @@ +using System; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// Default wall-clock time provider that returns the actual system time. + /// Used in production when no mock time provider is injected. + /// + internal class SystemWallClockTimeProvider : IWallClockTimeProvider + { + /// + /// Singleton instance for shared use. + /// + public static readonly SystemWallClockTimeProvider Instance = new(); + + public long WallClockTime => DateTime.Now.GetMilliseconds(); + + /// + /// No-op for system time provider. Time advances naturally. + /// + public void Advance(TimeSpan advance) + { + // No-op: system time cannot be advanced + } + } +} diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs index 4d13311e..b103d6df 100644 --- a/core/Mock/TaskSynchronousTopologyDriver.cs +++ b/core/Mock/TaskSynchronousTopologyDriver.cs @@ -245,7 +245,7 @@ private SyncPipeBuilder CreateBuilder(string topicName) return new SyncPipeBuilder(externalProcessorTopologies[topicName]); } - return new SyncPipeBuilder(task, wallClockTimeProvider); + return new SyncPipeBuilder(task); } #region IBehaviorTopologyTestDriver diff --git a/core/ProcessorContext.cs b/core/ProcessorContext.cs index 799e326e..d3231f6a 100644 --- a/core/ProcessorContext.cs +++ b/core/ProcessorContext.cs @@ -81,7 +81,7 @@ public class ProcessorContext /// Use this instead of DateTimeOffset.UtcNow when you need time that works /// correctly in both production and test environments. /// - public virtual long WallClockTime => Task?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); + public virtual long WallClockTime => Task?.WallClockTime ?? DateTime.Now.GetMilliseconds(); internal ProcessorContext() { diff --git a/core/Processors/AbstractTask.cs b/core/Processors/AbstractTask.cs index d8190996..dc174e93 100644 --- a/core/Processors/AbstractTask.cs +++ b/core/Processors/AbstractTask.cs @@ -102,7 +102,7 @@ public bool IsPersistent /// In production, this returns DateTime.Now. In tests using TopologyTestDriver, /// this returns the mock wall clock time that can be controlled via AdvanceWallClockTime. /// - public abstract long GetWallClockTime(); + public abstract long WallClockTime { get; } #endregion protected void TransitTo(TaskState newState) diff --git a/core/Processors/ExternalProcessorTopologyExecutor.cs b/core/Processors/ExternalProcessorTopologyExecutor.cs index e597bdb6..c271f2c8 100644 --- a/core/Processors/ExternalProcessorTopologyExecutor.cs +++ b/core/Processors/ExternalProcessorTopologyExecutor.cs @@ -102,10 +102,7 @@ public override bool InitializeStateStores() throw new NotImplementedException(); } - public override long GetWallClockTime() - { - return DateTime.Now.GetMilliseconds(); - } + public override long WallClockTime => DateTime.Now.GetMilliseconds(); #endregion } diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 92d517f7..002b0bdf 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -50,7 +50,7 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, StreamMetricsRegistry streamMetricsRegistry) : this(threadId, id, partitions, processorTopology, consumer, configuration, kafkaSupplier, - producer, changelogRegister, streamMetricsRegistry, null) + producer, changelogRegister, streamMetricsRegistry, SystemWallClockTimeProvider.Instance) { } @@ -66,7 +66,7 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit this.threadId = threadId; this.kafkaSupplier = kafkaSupplier; this.streamMetricsRegistry = streamMetricsRegistry; - this.wallClockTimeProvider = wallClockTimeProvider; + this.wallClockTimeProvider = wallClockTimeProvider ?? SystemWallClockTimeProvider.Instance; consumedOffsets = new Dictionary(); maxTaskIdleMs = configuration.MaxTaskIdleMs; idleStartTime = -1; @@ -102,15 +102,11 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit } /// - /// Gets the current wall clock time, using the injected time provider if available, - /// otherwise falling back to DateTime.Now. + /// Gets the current wall clock time from the injected time provider. /// In tests using TopologyTestDriver, this returns the mock wall clock time /// that can be controlled via AdvanceWallClockTime. /// - public override long GetWallClockTime() - { - return wallClockTimeProvider?.GetWallClockTime() ?? DateTime.Now.GetMilliseconds(); - } + public override long WallClockTime => wallClockTimeProvider.WallClockTime; #region Private @@ -424,7 +420,7 @@ public override TaskScheduled RegisterScheduleTask(TimeSpan interval, Punctuatio // This is because CanExecute checks: now - lastTime >= interval // With lastTime = now - interval, the first check becomes: now - (now - interval) >= interval = interval >= interval = true // But we want it to fire AFTER interval, so we use now as the starting point - return ScheduleTask(GetWallClockTime(), interval, punctuationType, punctuator); + return ScheduleTask(WallClockTime, interval, punctuationType, punctuator); default: return null; } @@ -496,7 +492,7 @@ public void CompleteRestoration() public bool PunctuateSystemTime() { - long systemTime = GetWallClockTime(); + long systemTime = WallClockTime; bool punctuated = false; From fd2292aa75ba13a97649bc2c56ebbd02880789a4 Mon Sep 17 00:00:00 2001 From: Chris Masters Date: Tue, 13 Jan 2026 09:58:35 +0200 Subject: [PATCH 4/5] remove duplicate comment --- core/Mock/SystemWallClockTimeProvider.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/Mock/SystemWallClockTimeProvider.cs b/core/Mock/SystemWallClockTimeProvider.cs index 740230be..aa053318 100644 --- a/core/Mock/SystemWallClockTimeProvider.cs +++ b/core/Mock/SystemWallClockTimeProvider.cs @@ -19,9 +19,6 @@ internal class SystemWallClockTimeProvider : IWallClockTimeProvider /// /// No-op for system time provider. Time advances naturally. /// - public void Advance(TimeSpan advance) - { - // No-op: system time cannot be advanced - } + public void Advance(TimeSpan advance) { } } } From 2a0bb368c7aac5e63589188132a816c736b02a06 Mon Sep 17 00:00:00 2001 From: Chris Masters Date: Tue, 20 Jan 2026 07:45:30 +0100 Subject: [PATCH 5/5] Change property from expression body into get/private set --- core/Mock/MockWallClockTimeProvider.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/Mock/MockWallClockTimeProvider.cs b/core/Mock/MockWallClockTimeProvider.cs index da508518..a41a6ff7 100644 --- a/core/Mock/MockWallClockTimeProvider.cs +++ b/core/Mock/MockWallClockTimeProvider.cs @@ -9,19 +9,16 @@ namespace Streamiz.Kafka.Net.Mock /// internal class MockWallClockTimeProvider : IWallClockTimeProvider { - private long _currentTimeMs; - public MockWallClockTimeProvider() { - // Initialize to current time - _currentTimeMs = DateTime.Now.GetMilliseconds(); + WallClockTime = DateTime.Now.GetMilliseconds(); } - public long WallClockTime => _currentTimeMs; + public long WallClockTime { get; private set; } public void Advance(TimeSpan advance) { - _currentTimeMs += (long)advance.TotalMilliseconds; + WallClockTime += (long)advance.TotalMilliseconds; } } } \ No newline at end of file