diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index a89b72850..a725ae2e1 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -94,7 +94,7 @@ public class EventProducer implements DatastreamEventProducer { private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "numEventsOutsideAltSlaLogEnabled"; private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_FREQUENCY_MS = "numEventsOutsideAltSlaFrequencyMs"; private static final String NEW_STREAM_GRACE_PERIOD_MS = "newStreamGracePeriodMs"; - private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "7200000"; // 2 hours + private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0"; private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla"; private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla"; private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError"; diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java index 0ff0cd4d8..d73b82e33 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java @@ -286,7 +286,9 @@ public void testSlaGraceActiveForNewCdcStream() { datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(System.currentTimeMillis())); - sendOneEventThroughProducer(datastream, new Properties()); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period + sendOneEventThroughProducer(datastream, props); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG), @@ -365,8 +367,10 @@ public void testLatencyHistogramRedirectedToSlaIneligibleDuringGracePeriod() { datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(System.currentTimeMillis())); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period String someTopicName = "graceLatencyTopic"; - sendOneEventThroughProducer(datastream, new Properties(), someTopicName); + sendOneEventThroughProducer(datastream, props, someTopicName); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull( @@ -455,8 +459,10 @@ public void testSlaGraceDedupedTaskAllStreamsNew() { newDsB.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); newDsB.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(now - 60_000L)); + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period DatastreamTaskImpl task = new DatastreamTaskImpl(Arrays.asList(newDsA, newDsB)); - sendOneEventThroughTask(task, new Properties(), "someTopicName"); + sendOneEventThroughTask(task, props, "someTopicName"); DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG),