diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java index 4263e193e..40b0df997 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java @@ -35,6 +35,11 @@ public class DatastreamProducerRecord { // timestamp for the events obtained from kafka header private Optional _eventsKafkaHeaderTimestamp = Optional.empty(); + // Epoch-millis of the source DB commit for this event, when the connector can supply one (CDC connectors only). + // Distinct from _eventsSourceTimestamp, which on some connectors reflects the time Brooklin read the event from an + // intermediate hop rather than the original DB commit. + private Optional _eventsCommitTimestamp = Optional.empty(); + DatastreamProducerRecord(List events, Optional partition, Optional partitionKey, String checkpoint, long eventsSourceTimestamp) { this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp, false); @@ -117,6 +122,21 @@ public void setEventsKafkaHeaderTimestamp(long timestamp) { _eventsKafkaHeaderTimestamp = Optional.of(timestamp); } + /** + * Get the source DB commit timestamp (Epoch-millis) if the connector supplied one. Present for CDC connectors + * that surface a true commit time; absent otherwise. + */ + public Optional getEventsCommitTimestamp() { + return _eventsCommitTimestamp; + } + + /** + * Set the source DB commit timestamp (Epoch-millis). + */ + public void setEventsCommitTimestamp(long timestamp) { + _eventsCommitTimestamp = Optional.of(timestamp); + } + /** * Get destination partition within the destination */ diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java index 7bb971e1f..cad10c040 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java @@ -31,6 +31,7 @@ public class DatastreamProducerRecordBuilder { private Optional _partitionKey = Optional.empty(); private Optional _destination = Optional.empty(); private boolean _isBroadcastRecord = false; + private Optional _eventsCommitTimestamp = Optional.empty(); /** * Partition to which this DatastreamProducerRecord should be produced. If the partition is not set, TransportProvider @@ -89,14 +90,24 @@ public void setIsBroadcastRecord(boolean isBroadcastRecord) { _isBroadcastRecord = isBroadcastRecord; } + /** + * Set the source DB commit timestamp (Epoch-millis). CDC connectors that can supply a true commit time + * should call this; non-CDC connectors should leave it absent. + */ + public void setEventsCommitTimestamp(long eventsCommitTimestamp) { + _eventsCommitTimestamp = Optional.of(eventsCommitTimestamp); + } + /** * Build the DatastreamProducerRecord. * @return * DatastreamProducerRecord that is created. */ public DatastreamProducerRecord build() { - return new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination, _sourceCheckpoint, - _eventsSourceTimestamp, _isBroadcastRecord); + DatastreamProducerRecord record = new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination, + _sourceCheckpoint, _eventsSourceTimestamp, _isBroadcastRecord); + _eventsCommitTimestamp.ifPresent(record::setEventsCommitTimestamp); + return record; } /** @@ -119,6 +130,7 @@ public static DatastreamProducerRecord copyProducerRecord(DatastreamProducerReco builder.setSourceCheckpoint(record.getCheckpoint()); builder.setEventsSourceTimestamp(record.getEventsSourceTimestamp()); builder.setIsBroadcastRecord(record.isBroadcastRecord()); + record.getEventsCommitTimestamp().ifPresent(builder::setEventsCommitTimestamp); return builder.build(); } } diff --git a/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java b/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java index 02c3e55ee..550fd34cf 100644 --- a/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java +++ b/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java @@ -75,6 +75,47 @@ public void testBuilderThrowsWhenEventsTimestampMissing() { builder.build(); } + @Test + public void testCommitTimestampAbsentByDefault() { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + Assert.assertFalse(builder.build().getEventsCommitTimestamp().isPresent()); + } + + @Test + public void testCommitTimestampRoundTripsThroughBuilder() { + long commitTs = 1700000000000L; + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + builder.setEventsCommitTimestamp(commitTs); + DatastreamProducerRecord record = builder.build(); + Assert.assertTrue(record.getEventsCommitTimestamp().isPresent()); + Assert.assertEquals(record.getEventsCommitTimestamp().get().longValue(), commitTs); + } + + @Test + public void testCopyProducerRecordPropagatesCommitTimestamp() { + long commitTs = 1700000000000L; + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + builder.setEventsCommitTimestamp(commitTs); + DatastreamProducerRecord original = builder.build(); + DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(original, 0); + Assert.assertEquals(copy.getEventsCommitTimestamp().orElse(-1L).longValue(), commitTs); + } + + @Test + public void testCopyProducerRecordWhenCommitTimestampAbsent() { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(builder.build(), 0); + Assert.assertFalse(copy.getEventsCommitTimestamp().isPresent()); + } + @Test public void testBuilderWithAddSerializedKeyValueEventAndValidateFields() { DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index a725ae2e1..d6ea8278f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -76,6 +76,10 @@ public class EventProducer implements DatastreamEventProducer { static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs"; static final String EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsLatencyMsSlaIneligible"; static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs"; + // Source DB commit timestamp -> destination ack latency. Distinct from eventsLatencyMs, which on Espresso/TiDB + // measures from the intermediate Kafka append time rather than the original DB commit. + static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING = "eventsCommitToAckLatencyMs"; + static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsCommitToAckLatencyMsSlaIneligible"; static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs"; static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs"; @@ -97,12 +101,22 @@ public class EventProducer implements DatastreamEventProducer { private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0"; private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla"; private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla"; + private static final String EVENTS_COMMIT_WITHIN_SLA = "eventsCommitWithinSla"; + private static final String EVENTS_COMMIT_OUTSIDE_SLA = "eventsCommitOutsideSla"; + private static final String EVENTS_COMMIT_WITHIN_ALTERNATE_SLA = "eventsCommitWithinAlternateSla"; + private static final String EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA = "eventsCommitOutsideAlternateSla"; + private static final String COMMIT_TO_ACK_THRESHOLD_SLA_MS = "commitToAckThresholdSlaMs"; + private static final String COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "commitToAckThresholdAlternateSlaMs"; private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError"; static final String BYTES_PRODUCED_RATE = "bytesProducedRate"; private static final String AGGREGATE = "aggregate"; private static final String DEFAULT_AVAILABILITY_THRESHOLD_SLA_MS = "60000"; // 1 minute private static final String DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS = "180000"; // 3 minutes + // Commit-to-ack covers upstream CDC pipeline lag in addition to Brooklin-side transport, so defaults are wider + // than the source-to-ack thresholds above. + private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS = "300000"; // 5 minutes + private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "900000"; // 15 minutes private static final String DEFAULT_WARN_LOG_LATENCY_ENABLED = "false"; private static final String DEFAULT_WARN_LOG_LATENCY_THRESHOLD_MS = "1500000000"; // 25000 minutes, ~17 days private static final String DEFAULT_NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "false"; @@ -119,6 +133,10 @@ public class EventProducer implements DatastreamEventProducer { private final int _availabilityThresholdSlaMs; // Alternate SLA for comparison with the main SLA private final int _availabilityThresholdAlternateSlaMs; + // Commit-to-ack (DB commit time -> destination ack) SLA thresholds. Used only when the connector supplies a + // commit timestamp on the producer record (CDC connectors). + private final int _commitToAckThresholdSlaMs; + private final int _commitToAckThresholdAlternateSlaMs; // Grace period for newly created streams. While a stream is inside this window, primary/alternate // SLA counters are suppressed and the latency histogram is redirected to eventsLatencyMsSlaIneligible. private final long _newStreamGracePeriodMs; @@ -196,6 +214,11 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C _availabilityThresholdAlternateSlaMs = Integer.parseInt( config.getProperty(AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS)); + _commitToAckThresholdSlaMs = Integer.parseInt( + config.getProperty(COMMIT_TO_ACK_THRESHOLD_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS)); + _commitToAckThresholdAlternateSlaMs = Integer.parseInt( + config.getProperty(COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS)); + _newStreamGracePeriodMs = Long.parseLong( config.getProperty(NEW_STREAM_GRACE_PERIOD_MS, DEFAULT_NEW_STREAM_GRACE_PERIOD_MS)); _streamCreationTimeMs = parseStreamCreationTimeMs(task); @@ -323,6 +346,8 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord record.setEventsSendTimestamp(System.currentTimeMillis()); long recordEventsSourceTimestamp = record.getEventsSourceTimestamp(); long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L); + // Absent for non-CDC connectors and for CDC bootstrap/heartbeat paths; commit-to-ack metric is skipped when absent. + Optional recordEventsCommitTimestamp = record.getEventsCommitTimestamp(); final long numSerializedBytes = record.getEvents().stream() .mapToLong(e -> { long keySize = e.key().filter(k -> k instanceof byte[]).map(k -> (long) ((byte[]) k).length).orElse(0L); @@ -333,7 +358,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord if (isBroadcast) { broadcastMetadata = _transportProvider.broadcast(destination, record, (metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp, - recordEventsSendTimestamp, numSerializedBytes)); + recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp)); _logger.debug("Broadcast completed with {}", broadcastMetadata); if (broadcastMetadata.isMessageSerializationError()) { _logger.warn("Broadcast of record {} to destination {} failed because of serialization error.", @@ -342,7 +367,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord } else { _transportProvider.send(destination, record, (metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp, - recordEventsSendTimestamp, numSerializedBytes)); + recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp)); } } catch (Exception e) { String errorMessage = String.format("Failed to send the event %s exception %s", record, e); @@ -470,7 +495,8 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev * to avoid overcounting. */ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp, long eventsSendTimestamp, - long numBytes) { + long numBytes, Optional eventsCommitTimestamp) { + reportCommitToAckMetrics(metadata, eventsCommitTimestamp); // If per-topic metrics are enabled, use topic as key for metrics; else, use datastream name as the key String datastreamName = getDatastreamName(); @@ -530,6 +556,35 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT reportThroughputAttributionMetrics(numBytes); } + /** + * Emit commit-to-ack (DB commit -> destination ack) latency histogram and SLA counters. No-op when the connector + * did not supply a commit timestamp (non-CDC sources, or CDC bootstrap/heartbeat paths). Gated by the same + * shouldEmitMetric() suppression as the existing source-to-ack SLA so the new metric inherits grace-period and + * per-datastream opt-out behavior. + */ + private void reportCommitToAckMetrics(DatastreamRecordMetadata metadata, Optional eventsCommitTimestamp) { + if (!eventsCommitTimestamp.isPresent()) { + return; + } + long commitTs = eventsCommitTimestamp.get(); + if (commitTs <= 0) { + return; + } + long commitToAckLatencyMs = System.currentTimeMillis() - commitTs; + String topicOrDatastreamName = _enablePerTopicMetrics ? metadata.getTopic() : getDatastreamName(); + String latencyMetricName = shouldEmitMetric() + ? EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING + : EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING; + reportEventLatencyMetrics(topicOrDatastreamName, metadata, commitToAckLatencyMs, latencyMetricName); + + if (shouldEmitMetric()) { + reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdSlaMs, + EVENTS_COMMIT_WITHIN_SLA, EVENTS_COMMIT_OUTSIDE_SLA); + reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdAlternateSlaMs, + EVENTS_COMMIT_WITHIN_ALTERNATE_SLA, EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA); + } + } + /** * Only for the throughput violating topics! *
@@ -608,7 +663,7 @@ private void reportSendLatencyMetrics(DatastreamRecordMetadata metadata, long se } private void onSendCallback(DatastreamRecordMetadata metadata, Exception exception, SendCallback sendCallback, - long eventSourceTimestamp, long eventSendTimestamp, long numBytes) { + long eventSourceTimestamp, long eventSendTimestamp, long numBytes, Optional eventCommitTimestamp) { SendFailedException sendFailedException = null; @@ -623,7 +678,7 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getUndecoratedTopic())) { reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes); } else { - reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes); + reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes, eventCommitTimestamp); } } } catch (Exception e) { diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java index d73b82e33..485edb198 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java @@ -277,6 +277,110 @@ public void testNoDatabaseMetricForBmmUri() { private static final String SLA_WITHIN_AGG = "EventProducer.aggregate.eventsProducedWithinSla"; private static final String SLA_WITHIN_ALT_AGG = "EventProducer.aggregate.eventsProducedWithinAlternateSla"; + private static final String COMMIT_WITHIN_AGG = "EventProducer.aggregate.eventsCommitWithinSla"; + private static final String COMMIT_OUTSIDE_AGG = "EventProducer.aggregate.eventsCommitOutsideSla"; + + // For commit-to-ack metric assertions, use a non-CDC source (kafka://) so the grace gate is not + // engaged — the new metric should fire whenever the connector supplies a commit timestamp, + // regardless of CDC catch-up logic. + private static String setupOldNonCdcStream(Datastream datastream) { + datastream.getSource().setConnectionString("kafka://broker:9092/topic"); + long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000L); + datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(oneHourAgo)); + return datastream.getName(); + } + + @Test + public void testCommitToAckMetricNotEmittedWhenCommitTimestampAbsent() { + // No commit timestamp on the record → new metric path is a no-op; existing metrics are unaffected. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-no-commit-ts")[0]; + setupOldNonCdcStream(datastream); + + String topic = "noCommitTsTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, new Properties(), topic, null); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must not fire when the record has no commit timestamp"); + Assert.assertNull(metrics.getMetric(COMMIT_WITHIN_AGG), + "commit-to-ack within-SLA counter must not be created when no commit timestamp is supplied"); + Assert.assertNull(metrics.getMetric(COMMIT_OUTSIDE_AGG), + "commit-to-ack outside-SLA counter must not be created when no commit timestamp is supplied"); + Assert.assertNotNull(metrics.getMetric(SLA_WITHIN_AGG), + "existing eventsLatencyMs SLA path must still fire — no regression"); + } + + @Test + public void testCommitToAckMetricFiresWithinSlaWhenCommitTimestampRecent() { + // Recent commit timestamp → within default 5-min SLA → withinSla counter increments, histogram emits. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-commit-within")[0]; + setupOldNonCdcStream(datastream); + + String topic = "commitWithinSlaTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, new Properties(), topic, System.currentTimeMillis()); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNotNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must fire when commit timestamp is present"); + Counter withinAgg = (Counter) metrics.getMetric(COMMIT_WITHIN_AGG); + Assert.assertNotNull(withinAgg, "withinSla aggregate counter must be created when commit timestamp is present"); + Assert.assertEquals(withinAgg.getCount(), 1L, "recent commit timestamp should fall inside the default 5-min SLA"); + } + + @Test + public void testCommitToAckMetricFiresOutsideSlaWhenThresholdTight() { + // Force OUTSIDE_SLA by setting a 1ms threshold; any latency by the time the callback runs blows past it. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-commit-outside")[0]; + setupOldNonCdcStream(datastream); + + Properties props = new Properties(); + props.put("commitToAckThresholdSlaMs", "1"); + + String topic = "commitOutsideSlaTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, props, topic, System.currentTimeMillis() - 100); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Counter outsideAgg = (Counter) metrics.getMetric(COMMIT_OUTSIDE_AGG); + Assert.assertNotNull(outsideAgg, "outsideSla counter must be created when commit-to-ack latency exceeds threshold"); + Assert.assertEquals(outsideAgg.getCount(), 1L, + "100ms commit-to-ack latency vs 1ms threshold should count as outside-SLA"); + } + + @Test + public void testCommitToAckMetricRedirectedToSlaIneligibleDuringGracePeriod() { + // CDC source + freshly created stream → grace gate engaged. Commit-to-ack histogram should redirect + // to the SLA-ineligible variant and both within/outside counters should remain suppressed — + // same suppression semantics as the existing eventsLatencyMs path. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-cdc-commit-grace")[0]; + datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); + datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, + String.valueOf(System.currentTimeMillis())); + + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); + + String topic = "commitGraceTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, props, topic, System.currentTimeMillis()); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must NOT fire during grace period"); + Assert.assertNotNull( + metrics.getMetric( + "EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING), + "commit-to-ack latency should be redirected to the SLA-ineligible histogram during grace"); + Assert.assertNull(metrics.getMetric(COMMIT_WITHIN_AGG), + "commit-to-ack withinSla counter must remain suppressed during grace"); + Assert.assertNull(metrics.getMetric(COMMIT_OUTSIDE_AGG), + "commit-to-ack outsideSla counter must remain suppressed during grace"); + } @Test public void testSlaGraceActiveForNewCdcStream() { @@ -568,6 +672,11 @@ private void sendOneEventThroughProducer(Datastream datastream, Properties props } private void sendOneEventThroughTask(DatastreamTaskImpl task, Properties props, String topicName) { + sendOneEventThroughTask(task, props, topicName, null); + } + + private void sendOneEventThroughTask(DatastreamTaskImpl task, Properties props, String topicName, + Long commitTimestamp) { TransportProvider transport = new NoOpTransportProviderAdminFactory.NoOpTransportProvider() { @Override public void send(String destination, DatastreamProducerRecord record, SendCallback onComplete) { @@ -577,13 +686,25 @@ public void send(String destination, DatastreamProducerRecord record, SendCallba } }; EventProducer eventProducer = new EventProducer(task, transport, new NoOpCheckpointProvider(), props, false); - eventProducer.send(createDatastreamProducerRecord(), (m, e) -> { }); + eventProducer.send(createDatastreamProducerRecord(commitTimestamp), (m, e) -> { }); } private DatastreamProducerRecord createDatastreamProducerRecord() { return createDatastreamProducerRecord(0, "0", 1); } + private DatastreamProducerRecord createDatastreamProducerRecord(Long commitTimestamp) { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.setPartition(0); + builder.setSourceCheckpoint("0"); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + if (commitTimestamp != null) { + builder.setEventsCommitTimestamp(commitTimestamp); + } + builder.addEvent(new BrooklinEnvelope(new byte[0], new byte[0], null, new HashMap<>())); + return builder.build(); + } + private DatastreamProducerRecord createDatastreamProducerRecord(int partition, String checkpoint, int eventCount) { DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); builder.setPartition(partition);