Skip to content

Add AdvanceWallClockTime support for testing PROCESSING_TIME punctuations#460

Merged
LGouellec merged 6 commits into
LGouellec:developfrom
chrismasters:feature/advance-wall-clock-time
Jan 21, 2026
Merged

Add AdvanceWallClockTime support for testing PROCESSING_TIME punctuations#460
LGouellec merged 6 commits into
LGouellec:developfrom
chrismasters:feature/advance-wall-clock-time

Conversation

@chrismasters

@chrismasters chrismasters commented Jan 12, 2026

Copy link
Copy Markdown
Contributor

This PR adds support for advancing wall-clock time in TopologyTestDriver, enabling proper unit testing of processors that use PROCESSING_TIME punctuations. This mirrors the Java Kafka Streams TopologyTestDriver.advanceWallClockTime API.

Previously, there was no way to control wall-clock time in tests, making it impossible to reliably test:

  • Buffering transformers that flush after a quiet period
  • Windowing operations based on processing time
  • Any custom processor using PROCESSING_TIME punctuations

Changes

Commit 1: Add AdvanceWallClockTime to TopologyTestDriver

  • Add IWallClockTimeProvider interface for time abstraction
  • Add MockWallClockTimeProvider for controllable time in tests
  • Add AdvanceWallClockTime(TimeSpan) method to TopologyTestDriver
  • Modify StreamTask to use injected time provider when available
  • Fix PROCESSING_TIME scheduling to correctly fire after interval

Commit 2: Expose WallClockTime through ProcessorContext

  • Add WallClockTime property to ProcessorContext so custom processors/transformers can access mock-aware time
  • Improve AdvanceWallClockTime to handle multi-stage topologies with repartitions (iterates until no more progress)

Usage

  using var driver = new TopologyTestDriver(topology, config);

  var input = driver.CreateInputTopic<string, int>("input");
  var output = driver.CreateOutputTopic<string, int>("output");

  input.PipeInput("key", 100);

  // Advance wall clock to trigger PROCESSING_TIME punctuations
  driver.AdvanceWallClockTime(TimeSpan.FromMilliseconds(500));

  var results = output.ReadKeyValuesToMap();

  Custom processors can access mock-aware time via:
  public Record<K, V> Process(Record<K, V> record)
  {
      long wallClock = context.WallClockTime;  // Mock-aware in tests
      // ...
  }

Testing

Added tests in TopologyBuildOrderBugTests.cs verifying records flow correctly through Aggregate > ToStream > Repartition topologies when using AdvanceWallClockTime.

…_TIME punctuations

This change adds support for advancing wall-clock time in TopologyTestDriver,
enabling proper unit testing of processors that use PROCESSING_TIME punctuations.

Previously, there was no way to control wall-clock time in tests, making it
impossible to reliably test buffering, windowing, and other time-sensitive
operations that use PROCESSING_TIME.

Changes:
- Add IWallClockTimeProvider interface for time abstraction
- Add MockWallClockTimeProvider for controllable time in tests
- Add AdvanceWallClockTime method to TopologyTestDriver
- Modify StreamTask to use injected time provider when available
- Fix PROCESSING_TIME scheduling to correctly fire after interval
  (was using startTime = now + interval, changed to startTime = now)

This mirrors the Java Kafka Streams TopologyTestDriver.advanceWallClockTime API.
… topology support

Building on the AdvanceWallClockTime feature, this change:

1. Exposes WallClockTime as a property on ProcessorContext, allowing processors
   and transformers to access mock-aware wall clock time. This enables custom
   processors that need wall clock time to work correctly with TopologyTestDriver.

2. Improves AdvanceWallClockTime to properly handle multi-stage topologies with
   repartitions. The implementation now iterates until no more progress is made,
   ensuring records flow through all stages including downstream buffers that
   may have their own PROCESSING_TIME punctuations.

Changes:
- Add GetWallClockTime() abstract method to AbstractTask
- Add WallClockTime property to ProcessorContext and ProcessorContext<K,V>
- Implement GetWallClockTime in StreamTask (uses injected provider) and
  ExternalStreamTask (uses real time)
- Enhance TaskSynchronousTopologyDriver.AdvanceWallClockTime to handle
  internal communication topics and iterate until stable

@LGouellec LGouellec left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments, but LGTM as soon as the comments is fixed

Comment thread core/Mock/MockWallClockTimeProvider.cs Outdated
_currentTimeMs = DateTime.Now.GetMilliseconds();
}

public long GetWallClockTime() => _currentTimeMs;

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a property

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorted

forwardedThisPass = false;
foreach (var topic in internalTopics)
{
if (ForwardTopicRecordsIfAny(topic))

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why it's necessary. When you Input a new message, by default the TopologyTestDriver will forward the message downstream in the repartition topic. Why you need to do again when you advance the time ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is required in the case where records may be produced to internal topics during punctuation (not just during the initial PipeInput)

Consider this scenario with a buffering transformer (which was what motivated this feature):

  1. PipeInput("key", "value") --> record flows through topology --> hits a BufferByTimeTransformer --> buffered, not forwarded downstream yet
  2. AdvanceWallClockTime(500ms) --> PROCESSING_TIME punctuation fires --> buffered records are now forwarded to a repartition topic
  3. These records weren't in the repartition topic when the original Flushed handlers ran during step 1

Without internal topic forwarding in AdvanceWallClockTime, records produced to repartition topics during punctuation would never reach downstream tasks. The existing Flushed event handlers only trigger when PipeInput completes - they don't know about records produced later by punctuations.

If there is a cleaner way to hook into the existing forwarding mechanism, I'd be happy to refactor but the core requirement is to have any records produced to internal topics during AdvanceWallClockTime flow downstream.

Does that make sense?

Comment thread core/Processors/AbstractTask.cs Outdated
Comment thread core/Processors/StreamTask.cs Outdated

@LGouellec LGouellec left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just a minor change about the property getter/setter and I'll approve it sooner

/// Gets the current wall-clock time in milliseconds since epoch.
/// </summary>
long GetWallClockTime();
long WallClockTime { get; }

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be

long WallClockTime { get; private set; }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an interface, you can't put private members on it... by definition its the public contract.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you agree?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap you are right, I didn't point the correct file.

long WallClockTime { get; private set; }
should be in MockWallClockTimeProvider

@chrismasters chrismasters Jan 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't think I understand what you're asking. In MockWallClockTimeProvider, WallClockTime is

public long WallClockTime => _currentTimeMs;

You prefered it to be a gettable property over the GetWallClockTime() that it was originally. This is implicitly get only, since the implementation just returns _currentTimeMs.

Would you prefer that it to be more verbose like this?

        public long WallClockTime
        {
            get { return _currentTimeMs; }
            private set { }
        }

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public long WallClockTime
    {
        get;
        private set;
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed the member variable and converted the expression body syntax to make it private settable

@chrismasters

Copy link
Copy Markdown
Contributor Author

All good to go then? Looks like you resolved any merge conflict with the pull?

@LGouellec LGouellec merged commit 6522b01 into LGouellec:develop Jan 21, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants