Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class EventProducer implements DatastreamEventProducer {
private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "numEventsOutsideAltSlaLogEnabled";
private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_FREQUENCY_MS = "numEventsOutsideAltSlaFrequencyMs";
private static final String NEW_STREAM_GRACE_PERIOD_MS = "newStreamGracePeriodMs";
private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "7200000"; // 2 hours
private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0";
private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla";
private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla";
private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ public void testSlaGraceActiveForNewCdcStream() {
datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS,
String.valueOf(System.currentTimeMillis()));
sendOneEventThroughProducer(datastream, new Properties());
Properties props = new Properties();
props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period
sendOneEventThroughProducer(datastream, props);

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG),
Expand Down Expand Up @@ -365,8 +367,10 @@ public void testLatencyHistogramRedirectedToSlaIneligibleDuringGracePeriod() {
datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS,
String.valueOf(System.currentTimeMillis()));

Properties props = new Properties();
props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period
String someTopicName = "graceLatencyTopic";
sendOneEventThroughProducer(datastream, new Properties(), someTopicName);
sendOneEventThroughProducer(datastream, props, someTopicName);

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Assert.assertNull(
Expand Down Expand Up @@ -455,8 +459,10 @@ public void testSlaGraceDedupedTaskAllStreamsNew() {
newDsB.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
newDsB.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(now - 60_000L));

Properties props = new Properties();
props.put("newStreamGracePeriodMs", "7200000"); // explicit 2h grace period
DatastreamTaskImpl task = new DatastreamTaskImpl(Arrays.asList(newDsA, newDsB));
sendOneEventThroughTask(task, new Properties(), "someTopicName");
sendOneEventThroughTask(task, props, "someTopicName");

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG),
Expand Down
Loading