Add eventsCommitToAckLatencyMs SLA metric #1018
Merged
akshayrai merged 2 commits on May 21, 2026
Conversation
Adds a parallel end-to-end latency metric measuring source DB commit time to destination ack, distinct from the existing eventsLatencyMs, which on some CDC connectors (Espresso, TiDB) reflects the time Brooklin read the event from an intermediate Kafka hop rather than the original DB commit.

DatastreamProducerRecord gains an Optional<Long> eventsCommitTimestamp that connectors capable of supplying a true commit time populate via DatastreamProducerRecordBuilder.setEventsCommitTimestamp(long). The field is absent for non-CDC sources and bootstrap paths, so the new metric only emits when a connector opts in; existing eventsLatencyMs behavior and counters are unchanged.

EventProducer threads the optional timestamp through send -> onSendCallback -> reportMetrics and emits eventsCommitToAckLatencyMs (histogram) plus eventsCommitWithinSla / eventsCommitOutsideSla counter pairs (primary and alternate). Thresholds are configurable via commitToAckThresholdSlaMs (default 5m) and commitToAckThresholdAlternateSlaMs (default 15m); defaults are wider than the existing source-to-ack thresholds because commit-to-ack includes upstream CDC pipeline lag.

Emission is gated by the same shouldEmitMetric() suppression as the existing SLA metric, so grace-period and disableSlaMetric semantics carry over.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
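The reporting logic described in this commit message could look roughly like the sketch below. It is not the PR's code: CommitToAckSketch, its plain-field counters, and the slaEligible flag (standing in for the result of shouldEmitMetric()) are hypothetical. Treating a latency exactly equal to the threshold as within SLA is also an assumption. Only the config keys, the defaults, and the overall behavior come from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

// Hypothetical stand-in for the metric path; not Brooklin's EventProducer.
final class CommitToAckSketch {
  // Config keys and defaults (5 min / 15 min) are taken from the PR description.
  static final String CFG_SLA = "commitToAckThresholdSlaMs";
  static final String CFG_ALT_SLA = "commitToAckThresholdAlternateSlaMs";

  final long slaMs;
  final long alternateSlaMs;

  // Plain fields stand in for the real histogram and counter metrics.
  final List<Long> latencyHistogram = new ArrayList<>();
  final List<Long> slaIneligibleHistogram = new ArrayList<>();
  long withinSla;
  long outsideSla;
  long withinAlternateSla;
  long outsideAlternateSla;

  CommitToAckSketch(Properties props) {
    slaMs = Long.parseLong(props.getProperty(CFG_SLA, String.valueOf(5 * 60 * 1000L)));
    alternateSlaMs = Long.parseLong(props.getProperty(CFG_ALT_SLA, String.valueOf(15 * 60 * 1000L)));
  }

  void report(Optional<Long> commitTimestamp, long ackTimeMs, boolean slaEligible) {
    if (!commitTimestamp.isPresent()) {
      return; // connector did not opt in: nothing is emitted
    }
    long commitTs = commitTimestamp.get();
    if (commitTs <= 0) {
      return; // non-positive timestamps are ignored
    }
    long latencyMs = ackTimeMs - commitTs;
    if (!slaEligible) {
      // Suppressed (e.g. grace period): histogram is redirected, counters stay untouched.
      slaIneligibleHistogram.add(latencyMs);
      return;
    }
    latencyHistogram.add(latencyMs);
    // Assumption: latency equal to the threshold counts as within SLA.
    if (latencyMs <= slaMs) {
      withinSla++;
    } else {
      outsideSla++;
    }
    if (latencyMs <= alternateSlaMs) {
      withinAlternateSla++;
    } else {
      outsideAlternateSla++;
    }
  }
}
```

With default properties, an event acked six minutes after commit would fall outside the primary threshold but inside the alternate one in this sketch.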
Four tests covering the new commit-to-ack latency path:
- metric not emitted when commit timestamp is absent (non-CDC / bootstrap)
- within-SLA counter increments when commit timestamp is recent
- outside-SLA counter increments when latency exceeds threshold
- histogram redirects to SLA-ineligible and counters suppress during grace

All assert against aggregate metric names so no per-task DropWizard plumbing is needed. The non-CDC source URI is used in the first three tests to keep the grace gate disengaged; the fourth deliberately re-enables it with a fresh CDC stream to exercise the suppression path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
  return;
}
long commitTs = eventsCommitTimestamp.get();
if (commitTs <= 0) {
Collaborator
Nit: in reportCommitToAckMetrics, we guard against commitTs <= 0 but not against a connector supplying a timestamp in the future. In that case System.currentTimeMillis() - commitTs becomes negative, and we would emit a negative histogram value and classify the SLA incorrectly. Do we need to add that check?
Collaborator
Author
commitTs cannot be in the future, since the event has already happened, right?
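If a guard for future timestamps were wanted, one option is sketched below. This is not part of the PR: CommitLatencyGuard and latencyMs are hypothetical names, and skipping such events (rather than clamping the latency to zero) is an assumed design choice, not something the thread settled.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the reviewer's suggestion; not part of the PR.
final class CommitLatencyGuard {
  private CommitLatencyGuard() {
  }

  // Returns the commit-to-ack latency, or empty when the timestamp is unusable.
  static OptionalLong latencyMs(long commitTs, long nowMs) {
    if (commitTs <= 0) {
      return OptionalLong.empty(); // same condition the diff already guards
    }
    if (commitTs > nowMs) {
      return OptionalLong.empty(); // assumed policy: skip rather than emit a negative value
    }
    return OptionalLong.of(nowMs - commitTs);
  }
}
```

A caller would pass System.currentTimeMillis() as nowMs and emit metrics only when the result is present.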
 * Get the source DB commit timestamp (Epoch-millis) if the connector supplied one. Present for CDC connectors
 * that surface a true commit time; absent otherwise.
 */
public Optional<Long> getEventsCommitTimestamp() {
Collaborator
So this field will only get set for CDC events?
Collaborator
Author
Yes, this applies only to CDC.
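The opt-in behavior discussed in this thread can be illustrated with a stripped-down record and builder. SketchRecord and its Builder are hypothetical simplifications, not DatastreamProducerRecord or DatastreamProducerRecordBuilder; the method names mirror the PR, but the fluent return type of the setter is an assumption.

```java
import java.util.Optional;

// Hypothetical, simplified record; the real DatastreamProducerRecord carries many more fields.
final class SketchRecord {
  private final Optional<Long> eventsCommitTimestamp;

  private SketchRecord(Optional<Long> eventsCommitTimestamp) {
    this.eventsCommitTimestamp = eventsCommitTimestamp;
  }

  // Present only when a connector supplied a commit time; absent otherwise.
  Optional<Long> getEventsCommitTimestamp() {
    return eventsCommitTimestamp;
  }

  static final class Builder {
    // Absent by default, so non-CDC and bootstrap paths need no changes.
    private Optional<Long> eventsCommitTimestamp = Optional.empty();

    // Assumption: the setter returns the builder for chaining.
    Builder setEventsCommitTimestamp(long epochMillis) {
      this.eventsCommitTimestamp = Optional.of(epochMillis);
      return this;
    }

    SketchRecord build() {
      return new SketchRecord(eventsCommitTimestamp);
    }
  }
}
```

A connector that never calls setEventsCommitTimestamp produces records whose getter returns an empty Optional, which is the condition under which the new metric stays silent.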
kanishkjaiswal2015
approved these changes
May 21, 2026
Summary
Adds a new SLA metric, eventsCommitToAckLatencyMs, that measures source DB commit → destination ack latency.

The existing eventsLatencyMs is computed from DatastreamProducerRecord.eventsSourceTimestamp, which on some CDC connectors reflects when Brooklin read the event (e.g. from an intermediate Kafka hop) rather than the original DB commit. This change introduces a separate, opt-in timestamp on the producer record and a parallel metric so connectors that can supply a true commit time expose a clean end-to-end SLA without changing existing metric semantics.

Tests
New unit tests covering both the data path and the metric path:
TestDatastreamProducerRecordBuilder
- eventsCommitTimestamp absent by default
- copyProducerRecord
- copyProducerRecord

TestEventProducer
- eventsLatencyMs SLA still fires (regression guard)
- eventsCommitWithinSla increments when latency is inside the threshold
- eventsCommitOutsideSla increments when latency exceeds the threshold (tight 1ms threshold + 100ms-old commit)
- eventsCommitToAckLatencyMsSlaIneligible and both counters are suppressed