From e0c762b177ade8bae511b76e8d384742f26cddc9 Mon Sep 17 00:00:00 2001 From: Akshay Rai Date: Thu, 21 May 2026 14:21:49 +0530 Subject: [PATCH 1/2] Add commit-to-ack latency metric for CDC sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a parallel end-to-end latency metric measuring source DB commit time to destination ack, distinct from the existing eventsLatencyMs which on some CDC connectors (Espresso, TiDB) reflects the time Brooklin read the event from an intermediate Kafka hop rather than the original DB commit. DatastreamProducerRecord gains an Optional eventsCommitTimestamp that connectors capable of supplying a true commit time populate via DatastreamProducerRecordBuilder.setEventsCommitTimestamp(long). The field is absent for non-CDC sources and bootstrap paths, so the new metric only emits when a connector opts in — existing eventsLatencyMs behavior and counters are unchanged. EventProducer threads the optional timestamp through send -> onSendCallback -> reportMetrics and emits eventsCommitToAckLatencyMs (histogram) plus eventsCommitWithinSla / eventsCommitOutsideSla counter pairs (primary and alternate). Thresholds are configurable via commitToAckThresholdSlaMs (default 5m) and commitToAckThresholdAlternateSlaMs (default 15m); defaults are wider than the existing source-to-ack thresholds because commit-to-ack includes upstream CDC pipeline lag. Emission is gated by the same shouldEmitMetric() suppression as the existing SLA metric, so grace-period and disableSlaMetric semantics carry over. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../server/DatastreamProducerRecord.java | 20 ++++++ .../DatastreamProducerRecordBuilder.java | 16 ++++- .../TestDatastreamProducerRecordBuilder.java | 41 ++++++++++++ .../datastream/server/EventProducer.java | 65 +++++++++++++++++-- 4 files changed, 135 insertions(+), 7 deletions(-) diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java index 4263e193e..40b0df997 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecord.java @@ -35,6 +35,11 @@ public class DatastreamProducerRecord { // timestamp for the events obtained from kafka header private Optional _eventsKafkaHeaderTimestamp = Optional.empty(); + // Epoch-millis of the source DB commit for this event, when the connector can supply one (CDC connectors only). + // Distinct from _eventsSourceTimestamp, which on some connectors reflects the time Brooklin read the event from an + // intermediate hop rather than the original DB commit. + private Optional _eventsCommitTimestamp = Optional.empty(); + DatastreamProducerRecord(List events, Optional partition, Optional partitionKey, String checkpoint, long eventsSourceTimestamp) { this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp, false); @@ -117,6 +122,21 @@ public void setEventsKafkaHeaderTimestamp(long timestamp) { _eventsKafkaHeaderTimestamp = Optional.of(timestamp); } + /** + * Get the source DB commit timestamp (Epoch-millis) if the connector supplied one. Present for CDC connectors + * that surface a true commit time; absent otherwise. + */ + public Optional getEventsCommitTimestamp() { + return _eventsCommitTimestamp; + } + + /** + * Set the source DB commit timestamp (Epoch-millis). + */ + public void setEventsCommitTimestamp(long timestamp) { + _eventsCommitTimestamp = Optional.of(timestamp); + } + /** * Get destination partition within the destination */ diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java index 7bb971e1f..cad10c040 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamProducerRecordBuilder.java @@ -31,6 +31,7 @@ public class DatastreamProducerRecordBuilder { private Optional _partitionKey = Optional.empty(); private Optional _destination = Optional.empty(); private boolean _isBroadcastRecord = false; + private Optional _eventsCommitTimestamp = Optional.empty(); /** * Partition to which this DatastreamProducerRecord should be produced. If the partition is not set, TransportProvider @@ -89,14 +90,24 @@ public void setIsBroadcastRecord(boolean isBroadcastRecord) { _isBroadcastRecord = isBroadcastRecord; } + /** + * Set the source DB commit timestamp (Epoch-millis). CDC connectors that can supply a true commit time + * should call this; non-CDC connectors should leave it absent. + */ + public void setEventsCommitTimestamp(long eventsCommitTimestamp) { + _eventsCommitTimestamp = Optional.of(eventsCommitTimestamp); + } + /** * Build the DatastreamProducerRecord. * @return * DatastreamProducerRecord that is created. */ public DatastreamProducerRecord build() { - return new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination, _sourceCheckpoint, - _eventsSourceTimestamp, _isBroadcastRecord); + DatastreamProducerRecord record = new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination, + _sourceCheckpoint, _eventsSourceTimestamp, _isBroadcastRecord); + _eventsCommitTimestamp.ifPresent(record::setEventsCommitTimestamp); + return record; } /** @@ -119,6 +130,7 @@ public static DatastreamProducerRecord copyProducerRecord(DatastreamProducerReco builder.setSourceCheckpoint(record.getCheckpoint()); builder.setEventsSourceTimestamp(record.getEventsSourceTimestamp()); builder.setIsBroadcastRecord(record.isBroadcastRecord()); + record.getEventsCommitTimestamp().ifPresent(builder::setEventsCommitTimestamp); return builder.build(); } } diff --git a/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java b/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java index 02c3e55ee..550fd34cf 100644 --- a/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java +++ b/datastream-server-api/src/test/java/com/linkedin/datastream/server/TestDatastreamProducerRecordBuilder.java @@ -75,6 +75,47 @@ public void testBuilderThrowsWhenEventsTimestampMissing() { builder.build(); } + @Test + public void testCommitTimestampAbsentByDefault() { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + Assert.assertFalse(builder.build().getEventsCommitTimestamp().isPresent()); + } + + @Test + public void testCommitTimestampRoundTripsThroughBuilder() { + long commitTs = 1700000000000L; + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + builder.setEventsCommitTimestamp(commitTs); + DatastreamProducerRecord record = builder.build(); + Assert.assertTrue(record.getEventsCommitTimestamp().isPresent()); + Assert.assertEquals(record.getEventsCommitTimestamp().get().longValue(), commitTs); + } + + @Test + public void testCopyProducerRecordPropagatesCommitTimestamp() { + long commitTs = 1700000000000L; + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + builder.setEventsCommitTimestamp(commitTs); + DatastreamProducerRecord original = builder.build(); + DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(original, 0); + Assert.assertEquals(copy.getEventsCommitTimestamp().orElse(-1L).longValue(), commitTs); + } + + @Test + public void testCopyProducerRecordWhenCommitTimestampAbsent() { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.addEvent(createDatastreamEvent()); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(builder.build(), 0); + Assert.assertFalse(copy.getEventsCommitTimestamp().isPresent()); + } + @Test public void testBuilderWithAddSerializedKeyValueEventAndValidateFields() { DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index a725ae2e1..d6ea8278f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -76,6 +76,10 @@ public class EventProducer implements DatastreamEventProducer { static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs"; static final String EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsLatencyMsSlaIneligible"; static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs"; + // Source DB commit timestamp -> destination ack latency. Distinct from eventsLatencyMs, which on Espresso/TiDB + // measures from the intermediate Kafka append time rather than the original DB commit. + static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING = "eventsCommitToAckLatencyMs"; + static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsCommitToAckLatencyMsSlaIneligible"; static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs"; static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs"; @@ -97,12 +101,22 @@ public class EventProducer implements DatastreamEventProducer { private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0"; private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla"; private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla"; + private static final String EVENTS_COMMIT_WITHIN_SLA = "eventsCommitWithinSla"; + private static final String EVENTS_COMMIT_OUTSIDE_SLA = "eventsCommitOutsideSla"; + private static final String EVENTS_COMMIT_WITHIN_ALTERNATE_SLA = "eventsCommitWithinAlternateSla"; + private static final String EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA = "eventsCommitOutsideAlternateSla"; + private static final String COMMIT_TO_ACK_THRESHOLD_SLA_MS = "commitToAckThresholdSlaMs"; + private static final String COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "commitToAckThresholdAlternateSlaMs"; private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError"; static final String BYTES_PRODUCED_RATE = "bytesProducedRate"; private static final String AGGREGATE = "aggregate"; private static final String DEFAULT_AVAILABILITY_THRESHOLD_SLA_MS = "60000"; // 1 minute private static final String DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS = "180000"; // 3 minutes + // Commit-to-ack covers upstream CDC pipeline lag in addition to Brooklin-side transport, so defaults are wider + // than the source-to-ack thresholds above. + private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS = "300000"; // 5 minutes + private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "900000"; // 15 minutes private static final String DEFAULT_WARN_LOG_LATENCY_ENABLED = "false"; private static final String DEFAULT_WARN_LOG_LATENCY_THRESHOLD_MS = "1500000000"; // 25000 minutes, ~17 days private static final String DEFAULT_NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "false"; @@ -119,6 +133,10 @@ public class EventProducer implements DatastreamEventProducer { private final int _availabilityThresholdSlaMs; // Alternate SLA for comparison with the main SLA private final int _availabilityThresholdAlternateSlaMs; + // Commit-to-ack (DB commit time -> destination ack) SLA thresholds. Used only when the connector supplies a + // commit timestamp on the producer record (CDC connectors). + private final int _commitToAckThresholdSlaMs; + private final int _commitToAckThresholdAlternateSlaMs; // Grace period for newly created streams. While a stream is inside this window, primary/alternate // SLA counters are suppressed and the latency histogram is redirected to eventsLatencyMsSlaIneligible. private final long _newStreamGracePeriodMs; @@ -196,6 +214,11 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C _availabilityThresholdAlternateSlaMs = Integer.parseInt( config.getProperty(AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS)); + _commitToAckThresholdSlaMs = Integer.parseInt( + config.getProperty(COMMIT_TO_ACK_THRESHOLD_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS)); + _commitToAckThresholdAlternateSlaMs = Integer.parseInt( + config.getProperty(COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS)); + _newStreamGracePeriodMs = Long.parseLong( config.getProperty(NEW_STREAM_GRACE_PERIOD_MS, DEFAULT_NEW_STREAM_GRACE_PERIOD_MS)); _streamCreationTimeMs = parseStreamCreationTimeMs(task); @@ -323,6 +346,8 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord record.setEventsSendTimestamp(System.currentTimeMillis()); long recordEventsSourceTimestamp = record.getEventsSourceTimestamp(); long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L); + // Absent for non-CDC connectors and for CDC bootstrap/heartbeat paths; commit-to-ack metric is skipped when absent. + Optional recordEventsCommitTimestamp = record.getEventsCommitTimestamp(); final long numSerializedBytes = record.getEvents().stream() .mapToLong(e -> { long keySize = e.key().filter(k -> k instanceof byte[]).map(k -> (long) ((byte[]) k).length).orElse(0L); @@ -333,7 +358,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord if (isBroadcast) { broadcastMetadata = _transportProvider.broadcast(destination, record, (metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp, - recordEventsSendTimestamp, numSerializedBytes)); + recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp)); _logger.debug("Broadcast completed with {}", broadcastMetadata); if (broadcastMetadata.isMessageSerializationError()) { _logger.warn("Broadcast of record {} to destination {} failed because of serialization error.", @@ -342,7 +367,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord } else { _transportProvider.send(destination, record, (metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp, - recordEventsSendTimestamp, numSerializedBytes)); + recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp)); } } catch (Exception e) { String errorMessage = String.format("Failed to send the event %s exception %s", record, e); @@ -470,7 +495,8 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev * to avoid overcounting. */ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp, long eventsSendTimestamp, - long numBytes) { + long numBytes, Optional eventsCommitTimestamp) { + reportCommitToAckMetrics(metadata, eventsCommitTimestamp); // If per-topic metrics are enabled, use topic as key for metrics; else, use datastream name as the key String datastreamName = getDatastreamName(); @@ -530,6 +556,35 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT reportThroughputAttributionMetrics(numBytes); } + /** + * Emit commit-to-ack (DB commit -> destination ack) latency histogram and SLA counters. No-op when the connector + * did not supply a commit timestamp (non-CDC sources, or CDC bootstrap/heartbeat paths). Gated by the same + * shouldEmitMetric() suppression as the existing source-to-ack SLA so the new metric inherits grace-period and + * per-datastream opt-out behavior. + */ + private void reportCommitToAckMetrics(DatastreamRecordMetadata metadata, Optional eventsCommitTimestamp) { + if (!eventsCommitTimestamp.isPresent()) { + return; + } + long commitTs = eventsCommitTimestamp.get(); + if (commitTs <= 0) { + return; + } + long commitToAckLatencyMs = System.currentTimeMillis() - commitTs; + String topicOrDatastreamName = _enablePerTopicMetrics ? metadata.getTopic() : getDatastreamName(); + String latencyMetricName = shouldEmitMetric() + ? EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING + : EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING; + reportEventLatencyMetrics(topicOrDatastreamName, metadata, commitToAckLatencyMs, latencyMetricName); + + if (shouldEmitMetric()) { + reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdSlaMs, + EVENTS_COMMIT_WITHIN_SLA, EVENTS_COMMIT_OUTSIDE_SLA); + reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdAlternateSlaMs, + EVENTS_COMMIT_WITHIN_ALTERNATE_SLA, EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA); + } + } + /** * Only for the throughput violating topics! *
@@ -608,7 +663,7 @@ private void reportSendLatencyMetrics(DatastreamRecordMetadata metadata, long se } private void onSendCallback(DatastreamRecordMetadata metadata, Exception exception, SendCallback sendCallback, - long eventSourceTimestamp, long eventSendTimestamp, long numBytes) { + long eventSourceTimestamp, long eventSendTimestamp, long numBytes, Optional eventCommitTimestamp) { SendFailedException sendFailedException = null; @@ -623,7 +678,7 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getUndecoratedTopic())) { reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes); } else { - reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes); + reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes, eventCommitTimestamp); } } } catch (Exception e) { From 743bf49bfc5b779371c3112ee9b99117bb406a8e Mon Sep 17 00:00:00 2001 From: Akshay Rai Date: Thu, 21 May 2026 14:44:37 +0530 Subject: [PATCH 2/2] Add EventProducer tests for commit-to-ack metric Four tests covering the new commit-to-ack latency path: - metric not emitted when commit timestamp is absent (non-CDC / bootstrap) - within-SLA counter increments when commit timestamp is recent - outside-SLA counter increments when latency exceeds threshold - histogram redirects to SLA-ineligible and counters suppress during grace All assert against aggregate metric names so no per-task DropWizard plumbing is needed. The non-CDC source URI is used in the first three tests to keep the grace gate disengaged; the fourth deliberately re-enables it with a fresh CDC stream to exercise the suppression path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datastream/server/TestEventProducer.java | 123 +++++++++++++++++- 1 file changed, 122 insertions(+), 1 deletion(-) diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java index d73b82e33..485edb198 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java @@ -277,6 +277,110 @@ public void testNoDatabaseMetricForBmmUri() { private static final String SLA_WITHIN_AGG = "EventProducer.aggregate.eventsProducedWithinSla"; private static final String SLA_WITHIN_ALT_AGG = "EventProducer.aggregate.eventsProducedWithinAlternateSla"; + private static final String COMMIT_WITHIN_AGG = "EventProducer.aggregate.eventsCommitWithinSla"; + private static final String COMMIT_OUTSIDE_AGG = "EventProducer.aggregate.eventsCommitOutsideSla"; + + // For commit-to-ack metric assertions, use a non-CDC source (kafka://) so the grace gate is not + // engaged — the new metric should fire whenever the connector supplies a commit timestamp, + // regardless of CDC catch-up logic. + private static String setupOldNonCdcStream(Datastream datastream) { + datastream.getSource().setConnectionString("kafka://broker:9092/topic"); + long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000L); + datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(oneHourAgo)); + return datastream.getName(); + } + + @Test + public void testCommitToAckMetricNotEmittedWhenCommitTimestampAbsent() { + // No commit timestamp on the record → new metric path is a no-op; existing metrics are unaffected. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-no-commit-ts")[0]; + setupOldNonCdcStream(datastream); + + String topic = "noCommitTsTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, new Properties(), topic, null); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must not fire when the record has no commit timestamp"); + Assert.assertNull(metrics.getMetric(COMMIT_WITHIN_AGG), + "commit-to-ack within-SLA counter must not be created when no commit timestamp is supplied"); + Assert.assertNull(metrics.getMetric(COMMIT_OUTSIDE_AGG), + "commit-to-ack outside-SLA counter must not be created when no commit timestamp is supplied"); + Assert.assertNotNull(metrics.getMetric(SLA_WITHIN_AGG), + "existing eventsLatencyMs SLA path must still fire — no regression"); + } + + @Test + public void testCommitToAckMetricFiresWithinSlaWhenCommitTimestampRecent() { + // Recent commit timestamp → within default 5-min SLA → withinSla counter increments, histogram emits. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-commit-within")[0]; + setupOldNonCdcStream(datastream); + + String topic = "commitWithinSlaTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, new Properties(), topic, System.currentTimeMillis()); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNotNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must fire when commit timestamp is present"); + Counter withinAgg = (Counter) metrics.getMetric(COMMIT_WITHIN_AGG); + Assert.assertNotNull(withinAgg, "withinSla aggregate counter must be created when commit timestamp is present"); + Assert.assertEquals(withinAgg.getCount(), 1L, "recent commit timestamp should fall inside the default 5-min SLA"); + } + + @Test + public void testCommitToAckMetricFiresOutsideSlaWhenThresholdTight() { + // Force OUTSIDE_SLA by setting a 1ms threshold; any latency by the time the callback runs blows past it. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-commit-outside")[0]; + setupOldNonCdcStream(datastream); + + Properties props = new Properties(); + props.put("commitToAckThresholdSlaMs", "1"); + + String topic = "commitOutsideSlaTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, props, topic, System.currentTimeMillis() - 100); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Counter outsideAgg = (Counter) metrics.getMetric(COMMIT_OUTSIDE_AGG); + Assert.assertNotNull(outsideAgg, "outsideSla counter must be created when commit-to-ack latency exceeds threshold"); + Assert.assertEquals(outsideAgg.getCount(), 1L, + "100ms commit-to-ack latency vs 1ms threshold should count as outside-SLA"); + } + + @Test + public void testCommitToAckMetricRedirectedToSlaIneligibleDuringGracePeriod() { + // CDC source + freshly created stream → grace gate engaged. Commit-to-ack histogram should redirect + // to the SLA-ineligible variant and both within/outside counters should remain suppressed — + // same suppression semantics as the existing eventsLatencyMs path. + Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-cdc-commit-grace")[0]; + datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable"); + datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, + String.valueOf(System.currentTimeMillis())); + + Properties props = new Properties(); + props.put("newStreamGracePeriodMs", "7200000"); + + String topic = "commitGraceTopic"; + DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream)); + sendOneEventThroughTask(task, props, topic, System.currentTimeMillis()); + + DynamicMetricsManager metrics = DynamicMetricsManager.getInstance(); + Assert.assertNull( + metrics.getMetric("EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING), + "commit-to-ack histogram must NOT fire during grace period"); + Assert.assertNotNull( + metrics.getMetric( + "EventProducer." + topic + "." + EventProducer.EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING), + "commit-to-ack latency should be redirected to the SLA-ineligible histogram during grace"); + Assert.assertNull(metrics.getMetric(COMMIT_WITHIN_AGG), + "commit-to-ack withinSla counter must remain suppressed during grace"); + Assert.assertNull(metrics.getMetric(COMMIT_OUTSIDE_AGG), + "commit-to-ack outsideSla counter must remain suppressed during grace"); + } @Test public void testSlaGraceActiveForNewCdcStream() { @@ -568,6 +672,11 @@ private void sendOneEventThroughProducer(Datastream datastream, Properties props } private void sendOneEventThroughTask(DatastreamTaskImpl task, Properties props, String topicName) { + sendOneEventThroughTask(task, props, topicName, null); + } + + private void sendOneEventThroughTask(DatastreamTaskImpl task, Properties props, String topicName, + Long commitTimestamp) { TransportProvider transport = new NoOpTransportProviderAdminFactory.NoOpTransportProvider() { @Override public void send(String destination, DatastreamProducerRecord record, SendCallback onComplete) { @@ -577,13 +686,25 @@ public void send(String destination, DatastreamProducerRecord record, SendCallba } }; EventProducer eventProducer = new EventProducer(task, transport, new NoOpCheckpointProvider(), props, false); - eventProducer.send(createDatastreamProducerRecord(), (m, e) -> { }); + eventProducer.send(createDatastreamProducerRecord(commitTimestamp), (m, e) -> { }); } private DatastreamProducerRecord createDatastreamProducerRecord() { return createDatastreamProducerRecord(0, "0", 1); } + private DatastreamProducerRecord createDatastreamProducerRecord(Long commitTimestamp) { + DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); + builder.setPartition(0); + builder.setSourceCheckpoint("0"); + builder.setEventsSourceTimestamp(System.currentTimeMillis()); + if (commitTimestamp != null) { + builder.setEventsCommitTimestamp(commitTimestamp); + } + builder.addEvent(new BrooklinEnvelope(new byte[0], new byte[0], null, new HashMap<>())); + return builder.build(); + } + private DatastreamProducerRecord createDatastreamProducerRecord(int partition, String checkpoint, int eventCount) { DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder(); builder.setPartition(partition);