Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.
10 changes: 10 additions & 0 deletions core/Mock/ClusterInMemoryTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ public void TriggerCommit()
//((StreamThread)threadTopology)?.Manager.CommitAll();
}

public void AdvanceWallClockTime(TimeSpan advance)
{
// Wall-clock time advancement is not supported in the cluster in-memory driver
// because it uses real Kafka with threads that use actual system time.
// Use TaskSynchronousTopologyDriver instead for testing PROCESSING_TIME punctuations.
throw new NotSupportedException(
"AdvanceWallClockTime is not supported in ClusterInMemoryTopologyDriver. " +
"Use the synchronous topology driver (default mode) instead for testing PROCESSING_TIME punctuations.");
}

#endregion
}
}
7 changes: 7 additions & 0 deletions core/Mock/IBehaviorTopologyTestDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,12 @@ internal interface IBehaviorTopologyTestDriver : IDisposable
TestMultiInputTopic<K, V> CreateMultiInputTopic<K, V>(string[] topics, ISerDes<K> keySerdes = null, ISerDes<V> valueSerdes = null);
IStateStore GetStateStore<K, V>(string name);
void TriggerCommit();

/// <summary>
/// Advance wall-clock time by the specified duration, triggering any PROCESSING_TIME punctuations
/// that are scheduled to fire during the advanced time period.
/// </summary>
/// <param name="advance">The amount of time to advance the wall clock</param>
void AdvanceWallClockTime(TimeSpan advance);
}
}
22 changes: 22 additions & 0 deletions core/Mock/IWallClockTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;

namespace Streamiz.Kafka.Net.Mock
{
/// <summary>
/// Interface for providing wall-clock time. Used to allow time mocking in tests.
/// </summary>
internal interface IWallClockTimeProvider
{
/// <summary>
/// Gets the current wall-clock time in milliseconds since epoch.
/// </summary>
long WallClockTime { get; }

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be

long WallClockTime { get; private set; }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an interface, you can't put private members on it... by definition its the public contract.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you agree?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap you are right, I didn't point the correct file.

long WallClockTime { get; private set; }
should be in MockWallClockTimeProvider

@chrismasters chrismasters Jan 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't think I understand what you're asking. In MockWallClockTimeProvider, WallClockTime is

public long WallClockTime => _currentTimeMs;

You prefered it to be a gettable property over the GetWallClockTime() that it was originally. This is implicitly get only, since the implementation just returns _currentTimeMs.

Would you prefer that it to be more verbose like this?

        public long WallClockTime
        {
            get { return _currentTimeMs; }
            private set { }
        }

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public long WallClockTime
    {
        get;
        private set;
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed the member variable and converted the expression body syntax to make it private settable


/// <summary>
/// Advances the wall-clock time by the specified duration.
/// Only applicable for mock implementations; production implementations may ignore this.
/// </summary>
/// <param name="advance">The amount of time to advance</param>
void Advance(TimeSpan advance);
}
}
24 changes: 24 additions & 0 deletions core/Mock/MockWallClockTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Mock
{
/// <summary>
/// A mock wall-clock time provider for testing that starts at the current time
/// and allows manual advancement.
/// </summary>
internal class MockWallClockTimeProvider : IWallClockTimeProvider
{
public MockWallClockTimeProvider()
{
WallClockTime = DateTime.Now.GetMilliseconds();
}

public long WallClockTime { get; private set; }

public void Advance(TimeSpan advance)
{
WallClockTime += (long)advance.TotalMilliseconds;
}
}
}
5 changes: 3 additions & 2 deletions core/Mock/Sync/SyncPipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ public void PublishRecord(string topic, byte[] key, byte[] value, DateTime times

public void Flush()
{
long now = DateTime.Now.GetMilliseconds();
// Use the task's wall clock time which is mock-aware
long now = task.WallClockTime;
TaskManager.CurrentTask = task;
while (task.CanProcess(now))
task.Process();

task.PunctuateStreamTime();
task.PunctuateSystemTime();
TaskManager.CurrentTask = null;
Expand Down
11 changes: 11 additions & 0 deletions core/Mock/Sync/SyncProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@ public IEnumerable<Message<byte[], byte[]>> GetHistory(string topicName)
return topics[topicName].ToArray();
}

public IDictionary<string, int> GetAllTopicCounts()
{
lock (_lock)
{
var result = new Dictionary<string, int>();
foreach (var kvp in topics)
result[kvp.Key] = kvp.Value.Count;
return result;
}
}

public void CommitTransaction()
{
throw new NotImplementedException();
Expand Down
24 changes: 24 additions & 0 deletions core/Mock/SystemWallClockTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Mock
{
/// <summary>
/// Default wall-clock time provider that returns the actual system time.
/// Used in production when no mock time provider is injected.
/// </summary>
internal class SystemWallClockTimeProvider : IWallClockTimeProvider
{
/// <summary>
/// Singleton instance for shared use.
/// </summary>
public static readonly SystemWallClockTimeProvider Instance = new();

public long WallClockTime => DateTime.Now.GetMilliseconds();

/// <summary>
/// No-op for system time provider. Time advances naturally.
/// </summary>
public void Advance(TimeSpan advance) { }
}
}
125 changes: 120 additions & 5 deletions core/Mock/TaskSynchronousTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ internal sealed class TaskSynchronousTopologyDriver : IBehaviorTopologyTestDrive
private readonly IDictionary<TaskId, StreamTask> tasks = new Dictionary<TaskId, StreamTask>();
private readonly GlobalStateUpdateTask globalTask;
private readonly IDictionary<string, ExternalProcessorTopologyExecutor> externalProcessorTopologies =
new Dictionary<string, ExternalProcessorTopologyExecutor>();
new Dictionary<string, ExternalProcessorTopologyExecutor>();
private readonly GlobalProcessorContext globalProcessorContext;
private readonly StreamMetricsRegistry metricsRegistry;

private readonly IDictionary<TaskId, IList<TopicPartition>> partitionsByTaskId =
new Dictionary<TaskId, IList<TopicPartition>>();

private readonly StreamsProducer producer = null;
private readonly bool hasGlobalTopology = false;
private ITopicManager internalTopicManager;
private readonly IConsumer<byte[],byte[]> repartitionConsumerForwarder;
private readonly MockWallClockTimeProvider wallClockTimeProvider;

public bool IsRunning { get; private set; }

Expand Down Expand Up @@ -113,6 +114,9 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to

repartitionConsumerForwarder = this.supplier.GetConsumer(topicConfiguration.ToConsumerConfig("consumer-repartition-forwarder"),
null);

// Initialize mock wall clock time provider for controlling time in tests
wallClockTimeProvider = new MockWallClockTimeProvider();
}

internal StreamTask GetTask(string topicName)
Expand All @@ -125,6 +129,13 @@ internal StreamTask GetTask(string topicName)
{
if (builder.GetSourceTopics().Contains(topicName))
{
// Make sure partitionsByTaskId has this task's partitions
if (!partitionsByTaskId.ContainsKey(id))
{
var part = new TopicPartition(topicName, 0);
partitionsByTaskId.Add(id, new List<TopicPartition> {part});
}

task = new StreamTask("thread-0",
id,
partitionsByTaskId[id],
Expand All @@ -134,7 +145,8 @@ internal StreamTask GetTask(string topicName)
supplier,
producer,
new MockChangelogRegister(),
metricsRegistry);
metricsRegistry,
wallClockTimeProvider);
task.InitializeStateStores();
task.InitializeTopology();
task.RestorationIfNeeded();
Expand Down Expand Up @@ -163,36 +175,68 @@ private void InitializeInternalTopicManager()
}

private void ForwardRepartitionTopic(string topic)
{
ForwardTopicRecordsIfAny(topic);
}

/// <summary>
/// Forwards any pending records from an internal topic to downstream tasks.
/// Returns true if any records were forwarded, false otherwise.
/// </summary>
private bool ForwardTopicRecordsIfAny(string topic)
{
var records = new List<ConsumeResult<byte[], byte[]>>();

repartitionConsumerForwarder.Subscribe(topic);

ConsumeResult<byte[], byte[]> record = null;
do
{
record = repartitionConsumerForwarder.Consume();
if (record != null)
{
records.Add(record);
}
} while (record != null);


if (records.Any())
{
var task = GetTask(topic);

var pipe = CreateBuilder(topic).Input(topic, configuration);
foreach(var r in records)
pipe.Pipe(r.Message.Key, r.Message.Value, r.Message.Timestamp.UnixTimestampMs.FromMilliseconds(), r.Message.Headers);
pipe.Flush();
pipe.Dispose();

repartitionConsumerForwarder.Commit(records.Last());
repartitionConsumerForwarder.Unsubscribe();
return true;
}

repartitionConsumerForwarder.Unsubscribe();
return false;
}

/// <summary>
/// Gets all topics that are used for internal communication within the topology.
/// These are topics that are both written to (sink) and read from (source) within the topology.
/// </summary>
private HashSet<string> GetAllInternalCommunicationTopics()
{
var topicGroups = builder.MakeInternalTopicGroups();
var allSinks = topicGroups.Values.SelectMany(t => t.SinkTopics).ToHashSet();
var allSources = topicGroups.Values.SelectMany(t => t.SourceTopics).ToHashSet();

// Internal topics are those that are both written and read within the topology
allSinks.IntersectWith(allSources);
return allSinks;
}

private SyncPipeBuilder CreateBuilder(string topicName)
{
var task = GetTask(topicName);

if (task == null)
{
if(hasGlobalTopology && builder.GetGlobalTopics().Contains(topicName))
Expand Down Expand Up @@ -319,6 +363,77 @@ public void TriggerCommit()
globalTask?.FlushState();
}

public void AdvanceWallClockTime(TimeSpan advance)
{
// Advance the mock wall clock once at the beginning
wallClockTimeProvider.Advance(advance);

// Get all topics used for internal communication within the topology
var internalTopics = GetAllInternalCommunicationTopics();

// Process until no more progress (like Java's completeAllProcessableWork)
// We advance time incrementally inside the loop to allow downstream buffers to flush.
// This is needed because buffers schedule punctuations relative to current time,
// so buffers created during forwarding need additional time to flush.
bool madeProgress;
int iterations = 0;
const int maxIterations = 100; // Prevent infinite loops
do
{
madeProgress = false;
iterations++;

// 1. FIRST forward records from all internal topics to ensure buffers have data
// Keep forwarding until no more records found in any topic
bool forwardedAny = false;
bool forwardedThisPass;
do
{
forwardedThisPass = false;
foreach (var topic in internalTopics)
{
if (ForwardTopicRecordsIfAny(topic))

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why it's necessary. When you Input a new message, by default the TopologyTestDriver will forward the message downstream in the repartition topic. Why you need to do again when you advance the time ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is required in the case where records may be produced to internal topics during punctuation (not just during the initial PipeInput)

Consider this scenario with a buffering transformer (which was what motivated this feature):

  1. PipeInput("key", "value") --> record flows through topology --> hits a BufferByTimeTransformer --> buffered, not forwarded downstream yet
  2. AdvanceWallClockTime(500ms) --> PROCESSING_TIME punctuation fires --> buffered records are now forwarded to a repartition topic
  3. These records weren't in the repartition topic when the original Flushed handlers ran during step 1

Without internal topic forwarding in AdvanceWallClockTime, records produced to repartition topics during punctuation would never reach downstream tasks. The existing Flushed event handlers only trigger when PipeInput completes - they don't know about records produced later by punctuations.

If there is a cleaner way to hook into the existing forwarding mechanism, I'd be happy to refactor but the core requirement is to have any records produced to internal topics during AdvanceWallClockTime flow downstream.

Does that make sense?

{
madeProgress = true;
forwardedThisPass = true;
forwardedAny = true;
}
}
} while (forwardedThisPass);

// 2. THEN trigger PROCESSING_TIME punctuations for all tasks (including newly created ones)
bool punctuatedAny = false;
foreach (var task in tasks.Values)
{
if (task.PunctuateSystemTime())
{
madeProgress = true;
punctuatedAny = true;
}
}

// 3. If we forwarded records but no punctuations fired, we need to advance
// time significantly so downstream buffers can flush. Use the full advance
// amount since buffer punctuations may be scheduled far apart.
// If punctuations fired or no records were forwarded, use a smaller increment.
if (madeProgress)
{
if (forwardedAny && !punctuatedAny)
{
// Records went to downstream buffers but their punctuations didn't fire yet
// Advance by full amount to ensure downstream punctuations can fire
wallClockTimeProvider.Advance(advance);
}
else
{
// Normal progress - advance by small amount
wallClockTimeProvider.Advance(TimeSpan.FromMilliseconds(advance.TotalMilliseconds / 10));
}
}

} while (madeProgress && iterations < maxIterations);
}

#endregion
}
}
39 changes: 39 additions & 0 deletions core/Mock/TopologyTestDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,44 @@ public IReadOnlyWindowStore<K, V> GetWindowStore<K, V>(string name)
public bool IsError => behavior.IsError;

#endregion

#region Time Advancement

/// <summary>
/// Advance wall-clock time by the specified duration, triggering any <see cref="Processors.Public.PunctuationType.PROCESSING_TIME"/>
/// punctuations that are scheduled to fire during the advanced time period.
/// <para>
/// This is analogous to <a href="https://kafka.apache.org/32/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime(java.time.Duration)">
/// Java Kafka Streams TopologyTestDriver.advanceWallClockTime</a>.
/// </para>
/// <para>
/// This method is only supported when using <see cref="Mode.SYNC_TASK"/> (the default mode).
/// Using this method with <see cref="Mode.ASYNC_CLUSTER_IN_MEMORY"/> will throw <see cref="NotSupportedException"/>.
/// </para>
/// </summary>
/// <param name="advance">The amount of time to advance the wall clock</param>
/// <exception cref="NotSupportedException">When used with <see cref="Mode.ASYNC_CLUSTER_IN_MEMORY"/></exception>
/// <example>
/// <code>
/// using (var driver = new TopologyTestDriver(topology, config))
/// {
/// var inputTopic = driver.CreateInputTopic&lt;string, string&gt;("input");
/// var outputTopic = driver.CreateOutputTopic&lt;string, string&gt;("output");
///
/// inputTopic.PipeInput("key", "value");
///
/// // Advance time to trigger PROCESSING_TIME punctuations
/// driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(500));
///
/// var result = outputTopic.ReadKeyValue();
/// }
/// </code>
/// </example>
public void AdvanceWallClockTime(TimeSpan advance)
{
behavior.AdvanceWallClockTime(advance);
}

#endregion
}
}
Loading
Loading