diff --git a/.DS_Store b/.DS_Store index 102ecc34..321215ef 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs index 0ed9118b..ec4140a7 100644 --- a/core/Mock/ClusterInMemoryTopologyDriver.cs +++ b/core/Mock/ClusterInMemoryTopologyDriver.cs @@ -244,6 +244,16 @@ public void TriggerCommit() //((StreamThread)threadTopology)?.Manager.CommitAll(); } + public void AdvanceWallClockTime(TimeSpan advance) + { + // Wall-clock time advancement is not supported in the cluster in-memory driver + // because it uses real Kafka with threads that use actual system time. + // Use TaskSynchronousTopologyDriver instead for testing PROCESSING_TIME punctuations. + throw new NotSupportedException( + "AdvanceWallClockTime is not supported in ClusterInMemoryTopologyDriver. " + + "Use the synchronous topology driver (default mode) instead for testing PROCESSING_TIME punctuations."); + } + #endregion } } diff --git a/core/Mock/IBehaviorTopologyTestDriver.cs b/core/Mock/IBehaviorTopologyTestDriver.cs index 68958831..b93c7b27 100644 --- a/core/Mock/IBehaviorTopologyTestDriver.cs +++ b/core/Mock/IBehaviorTopologyTestDriver.cs @@ -15,5 +15,12 @@ internal interface IBehaviorTopologyTestDriver : IDisposable TestMultiInputTopic CreateMultiInputTopic(string[] topics, ISerDes keySerdes = null, ISerDes valueSerdes = null); IStateStore GetStateStore(string name); void TriggerCommit(); + + /// + /// Advance wall-clock time by the specified duration, triggering any PROCESSING_TIME punctuations + /// that are scheduled to fire during the advanced time period. + /// + /// The amount of time to advance the wall clock + void AdvanceWallClockTime(TimeSpan advance); } } diff --git a/core/Mock/IWallClockTimeProvider.cs b/core/Mock/IWallClockTimeProvider.cs new file mode 100644 index 00000000..4940d3c6 --- /dev/null +++ b/core/Mock/IWallClockTimeProvider.cs @@ -0,0 +1,22 @@ +using System; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// Interface for providing wall-clock time. Used to allow time mocking in tests. + /// + internal interface IWallClockTimeProvider + { + /// + /// Gets the current wall-clock time in milliseconds since epoch. + /// + long WallClockTime { get; } + + /// + /// Advances the wall-clock time by the specified duration. + /// Only applicable for mock implementations; production implementations may ignore this. + /// + /// The amount of time to advance + void Advance(TimeSpan advance); + } +} \ No newline at end of file diff --git a/core/Mock/MockWallClockTimeProvider.cs b/core/Mock/MockWallClockTimeProvider.cs new file mode 100644 index 00000000..a41a6ff7 --- /dev/null +++ b/core/Mock/MockWallClockTimeProvider.cs @@ -0,0 +1,24 @@ +using System; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// A mock wall-clock time provider for testing that starts at the current time + /// and allows manual advancement. + /// + internal class MockWallClockTimeProvider : IWallClockTimeProvider + { + public MockWallClockTimeProvider() + { + WallClockTime = DateTime.Now.GetMilliseconds(); + } + + public long WallClockTime { get; private set; } + + public void Advance(TimeSpan advance) + { + WallClockTime += (long)advance.TotalMilliseconds; + } + } +} \ No newline at end of file diff --git a/core/Mock/Sync/SyncPipeBuilder.cs b/core/Mock/Sync/SyncPipeBuilder.cs index c4840dad..8d5706e6 100644 --- a/core/Mock/Sync/SyncPipeBuilder.cs +++ b/core/Mock/Sync/SyncPipeBuilder.cs @@ -34,11 +34,12 @@ public void PublishRecord(string topic, byte[] key, byte[] value, DateTime times public void Flush() { - long now = DateTime.Now.GetMilliseconds(); + // Use the task's wall clock time which is mock-aware + long now = task.WallClockTime; TaskManager.CurrentTask = task; while (task.CanProcess(now)) task.Process(); - + task.PunctuateStreamTime(); task.PunctuateSystemTime(); TaskManager.CurrentTask = null; diff --git a/core/Mock/Sync/SyncProducer.cs b/core/Mock/Sync/SyncProducer.cs index e3640427..a935e4b6 100644 --- a/core/Mock/Sync/SyncProducer.cs +++ b/core/Mock/Sync/SyncProducer.cs @@ -163,6 +163,17 @@ public IEnumerable> GetHistory(string topicName) return topics[topicName].ToArray(); } + public IDictionary GetAllTopicCounts() + { + lock (_lock) + { + var result = new Dictionary(); + foreach (var kvp in topics) + result[kvp.Key] = kvp.Value.Count; + return result; + } + } + public void CommitTransaction() { throw new NotImplementedException(); diff --git a/core/Mock/SystemWallClockTimeProvider.cs b/core/Mock/SystemWallClockTimeProvider.cs new file mode 100644 index 00000000..aa053318 --- /dev/null +++ b/core/Mock/SystemWallClockTimeProvider.cs @@ -0,0 +1,24 @@ +using System; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Mock +{ + /// + /// Default wall-clock time provider that returns the actual system time. + /// Used in production when no mock time provider is injected. + /// + internal class SystemWallClockTimeProvider : IWallClockTimeProvider + { + /// + /// Singleton instance for shared use. + /// + public static readonly SystemWallClockTimeProvider Instance = new(); + + public long WallClockTime => DateTime.Now.GetMilliseconds(); + + /// + /// No-op for system time provider. Time advances naturally. + /// + public void Advance(TimeSpan advance) { } + } +} diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs index 94f7f976..b103d6df 100644 --- a/core/Mock/TaskSynchronousTopologyDriver.cs +++ b/core/Mock/TaskSynchronousTopologyDriver.cs @@ -27,10 +27,10 @@ internal sealed class TaskSynchronousTopologyDriver : IBehaviorTopologyTestDrive private readonly IDictionary tasks = new Dictionary(); private readonly GlobalStateUpdateTask globalTask; private readonly IDictionary externalProcessorTopologies = - new Dictionary(); + new Dictionary(); private readonly GlobalProcessorContext globalProcessorContext; private readonly StreamMetricsRegistry metricsRegistry; - + private readonly IDictionary> partitionsByTaskId = new Dictionary>(); @@ -38,6 +38,7 @@ internal sealed class TaskSynchronousTopologyDriver : IBehaviorTopologyTestDrive private readonly bool hasGlobalTopology = false; private ITopicManager internalTopicManager; private readonly IConsumer repartitionConsumerForwarder; + private readonly MockWallClockTimeProvider wallClockTimeProvider; public bool IsRunning { get; private set; } @@ -113,6 +114,9 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to repartitionConsumerForwarder = this.supplier.GetConsumer(topicConfiguration.ToConsumerConfig("consumer-repartition-forwarder"), null); + + // Initialize mock wall clock time provider for controlling time in tests + wallClockTimeProvider = new MockWallClockTimeProvider(); } internal StreamTask GetTask(string topicName) @@ -125,6 +129,13 @@ internal StreamTask GetTask(string topicName) { if (builder.GetSourceTopics().Contains(topicName)) { + // Make sure partitionsByTaskId has this task's partitions + if (!partitionsByTaskId.ContainsKey(id)) + { + var part = new TopicPartition(topicName, 0); + partitionsByTaskId.Add(id, new List {part}); + } + task = new StreamTask("thread-0", id, partitionsByTaskId[id], @@ -134,7 +145,8 @@ internal StreamTask GetTask(string topicName) supplier, producer, new MockChangelogRegister(), - metricsRegistry); + metricsRegistry, + wallClockTimeProvider); task.InitializeStateStores(); task.InitializeTopology(); task.RestorationIfNeeded(); @@ -163,20 +175,34 @@ private void InitializeInternalTopicManager() } private void ForwardRepartitionTopic(string topic) + { + ForwardTopicRecordsIfAny(topic); + } + + /// + /// Forwards any pending records from an internal topic to downstream tasks. + /// Returns true if any records were forwarded, false otherwise. + /// + private bool ForwardTopicRecordsIfAny(string topic) { var records = new List>(); + repartitionConsumerForwarder.Subscribe(topic); + ConsumeResult record = null; do { record = repartitionConsumerForwarder.Consume(); if (record != null) + { records.Add(record); + } } while (record != null); - if (records.Any()) { + var task = GetTask(topic); + var pipe = CreateBuilder(topic).Input(topic, configuration); foreach(var r in records) pipe.Pipe(r.Message.Key, r.Message.Value, r.Message.Timestamp.UnixTimestampMs.FromMilliseconds(), r.Message.Headers); @@ -184,15 +210,33 @@ record = repartitionConsumerForwarder.Consume(); pipe.Dispose(); repartitionConsumerForwarder.Commit(records.Last()); + repartitionConsumerForwarder.Unsubscribe(); + return true; } repartitionConsumerForwarder.Unsubscribe(); + return false; + } + + /// + /// Gets all topics that are used for internal communication within the topology. + /// These are topics that are both written to (sink) and read from (source) within the topology. + /// + private HashSet GetAllInternalCommunicationTopics() + { + var topicGroups = builder.MakeInternalTopicGroups(); + var allSinks = topicGroups.Values.SelectMany(t => t.SinkTopics).ToHashSet(); + var allSources = topicGroups.Values.SelectMany(t => t.SourceTopics).ToHashSet(); + + // Internal topics are those that are both written and read within the topology + allSinks.IntersectWith(allSources); + return allSinks; } private SyncPipeBuilder CreateBuilder(string topicName) { var task = GetTask(topicName); - + if (task == null) { if(hasGlobalTopology && builder.GetGlobalTopics().Contains(topicName)) @@ -319,6 +363,77 @@ public void TriggerCommit() globalTask?.FlushState(); } + public void AdvanceWallClockTime(TimeSpan advance) + { + // Advance the mock wall clock once at the beginning + wallClockTimeProvider.Advance(advance); + + // Get all topics used for internal communication within the topology + var internalTopics = GetAllInternalCommunicationTopics(); + + // Process until no more progress (like Java's completeAllProcessableWork) + // We advance time incrementally inside the loop to allow downstream buffers to flush. + // This is needed because buffers schedule punctuations relative to current time, + // so buffers created during forwarding need additional time to flush. + bool madeProgress; + int iterations = 0; + const int maxIterations = 100; // Prevent infinite loops + do + { + madeProgress = false; + iterations++; + + // 1. FIRST forward records from all internal topics to ensure buffers have data + // Keep forwarding until no more records found in any topic + bool forwardedAny = false; + bool forwardedThisPass; + do + { + forwardedThisPass = false; + foreach (var topic in internalTopics) + { + if (ForwardTopicRecordsIfAny(topic)) + { + madeProgress = true; + forwardedThisPass = true; + forwardedAny = true; + } + } + } while (forwardedThisPass); + + // 2. THEN trigger PROCESSING_TIME punctuations for all tasks (including newly created ones) + bool punctuatedAny = false; + foreach (var task in tasks.Values) + { + if (task.PunctuateSystemTime()) + { + madeProgress = true; + punctuatedAny = true; + } + } + + // 3. If we forwarded records but no punctuations fired, we need to advance + // time significantly so downstream buffers can flush. Use the full advance + // amount since buffer punctuations may be scheduled far apart. + // If punctuations fired or no records were forwarded, use a smaller increment. + if (madeProgress) + { + if (forwardedAny && !punctuatedAny) + { + // Records went to downstream buffers but their punctuations didn't fire yet + // Advance by full amount to ensure downstream punctuations can fire + wallClockTimeProvider.Advance(advance); + } + else + { + // Normal progress - advance by small amount + wallClockTimeProvider.Advance(TimeSpan.FromMilliseconds(advance.TotalMilliseconds / 10)); + } + } + + } while (madeProgress && iterations < maxIterations); + } + #endregion } } \ No newline at end of file diff --git a/core/Mock/TopologyTestDriver.cs b/core/Mock/TopologyTestDriver.cs index 69a61af0..7321fca5 100644 --- a/core/Mock/TopologyTestDriver.cs +++ b/core/Mock/TopologyTestDriver.cs @@ -405,5 +405,44 @@ public IReadOnlyWindowStore GetWindowStore(string name) public bool IsError => behavior.IsError; #endregion + + #region Time Advancement + + /// + /// Advance wall-clock time by the specified duration, triggering any + /// punctuations that are scheduled to fire during the advanced time period. + /// + /// This is analogous to + /// Java Kafka Streams TopologyTestDriver.advanceWallClockTime. + /// + /// + /// This method is only supported when using (the default mode). + /// Using this method with will throw . + /// + /// + /// The amount of time to advance the wall clock + /// When used with + /// + /// + /// using (var driver = new TopologyTestDriver(topology, config)) + /// { + /// var inputTopic = driver.CreateInputTopic<string, string>("input"); + /// var outputTopic = driver.CreateOutputTopic<string, string>("output"); + /// + /// inputTopic.PipeInput("key", "value"); + /// + /// // Advance time to trigger PROCESSING_TIME punctuations + /// driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(500)); + /// + /// var result = outputTopic.ReadKeyValue(); + /// } + /// + /// + public void AdvanceWallClockTime(TimeSpan advance) + { + behavior.AdvanceWallClockTime(advance); + } + + #endregion } } \ No newline at end of file diff --git a/core/ProcessorContext.cs b/core/ProcessorContext.cs index c846878a..d3231f6a 100644 --- a/core/ProcessorContext.cs +++ b/core/ProcessorContext.cs @@ -73,6 +73,16 @@ public class ProcessorContext /// public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}"; + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns system time (DateTime.Now). + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. + /// Use this instead of DateTimeOffset.UtcNow when you need time that works + /// correctly in both production and test environments. + /// + public virtual long WallClockTime => Task?.WallClockTime ?? DateTime.Now.GetMilliseconds(); + internal ProcessorContext() { diff --git a/core/Processors/AbstractTask.cs b/core/Processors/AbstractTask.cs index 2dddefdc..dc174e93 100644 --- a/core/Processors/AbstractTask.cs +++ b/core/Processors/AbstractTask.cs @@ -96,6 +96,13 @@ public bool IsPersistent public abstract void Suspend(); public abstract void MayWriteCheckpoint(bool force = false); public abstract TaskScheduled RegisterScheduleTask(TimeSpan interval, PunctuationType punctuationType, Action punctuator); + + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns DateTime.Now. In tests using TopologyTestDriver, + /// this returns the mock wall clock time that can be controlled via AdvanceWallClockTime. + /// + public abstract long WallClockTime { get; } #endregion protected void TransitTo(TaskState newState) diff --git a/core/Processors/ExternalProcessorTopologyExecutor.cs b/core/Processors/ExternalProcessorTopologyExecutor.cs index 27f9b6d6..c271f2c8 100644 --- a/core/Processors/ExternalProcessorTopologyExecutor.cs +++ b/core/Processors/ExternalProcessorTopologyExecutor.cs @@ -101,7 +101,9 @@ public override bool InitializeStateStores() { throw new NotImplementedException(); } - + + public override long WallClockTime => DateTime.Now.GetMilliseconds(); + #endregion } diff --git a/core/Processors/Public/ProcessorContext.cs b/core/Processors/Public/ProcessorContext.cs index 2a83f636..daa33ce6 100644 --- a/core/Processors/Public/ProcessorContext.cs +++ b/core/Processors/Public/ProcessorContext.cs @@ -37,6 +37,14 @@ internal ProcessorContext(ProcessorContext context) internal override IProcessor CurrentProcessor => context.CurrentProcessor; internal override IRecordContext RecordContext => context.RecordContext; + /// + /// Gets the current wall clock time in milliseconds since epoch. + /// In production, this returns system time (DateTime.Now). + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. + /// + public override long WallClockTime => context.WallClockTime; + #endregion private void CheckConfiguration() diff --git a/core/Processors/SinkProcessor.cs b/core/Processors/SinkProcessor.cs index 6596ba6d..a78f7b4d 100644 --- a/core/Processors/SinkProcessor.cs +++ b/core/Processors/SinkProcessor.cs @@ -54,7 +54,6 @@ public override void Init(ProcessorContext context) public override void Process(K key, V value) { LogProcessingKeyValue(key, value); - if (KeySerDes == null || ValueSerDes == null) { var s = KeySerDes == null ? "key" : "value"; diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 7b7560eb..002b0bdf 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -10,6 +10,7 @@ using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Internal; +using Streamiz.Kafka.Net.Mock; using Streamiz.Kafka.Net.Processors.Internal; using Streamiz.Kafka.Net.Processors.Public; using Streamiz.Kafka.Net.Stream.Internal; @@ -32,6 +33,7 @@ internal class StreamTask : AbstractTask private long idleStartTime; private bool transactionInFlight; private readonly string threadId; + private readonly IWallClockTimeProvider wallClockTimeProvider; private Sensor closeTaskSensor; private Sensor activeBufferedRecordSensor; @@ -47,11 +49,24 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit ProcessorTopology processorTopology, IConsumer consumer, IStreamConfig configuration, IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, StreamMetricsRegistry streamMetricsRegistry) + : this(threadId, id, partitions, processorTopology, consumer, configuration, kafkaSupplier, + producer, changelogRegister, streamMetricsRegistry, SystemWallClockTimeProvider.Instance) + { + } + + /// + /// Constructor that accepts an optional wall clock time provider for testing PROCESSING_TIME punctuations. + /// + public StreamTask(string threadId, TaskId id, IEnumerable partitions, + ProcessorTopology processorTopology, IConsumer consumer, IStreamConfig configuration, + IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, + StreamMetricsRegistry streamMetricsRegistry, IWallClockTimeProvider wallClockTimeProvider) : base(id, partitions, processorTopology, consumer, configuration, changelogRegister) { this.threadId = threadId; this.kafkaSupplier = kafkaSupplier; this.streamMetricsRegistry = streamMetricsRegistry; + this.wallClockTimeProvider = wallClockTimeProvider ?? SystemWallClockTimeProvider.Instance; consumedOffsets = new Dictionary(); maxTaskIdleMs = configuration.MaxTaskIdleMs; idleStartTime = -1; @@ -82,10 +97,17 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit } partitionGrouper = new PartitionGrouper(partitionsQueue); - + RegisterSensors(); } + /// + /// Gets the current wall clock time from the injected time provider. + /// In tests using TopologyTestDriver, this returns the mock wall clock time + /// that can be controlled via AdvanceWallClockTime. + /// + public override long WallClockTime => wallClockTimeProvider.WallClockTime; + #region Private private void RegisterSensors() @@ -123,7 +145,7 @@ private TaskScheduled ScheduleTask(long startTime, TimeSpan interval, Punctuatio var taskScheduled = new TaskScheduled( startTime, interval, - punctuator, + punctuator, Context.CurrentProcessor); switch (punctuationType) @@ -394,8 +416,11 @@ public override TaskScheduled RegisterScheduleTask(TimeSpan interval, Punctuatio // align punctuation to 0L, punctuate as soon as we have data return ScheduleTask(0L, interval, punctuationType, punctuator); case PunctuationType.PROCESSING_TIME: - // align punctuation to now, punctuate after interval has elapsed - return ScheduleTask(DateTime.Now.GetMilliseconds(), interval, punctuationType, punctuator); + // Set lastTime to (now - interval) so that the first punctuation fires after 'interval' has elapsed. + // This is because CanExecute checks: now - lastTime >= interval + // With lastTime = now - interval, the first check becomes: now - (now - interval) >= interval = interval >= interval = true + // But we want it to fire AFTER interval, so we use now as the starting point + return ScheduleTask(WallClockTime, interval, punctuationType, punctuator); default: return null; } @@ -467,7 +492,7 @@ public void CompleteRestoration() public bool PunctuateSystemTime() { - long systemTime = DateTime.Now.GetMilliseconds(); + long systemTime = WallClockTime; bool punctuated = false; diff --git a/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs new file mode 100644 index 00000000..9e30c5e7 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Private/TopologyBuildOrderBugTests.cs @@ -0,0 +1,116 @@ +using System; +using NUnit.Framework; +using Streamiz.Kafka.Net.Crosscutting; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.Stream; +using Streamiz.Kafka.Net.Table; + +namespace Streamiz.Kafka.Net.Tests.Private +{ + /// + /// Tests to verify that records flow correctly through topologies + /// that use Aggregate → ToStream → Repartition when AdvanceWallClockTime is used. + /// + [TestFixture] + public class TopologyBuildOrderBugTests + { + /// + /// Basic test: Aggregate → ToStream → Repartition should forward records. + /// + [Test] + public void Aggregate_ToStream_Repartition_Should_Forward_Records() + { + var builder = new StreamBuilder(); + + builder.Stream("input") + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create() + .With()) + .ToStream() + .Repartition() + .To("output"); + + var config = new StreamConfig + { + ApplicationId = "test-topology-build-order-bug" + }; + + using var driver = new TopologyTestDriver(builder.Build(), config); + + var input = driver.CreateInputTopic("input"); + var output = driver.CreateOutputTopic("output"); + + // Send a record + input.PipeInput("key1", 100); + + // Advance time to trigger any buffered/punctuation-based processing + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + // Read results - this should have our aggregated value + var results = output.ReadKeyValuesToMap(); + + Assert.That(results.ContainsKey("key1"), Is.True, + "Records should flow through Aggregate → ToStream → Repartition → To."); + Assert.That(results["key1"], Is.EqualTo(100)); + } + + /// + /// Test with multiple repartitions. + /// + [Test] + public void Multiple_Repartitions_Should_Forward_Records() + { + var builder = new StreamBuilder(); + + builder.Stream("input") + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create() + .With()) + .ToStream() + .Repartition() // First repartition + .GroupByKey() + .Aggregate( + () => 0, + (k, v, agg) => agg + v, + Materialized> + .Create("second-agg-store") + .With()) + .ToStream() + .Repartition() // Second repartition + .To("output"); + + var config = new StreamConfig + { + ApplicationId = "test-multiple-repartitions" + }; + + using var driver = new TopologyTestDriver(builder.Build(), config); + + var input = driver.CreateInputTopic("input"); + var output = driver.CreateOutputTopic("output"); + + input.PipeInput("key1", 100); + + // Need multiple time advances to flush through multiple stages + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + var results = output.ReadKeyValuesToMap(); + + Assert.That(results.ContainsKey("key1"), Is.True, + "Records should flow through multiple Aggregate → ToStream → Repartition stages."); + Assert.That(results["key1"], Is.EqualTo(100)); + } + } +} diff --git a/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs b/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs new file mode 100644 index 00000000..9c508fdc --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/TestDriver/AdvanceWallClockTimeTests.cs @@ -0,0 +1,229 @@ +using System; +using System.Collections.Generic; +using NUnit.Framework; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.Processors.Public; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.Stream; + +namespace Streamiz.Kafka.Net.Tests.TestDriver +{ + /// + /// Tests for the AdvanceWallClockTime functionality which allows testing + /// PROCESSING_TIME punctuations in the TopologyTestDriver. + /// + public class AdvanceWallClockTimeTests + { + private class ProcessingTimePunctuationProcessor : IProcessor + { + public static List PunctuationTimes { get; } = new(); + public static int PunctuationIntervalMs { get; set; } = 100; + + public void Init(ProcessorContext context) + { + context.Schedule( + TimeSpan.FromMilliseconds(PunctuationIntervalMs), + PunctuationType.PROCESSING_TIME, + timestamp => PunctuationTimes.Add(timestamp)); + } + + public void Process(Record record) + { + } + + public void Close() + { + } + } + + [SetUp] + public void Setup() + { + ProcessingTimePunctuationProcessor.PunctuationTimes.Clear(); + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + } + + [Test] + public void Should_Trigger_Processing_Time_Punctuation_After_Advancing_Time() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-advance-wall-clock-time" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + + // Act - pipe a record to initialize the processor and schedule the punctuation + inputTopic.PipeInput("key1", "value1"); + + // Assert - no punctuation yet (time hasn't advanced enough) + Assert.AreEqual(0, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "No punctuation should have fired yet"); + + // Act - advance time past the punctuation interval + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(110)); + + // Assert - punctuation should have fired + Assert.AreEqual(1, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should have fired once after advancing time"); + + // Act - advance time again + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100)); + + // Assert - another punctuation + Assert.AreEqual(2, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should have fired twice"); + } + } + + [Test] + public void Should_Not_Trigger_Punctuation_Before_Interval_Elapsed() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-no-early-punctuation" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 500; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + + // Initialize the processor + inputTopic.PipeInput("key", "value"); + + // Advance time, but not past the interval + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(400)); + + // Assert - no punctuation should have fired + Assert.AreEqual(0, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Punctuation should not fire before interval elapsed"); + } + } + + [Test] + public void Should_Trigger_Multiple_Punctuations_For_Multiple_Intervals() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-multiple-punctuations" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 100; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + inputTopic.PipeInput("key", "value"); + + // Advance time by 5 intervals worth + for (int i = 0; i < 5; i++) + { + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(110)); + } + + // Assert - should have multiple punctuations + Assert.AreEqual(5, ProcessingTimePunctuationProcessor.PunctuationTimes.Count, + "Should have 5 punctuations for 5 time advances"); + } + } + + [Test] + public void Should_Throw_NotSupportedException_For_ClusterInMemory_Mode() + { + // Arrange + var config = new StreamConfig + { + ApplicationId = "test-cluster-mode" + }; + + var builder = new StreamBuilder(); + builder.Stream("input") + .To("output"); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY)) + { + // Act & Assert + Assert.Throws(() => + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(100))); + } + } + + [Test] + public void Should_Use_Mock_Time_For_Scheduling_New_Punctuations() + { + // This test verifies that after advancing time, new punctuations + // scheduled use the mock time, not real wall clock time. + + var config = new StreamConfig + { + ApplicationId = "test-mock-time-scheduling" + }; + + ProcessingTimePunctuationProcessor.PunctuationIntervalMs = 200; + + var builder = new StreamBuilder(); + builder.Stream("input") + .Process(new ProcessorBuilder() + .Processor() + .Build()); + + var topology = builder.Build(); + + using (var driver = new TopologyTestDriver(topology, config)) + { + var inputTopic = driver.CreateInputTopic("input"); + inputTopic.PipeInput("key", "value"); + + // Advance time and capture punctuation times + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(210)); + Assert.AreEqual(1, ProcessingTimePunctuationProcessor.PunctuationTimes.Count); + var firstPunctuationTime = ProcessingTimePunctuationProcessor.PunctuationTimes[0]; + + driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(210)); + Assert.AreEqual(2, ProcessingTimePunctuationProcessor.PunctuationTimes.Count); + var secondPunctuationTime = ProcessingTimePunctuationProcessor.PunctuationTimes[1]; + + // The difference between punctuation times should be approximately + // the interval (within the time we advanced) + var timeDifference = secondPunctuationTime - firstPunctuationTime; + Assert.GreaterOrEqual(timeDifference, 200, + "Time difference between punctuations should be at least the interval"); + } + } + } +} \ No newline at end of file