From 267def61391719efac9c39bf1f13f711187652f0 Mon Sep 17 00:00:00 2001 From: mittalprince Date: Wed, 20 May 2026 21:07:00 +0530 Subject: [PATCH 1/2] Set DEFAULT_NEW_STREAM_GRACE_PERIOD_MS to 0 Disables the SLA grace period by default so SLA metrics are emitted immediately for all new streams without a warmup window. --- .../main/java/com/linkedin/datastream/server/EventProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index a89b72850..a725ae2e1 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -94,7 +94,7 @@ public class EventProducer implements DatastreamEventProducer { private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "numEventsOutsideAltSlaLogEnabled"; private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_FREQUENCY_MS = "numEventsOutsideAltSlaFrequencyMs"; private static final String NEW_STREAM_GRACE_PERIOD_MS = "newStreamGracePeriodMs"; - private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "7200000"; // 2 hours + private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0"; private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla"; private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla"; private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError"; From e847e9ee03511a32279dd8639bf5e5bc19b6027d Mon Sep 17 00:00:00 2001 From: pmittal Date: Wed, 20 May 2026 22:56:36 +0530 Subject: [PATCH 2/2] Fix TestEventProducer tests broken by DEFAULT_NEW_STREAM_GRACE_PERIOD_MS=0 Tests that assert SLA suppression during grace period were relying on the 2h default. Now that the default is 0, pass an explicit 2h grace period via props so they continue to test the intended behaviour. --- .../datastream/server/TestEventProducer.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java index 0ff0cd4d8..d73b82e33 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java @@ -286,7 +286,9 @@ public void testSlaGraceActiveForNewCdcStream() { datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(System.currentTimeMillis())); - sendOneEventThroughProducer(datastream, new Properties()); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period + sendOneEventThroughProducer(datastream, props); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG), @@ -365,8 +367,10 @@ public void testLatencyHistogramRedirectedToSlaIneligibleDuringGracePeriod() { datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(System.currentTimeMillis())); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period String someTopicName = "graceLatencyTopic"; - sendOneEventThroughProducer(datastream, new Properties(), someTopicName); + sendOneEventThroughProducer(datastream, props, someTopicName); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull( @@ -455,8 +459,10 @@ public void testSlaGraceDedupedTaskAllStreamsNew() { newDsB.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); newDsB.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(now - 60_000L)); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period DatastreamTaskImpl task = new DatastreamTaskImpl(Arrays.asList(newDsA, newDsB)); - sendOneEventThroughTask(task, new Properties(), "someTopicName"); + sendOneEventThroughTask(task, props, "someTopicName"); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG),