Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class DatastreamProducerRecord {
// timestamp for the events obtained from kafka header
private Optional<Long> _eventsKafkaHeaderTimestamp = Optional.empty();

// Epoch-millis of the source DB commit for this event, when the connector can supply one (CDC connectors only).
// Distinct from _eventsSourceTimestamp, which on some connectors reflects the time Brooklin read the event from an
// intermediate hop rather than the original DB commit.
private Optional<Long> _eventsCommitTimestamp = Optional.empty();

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
String checkpoint, long eventsSourceTimestamp) {
this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp, false);
Expand Down Expand Up @@ -117,6 +122,21 @@ public void setEventsKafkaHeaderTimestamp(long timestamp) {
_eventsKafkaHeaderTimestamp = Optional.of(timestamp);
}

/**
* Get the source DB commit timestamp (Epoch-millis) if the connector supplied one. Present for CDC connectors
* that surface a true commit time; absent otherwise.
*/
public Optional<Long> getEventsCommitTimestamp() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this field will only get set for CDC events?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this applied only for CDC

return _eventsCommitTimestamp;
}

/**
* Set the source DB commit timestamp (Epoch-millis).
*/
public void setEventsCommitTimestamp(long timestamp) {
_eventsCommitTimestamp = Optional.of(timestamp);
}

/**
* Get destination partition within the destination
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class DatastreamProducerRecordBuilder {
private Optional<String> _partitionKey = Optional.empty();
private Optional<String> _destination = Optional.empty();
private boolean _isBroadcastRecord = false;
private Optional<Long> _eventsCommitTimestamp = Optional.empty();

/**
* Partition to which this DatastreamProducerRecord should be produced. If the partition is not set, TransportProvider
Expand Down Expand Up @@ -89,14 +90,24 @@ public void setIsBroadcastRecord(boolean isBroadcastRecord) {
_isBroadcastRecord = isBroadcastRecord;
}

/**
* Set the source DB commit timestamp (Epoch-millis). CDC connectors that can supply a true commit time
* should call this; non-CDC connectors should leave it absent.
*/
public void setEventsCommitTimestamp(long eventsCommitTimestamp) {
_eventsCommitTimestamp = Optional.of(eventsCommitTimestamp);
}

/**
* Build the DatastreamProducerRecord.
* @return
* DatastreamProducerRecord that is created.
*/
public DatastreamProducerRecord build() {
return new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination, _sourceCheckpoint,
_eventsSourceTimestamp, _isBroadcastRecord);
DatastreamProducerRecord record = new DatastreamProducerRecord(_events, _partition, _partitionKey, _destination,
_sourceCheckpoint, _eventsSourceTimestamp, _isBroadcastRecord);
_eventsCommitTimestamp.ifPresent(record::setEventsCommitTimestamp);
return record;
}

/**
Expand All @@ -119,6 +130,7 @@ public static DatastreamProducerRecord copyProducerRecord(DatastreamProducerReco
builder.setSourceCheckpoint(record.getCheckpoint());
builder.setEventsSourceTimestamp(record.getEventsSourceTimestamp());
builder.setIsBroadcastRecord(record.isBroadcastRecord());
record.getEventsCommitTimestamp().ifPresent(builder::setEventsCommitTimestamp);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,47 @@ public void testBuilderThrowsWhenEventsTimestampMissing() {
builder.build();
}

@Test
public void testCommitTimestampAbsentByDefault() {
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.addEvent(createDatastreamEvent());
builder.setEventsSourceTimestamp(System.currentTimeMillis());
Assert.assertFalse(builder.build().getEventsCommitTimestamp().isPresent());
}

@Test
public void testCommitTimestampRoundTripsThroughBuilder() {
long commitTs = 1700000000000L;
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.addEvent(createDatastreamEvent());
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.setEventsCommitTimestamp(commitTs);
DatastreamProducerRecord record = builder.build();
Assert.assertTrue(record.getEventsCommitTimestamp().isPresent());
Assert.assertEquals(record.getEventsCommitTimestamp().get().longValue(), commitTs);
}

@Test
public void testCopyProducerRecordPropagatesCommitTimestamp() {
long commitTs = 1700000000000L;
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.addEvent(createDatastreamEvent());
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.setEventsCommitTimestamp(commitTs);
DatastreamProducerRecord original = builder.build();
DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(original, 0);
Assert.assertEquals(copy.getEventsCommitTimestamp().orElse(-1L).longValue(), commitTs);
}

@Test
public void testCopyProducerRecordWhenCommitTimestampAbsent() {
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.addEvent(createDatastreamEvent());
builder.setEventsSourceTimestamp(System.currentTimeMillis());
DatastreamProducerRecord copy = DatastreamProducerRecordBuilder.copyProducerRecord(builder.build(), 0);
Assert.assertFalse(copy.getEventsCommitTimestamp().isPresent());
}

@Test
public void testBuilderWithAddSerializedKeyValueEventAndValidateFields() {
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class EventProducer implements DatastreamEventProducer {
static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsLatencyMsSlaIneligible";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
// Source DB commit timestamp -> destination ack latency. Distinct from eventsLatencyMs, which on Espresso/TiDB
// measures from the intermediate Kafka append time rather than the original DB commit.
static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING = "eventsCommitToAckLatencyMs";
static final String EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsCommitToAckLatencyMsSlaIneligible";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";

Expand All @@ -97,12 +101,22 @@ public class EventProducer implements DatastreamEventProducer {
private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "0";
private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla";
private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla";
private static final String EVENTS_COMMIT_WITHIN_SLA = "eventsCommitWithinSla";
private static final String EVENTS_COMMIT_OUTSIDE_SLA = "eventsCommitOutsideSla";
private static final String EVENTS_COMMIT_WITHIN_ALTERNATE_SLA = "eventsCommitWithinAlternateSla";
private static final String EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA = "eventsCommitOutsideAlternateSla";
private static final String COMMIT_TO_ACK_THRESHOLD_SLA_MS = "commitToAckThresholdSlaMs";
private static final String COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "commitToAckThresholdAlternateSlaMs";
private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError";
static final String BYTES_PRODUCED_RATE = "bytesProducedRate";
private static final String AGGREGATE = "aggregate";

private static final String DEFAULT_AVAILABILITY_THRESHOLD_SLA_MS = "60000"; // 1 minute
private static final String DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS = "180000"; // 3 minutes
// Commit-to-ack covers upstream CDC pipeline lag in addition to Brooklin-side transport, so defaults are wider
// than the source-to-ack thresholds above.
private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS = "300000"; // 5 minutes
private static final String DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS = "900000"; // 15 minutes
private static final String DEFAULT_WARN_LOG_LATENCY_ENABLED = "false";
private static final String DEFAULT_WARN_LOG_LATENCY_THRESHOLD_MS = "1500000000"; // 25000 minutes, ~17 days
private static final String DEFAULT_NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "false";
Expand All @@ -119,6 +133,10 @@ public class EventProducer implements DatastreamEventProducer {
private final int _availabilityThresholdSlaMs;
// Alternate SLA for comparison with the main SLA
private final int _availabilityThresholdAlternateSlaMs;
// Commit-to-ack (DB commit time -> destination ack) SLA thresholds. Used only when the connector supplies a
// commit timestamp on the producer record (CDC connectors).
private final int _commitToAckThresholdSlaMs;
private final int _commitToAckThresholdAlternateSlaMs;
// Grace period for newly created streams. While a stream is inside this window, primary/alternate
// SLA counters are suppressed and the latency histogram is redirected to eventsLatencyMsSlaIneligible.
private final long _newStreamGracePeriodMs;
Expand Down Expand Up @@ -196,6 +214,11 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C
_availabilityThresholdAlternateSlaMs = Integer.parseInt(
config.getProperty(AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS));

_commitToAckThresholdSlaMs = Integer.parseInt(
config.getProperty(COMMIT_TO_ACK_THRESHOLD_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_SLA_MS));
_commitToAckThresholdAlternateSlaMs = Integer.parseInt(
config.getProperty(COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_COMMIT_TO_ACK_THRESHOLD_ALTERNATE_SLA_MS));

_newStreamGracePeriodMs = Long.parseLong(
config.getProperty(NEW_STREAM_GRACE_PERIOD_MS, DEFAULT_NEW_STREAM_GRACE_PERIOD_MS));
_streamCreationTimeMs = parseStreamCreationTimeMs(task);
Expand Down Expand Up @@ -323,6 +346,8 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
record.setEventsSendTimestamp(System.currentTimeMillis());
long recordEventsSourceTimestamp = record.getEventsSourceTimestamp();
long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L);
// Absent for non-CDC connectors and for CDC bootstrap/heartbeat paths; commit-to-ack metric is skipped when absent.
Optional<Long> recordEventsCommitTimestamp = record.getEventsCommitTimestamp();
final long numSerializedBytes = record.getEvents().stream()
.mapToLong(e -> {
long keySize = e.key().filter(k -> k instanceof byte[]).map(k -> (long) ((byte[]) k).length).orElse(0L);
Expand All @@ -333,7 +358,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
if (isBroadcast) {
broadcastMetadata = _transportProvider.broadcast(destination, record,
(metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp,
recordEventsSendTimestamp, numSerializedBytes));
recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp));
_logger.debug("Broadcast completed with {}", broadcastMetadata);
if (broadcastMetadata.isMessageSerializationError()) {
_logger.warn("Broadcast of record {} to destination {} failed because of serialization error.",
Expand All @@ -342,7 +367,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
} else {
_transportProvider.send(destination, record,
(metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp,
recordEventsSendTimestamp, numSerializedBytes));
recordEventsSendTimestamp, numSerializedBytes, recordEventsCommitTimestamp));
}
} catch (Exception e) {
String errorMessage = String.format("Failed to send the event %s exception %s", record, e);
Expand Down Expand Up @@ -470,7 +495,8 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev
* to avoid overcounting.
*/
private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp, long eventsSendTimestamp,
long numBytes) {
long numBytes, Optional<Long> eventsCommitTimestamp) {
reportCommitToAckMetrics(metadata, eventsCommitTimestamp);
// If per-topic metrics are enabled, use topic as key for metrics; else, use datastream name as the key
String datastreamName = getDatastreamName();

Expand Down Expand Up @@ -530,6 +556,35 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
reportThroughputAttributionMetrics(numBytes);
}

/**
* Emit commit-to-ack (DB commit -> destination ack) latency histogram and SLA counters. No-op when the connector
* did not supply a commit timestamp (non-CDC sources, or CDC bootstrap/heartbeat paths). Gated by the same
* shouldEmitMetric() suppression as the existing source-to-ack SLA so the new metric inherits grace-period and
* per-datastream opt-out behavior.
*/
private void reportCommitToAckMetrics(DatastreamRecordMetadata metadata, Optional<Long> eventsCommitTimestamp) {
if (!eventsCommitTimestamp.isPresent()) {
return;
}
long commitTs = eventsCommitTimestamp.get();
if (commitTs <= 0) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: In reportCommitToAckMetrics, we guard commitTs <= 0 but not the case where the connector supplies a timestamp in the future. In that case System.currentTimeMillis() - commitTs becomes negative and we’ll emit a negative histogram value / classify SLA incorrectly. Just checking Do we need to add that check ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commitTS cannot be in the future since the event has already happened right.

return;
}
long commitToAckLatencyMs = System.currentTimeMillis() - commitTs;
String topicOrDatastreamName = _enablePerTopicMetrics ? metadata.getTopic() : getDatastreamName();
String latencyMetricName = shouldEmitMetric()
? EVENTS_COMMIT_TO_ACK_LATENCY_MS_STRING
: EVENTS_COMMIT_TO_ACK_LATENCY_MS_SLA_INELIGIBLE_STRING;
reportEventLatencyMetrics(topicOrDatastreamName, metadata, commitToAckLatencyMs, latencyMetricName);

if (shouldEmitMetric()) {
reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdSlaMs,
EVENTS_COMMIT_WITHIN_SLA, EVENTS_COMMIT_OUTSIDE_SLA);
reportSLAMetrics(topicOrDatastreamName, commitToAckLatencyMs <= _commitToAckThresholdAlternateSlaMs,
EVENTS_COMMIT_WITHIN_ALTERNATE_SLA, EVENTS_COMMIT_OUTSIDE_ALTERNATE_SLA);
}
}

/**
* Only for the throughput violating topics!
* <br>
Expand Down Expand Up @@ -608,7 +663,7 @@ private void reportSendLatencyMetrics(DatastreamRecordMetadata metadata, long se
}

private void onSendCallback(DatastreamRecordMetadata metadata, Exception exception, SendCallback sendCallback,
long eventSourceTimestamp, long eventSendTimestamp, long numBytes) {
long eventSourceTimestamp, long eventSendTimestamp, long numBytes, Optional<Long> eventCommitTimestamp) {

SendFailedException sendFailedException = null;

Expand All @@ -623,7 +678,7 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti
if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getUndecoratedTopic())) {
reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes);
} else {
reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes);
reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes, eventCommitTimestamp);
}
}
} catch (Exception e) {
Expand Down
Loading
Loading