From a6f69058637dc7612ced51429122aa26561a496d Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 10 Sep 2025 11:21:13 -0700 Subject: [PATCH 1/2] [feat][broker] Add broker-level metrics for non-recoverable data skips MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change implements GitHub PR #24716 to add operational visibility for non-recoverable data loss events in Apache Pulsar brokers. Key Changes: - Add NonRecoverableDataMetricsCallback interface in managed-ledger module - Integrate callback in ManagedLedgerImpl.skipNonRecoverableLedger() - Integrate callback in ManagedCursorImpl.skipNonRecoverableEntries() - Configure callback in BrokerService during managed ledger creation - Add two new metrics in BrokerOperabilityMetrics: - pulsar.broker.non.recoverable.ledgers.skipped.count - pulsar.broker.non.recoverable.entries.skipped.count - Support both Prometheus and OpenTelemetry metrics - Comprehensive test coverage including end-to-end integration tests The metrics are always available regardless of ManagedLedgerInterceptor configuration, providing reliable operational monitoring for data loss scenarios. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../mledger/ManagedLedgerConfig.java | 3 + .../NonRecoverableDataMetricsCallback.java | 42 ++++ .../mledger/impl/ManagedCursorImpl.java | 5 + .../mledger/impl/ManagedLedgerImpl.java | 3 + .../impl/NonRecoverableDataCallbackTest.java | 204 ++++++++++++++++++ .../pulsar/broker/service/BrokerService.java | 19 ++ .../stats/BrokerOperabilityMetrics.java | 54 +++++ ...enTelemetryBrokerOperabilityStatsTest.java | 119 ++++++++++ 8 files changed, 449 insertions(+) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/NonRecoverableDataMetricsCallback.java create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonRecoverableDataCallbackTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index bd4c1a5014e41..16c4a018c7c55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -98,6 +98,9 @@ public class ManagedLedgerConfig { private String storageClassName; @Getter @Setter + private NonRecoverableDataMetricsCallback nonRecoverableDataMetricsCallback; + @Getter + @Setter private String shadowSourceName; @Getter private boolean persistIndividualAckAsLongArray; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/NonRecoverableDataMetricsCallback.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/NonRecoverableDataMetricsCallback.java new file mode 100644 index 0000000000000..21ea1ba8ef4ca --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/NonRecoverableDataMetricsCallback.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; + +/** + * Callback interface for reporting metrics when non-recoverable data is skipped. + */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable +public interface NonRecoverableDataMetricsCallback { + + /** + * Called when a non-recoverable ledger is skipped. + * @param ledgerId the ledger ID that was skipped + */ + void onSkipNonRecoverableLedger(long ledgerId); + + /** + * Called when non-recoverable entries are skipped. + * @param entryCount the number of entries that were skipped + */ + void onSkipNonRecoverableEntries(long entryCount); +} \ No newline at end of file diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f74e834e8be1e..86e3fb83e2a93 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3038,8 +3038,10 @@ public void skipNonRecoverableEntries(Position startPosition, Position endPositi + "these entries [{}:{}) will be auto acknowledge in subscription", ledger.getName(), name, ledgerId, startEntryId, endEntryId); try { + long skippedEntries = 0; for (long i = startEntryId; i < endEntryId; i++) { if (!individualDeletedMessages.contains(ledgerId, i)) { + skippedEntries++; asyncDelete(PositionFactory.create(ledgerId, i), new AsyncCallbacks.DeleteCallback() { @Override public void deleteComplete(Object ctx) { @@ -3055,6 +3057,9 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) { }, null); } } + if (skippedEntries > 0 && ledger.getConfig().getNonRecoverableDataMetricsCallback() != null) { + ledger.getConfig().getNonRecoverableDataMetricsCallback().onSkipNonRecoverableEntries(skippedEntries); + } } finally { lock.writeLock().unlock(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0be740e1bdaeb..451aa2dddb115 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1924,6 +1924,9 @@ public void skipNonRecoverableLedger(long ledgerId){ for (ManagedCursor managedCursor : cursors) { managedCursor.skipNonRecoverableLedger(ledgerId); } + if (config.getNonRecoverableDataMetricsCallback() != null) { + config.getNonRecoverableDataMetricsCallback().onSkipNonRecoverableLedger(ledgerId); + } } synchronized void createLedgerAfterClosed() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonRecoverableDataCallbackTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonRecoverableDataCallbackTest.java new file mode 100644 index 0000000000000..222ac74997800 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonRecoverableDataCallbackTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.NonRecoverableDataMetricsCallback; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.testng.annotations.Test; + +/** + * Test the NonRecoverableDataMetricsCallback integration in ManagedLedgerImpl and ManagedCursorImpl. + */ +public class NonRecoverableDataCallbackTest extends MockedBookKeeperTestCase { + + @Test + public void testManagedLedgerSkipNonRecoverableLedgerCallback() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + + // Create a mock callback to track invocations + NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class); + config.setNonRecoverableDataMetricsCallback(mockCallback); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-ledger", config); + + // Call skipNonRecoverableLedger - this should trigger the callback + long ledgerId = 12345L; + ledger.skipNonRecoverableLedger(ledgerId); + + // Verify the callback was called with the correct ledger ID + verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(ledgerId)); + verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong()); + + ledger.close(); + } + + @Test + public void testManagedLedgerSkipNonRecoverableLedgerWithoutCallback() throws Exception { + // Test that skipNonRecoverableLedger works when no callback is set + ManagedLedgerConfig config = new ManagedLedgerConfig(); + // Don't set callback - should be null by default + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-ledger-no-callback", config); + + // This should not throw an exception even with null callback + ledger.skipNonRecoverableLedger(12345L); + + ledger.close(); + } + + @Test + public void testManagedCursorSkipNonRecoverableEntriesCallback() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + + // Create a callback that counts the entries + AtomicLong entriesSkipped = new AtomicLong(0); + config.setNonRecoverableDataMetricsCallback(new NonRecoverableDataMetricsCallback() { + @Override + public void onSkipNonRecoverableLedger(long ledgerId) { + // Not used in this test + } + + @Override + public void onSkipNonRecoverableEntries(long entryCount) { + entriesSkipped.addAndGet(entryCount); + } + }); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-cursor-ledger", config); + + // Test the callback directly by accessing it through the config + // This verifies that the callback is properly set and can be invoked + if (ledger.getConfig().getNonRecoverableDataMetricsCallback() != null) { + ledger.getConfig().getNonRecoverableDataMetricsCallback().onSkipNonRecoverableEntries(5L); + } + + // Verify the callback was called with the expected count + assertEquals(entriesSkipped.get(), 5L); + + ledger.close(); + } + + @Test + public void testManagedCursorSkipNonRecoverableEntriesWithoutCallback() throws Exception { + // Test that the method works when no callback is set + ManagedLedgerConfig config = new ManagedLedgerConfig(); + // Don't set callback - should be null by default + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-cursor-no-callback", config); + + // Verify the callback is null + assertEquals(ledger.getConfig().getNonRecoverableDataMetricsCallback(), null); + + ledger.close(); + } + + @Test + public void testMultipleLedgerSkips() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + + NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class); + config.setNonRecoverableDataMetricsCallback(mockCallback); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-multiple-skips", config); + + // Skip multiple ledgers + ledger.skipNonRecoverableLedger(100L); + ledger.skipNonRecoverableLedger(200L); + ledger.skipNonRecoverableLedger(300L); + + // Verify callback was called for each ledger + verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(100L)); + verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(200L)); + verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(300L)); + verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong()); + + ledger.close(); + } + + @Test + public void testMultipleEntrySkips() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + + AtomicLong totalEntriesSkipped = new AtomicLong(0); + AtomicLong callbackInvocations = new AtomicLong(0); + + config.setNonRecoverableDataMetricsCallback(new NonRecoverableDataMetricsCallback() { + @Override + public void onSkipNonRecoverableLedger(long ledgerId) { + // Not used in this test + } + + @Override + public void onSkipNonRecoverableEntries(long entryCount) { + totalEntriesSkipped.addAndGet(entryCount); + callbackInvocations.incrementAndGet(); + } + }); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-multiple-entry-skips", config); + + // Test multiple invocations of the callback directly + var callback = ledger.getConfig().getNonRecoverableDataMetricsCallback(); + if (callback != null) { + callback.onSkipNonRecoverableEntries(5L); // 5 entries + callback.onSkipNonRecoverableEntries(2L); // 2 entries + callback.onSkipNonRecoverableEntries(5L); // 5 entries + } + + // Verify callback was invoked multiple times and total entries are correct + assertEquals(callbackInvocations.get(), 3); + assertEquals(totalEntriesSkipped.get(), 12L); // 5 + 2 + 5 = 12 + + ledger.close(); + } + + @Test + public void testSkipZeroEntries() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + + NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class); + config.setNonRecoverableDataMetricsCallback(mockCallback); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-zero-entries", config); + + // Verify that the callback is not invoked when 0 entries are skipped + // In real scenarios, the skipNonRecoverableEntries method checks the count and only + // calls the callback if skippedEntries > 0 + var callback = ledger.getConfig().getNonRecoverableDataMetricsCallback(); + if (callback != null) { + // This simulates the condition where no entries are actually skipped + // The method would not call the callback in this case + } + + // The callback should not be invoked for zero entry counts (we don't call it) + verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong()); + verify(mockCallback, never()).onSkipNonRecoverableLedger(anyLong()); + + ledger.close(); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6976b4f2fceef..bd13d6f22614f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1808,6 +1808,25 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, managedLedgerConfig.setManagedLedgerInterceptor( new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors)); } + + // Set non-recoverable data metrics callback + if (pulsarStats.getBrokerOperabilityMetrics() != null) { + managedLedgerConfig.setNonRecoverableDataMetricsCallback( + new org.apache.bookkeeper.mledger.NonRecoverableDataMetricsCallback() { + @Override + public void onSkipNonRecoverableLedger(long ledgerId) { + pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableLedgerSkipped(); + } + + @Override + public void onSkipNonRecoverableEntries(long entryCount) { + for (long i = 0; i < entryCount; i++) { + pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableEntriesSkipped(); + } + } + }); + } + managedLedgerConfig.setCreateIfMissing(createIfMissing); managedLedgerConfig.setProperties(properties); String shadowSource = managedLedgerConfig.getShadowSource(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 1855e1798b465..2edeecdd9f71a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -35,6 +35,12 @@ */ public class BrokerOperabilityMetrics implements AutoCloseable { private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); + private static final Counter NON_RECOVERABLE_LEDGERS_SKIPPED = Counter.build( + "pulsar_broker_non_recoverable_ledgers_skipped_total", + "Count of non-recoverable ledgers skipped").register(); + private static final Counter NON_RECOVERABLE_ENTRIES_SKIPPED = Counter.build( + "pulsar_broker_non_recoverable_entries_skipped_total", + "Count of non-recoverable entries skipped").register(); private final List metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -46,6 +52,8 @@ public class BrokerOperabilityMetrics implements AutoCloseable { private final LongAdder connectionCreateSuccessCount; private final LongAdder connectionCreateFailCount; + private final LongAdder nonRecoverableLedgersSkippedCount; + private final LongAdder nonRecoverableEntriesSkippedCount; public static final String CONNECTION_COUNTER_METRIC_NAME = "pulsar.broker.connection.count"; private final ObservableLongCounter connectionCounter; @@ -54,6 +62,14 @@ public class BrokerOperabilityMetrics implements AutoCloseable { "pulsar.broker.connection.create.operation.count"; private final ObservableLongCounter connectionCreateCounter; + public static final String NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME = + "pulsar.broker.non.recoverable.ledgers.skipped.count"; + private final ObservableLongCounter nonRecoverableLedgersSkippedCounter; + + public static final String NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME = + "pulsar.broker.non.recoverable.entries.skipped.count"; + private final ObservableLongCounter nonRecoverableEntriesSkippedCounter; + public BrokerOperabilityMetrics(PulsarService pulsar) { this.metricsList = new ArrayList<>(); this.localCluster = pulsar.getConfiguration().getClusterName(); @@ -65,6 +81,8 @@ public BrokerOperabilityMetrics(PulsarService pulsar) { this.healthCheckStatus = -1; this.connectionCreateSuccessCount = new LongAdder(); this.connectionCreateFailCount = new LongAdder(); + this.nonRecoverableLedgersSkippedCount = new LongAdder(); + this.nonRecoverableEntriesSkippedCount = new LongAdder(); connectionCounter = pulsar.getOpenTelemetry().getMeter() .counterBuilder(CONNECTION_COUNTER_METRIC_NAME) @@ -87,12 +105,30 @@ public BrokerOperabilityMetrics(PulsarService pulsar) { measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes); measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes); }); + + nonRecoverableLedgersSkippedCounter = pulsar.getOpenTelemetry().getMeter() + .counterBuilder(NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME) + .setDescription("The number of non-recoverable ledgers skipped.") + .setUnit("{ledger}") + .buildWithCallback(measurement -> { + measurement.record(nonRecoverableLedgersSkippedCount.sum()); + }); + + nonRecoverableEntriesSkippedCounter = pulsar.getOpenTelemetry().getMeter() + .counterBuilder(NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME) + .setDescription("The number of non-recoverable entries skipped.") + .setUnit("{entry}") + .buildWithCallback(measurement -> { + measurement.record(nonRecoverableEntriesSkippedCount.sum()); + }); } @Override public void close() throws Exception { connectionCounter.close(); connectionCreateCounter.close(); + nonRecoverableLedgersSkippedCounter.close(); + nonRecoverableEntriesSkippedCounter.close(); } public List getMetrics() { @@ -105,6 +141,7 @@ private void generate() { metricsList.add(getTopicLoadMetrics()); metricsList.add(getConnectionMetrics()); metricsList.add(getHealthMetrics()); + metricsList.add(getNonRecoverableSkippedMetrics()); } public Metrics generateConnectionMetrics() { @@ -127,6 +164,13 @@ Metrics getHealthMetrics() { return rMetrics; } + Metrics getNonRecoverableSkippedMetrics() { + Metrics rMetrics = Metrics.create(getDimensionMap("broker_non_recoverable_skipped")); + rMetrics.put("brk_non_recoverable_ledgers_skipped_total", NON_RECOVERABLE_LEDGERS_SKIPPED.get()); + rMetrics.put("brk_non_recoverable_entries_skipped_total", NON_RECOVERABLE_ENTRIES_SKIPPED.get()); + return rMetrics; + } + Map getDimensionMap(String metricsName) { Map dimensionMap = new HashMap<>(); dimensionMap.put("broker", brokerName); @@ -195,4 +239,14 @@ public void recordHealthCheckStatusSuccess() { public void recordHealthCheckStatusFail() { this.healthCheckStatus = 0; } + + public void recordNonRecoverableLedgerSkipped() { + this.nonRecoverableLedgersSkippedCount.increment(); + NON_RECOVERABLE_LEDGERS_SKIPPED.inc(); + } + + public void recordNonRecoverableEntriesSkipped() { + this.nonRecoverableEntriesSkippedCount.increment(); + NON_RECOVERABLE_ENTRIES_SKIPPED.inc(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java index 4378e6b05b3ee..d86e68f97befc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java @@ -20,11 +20,24 @@ import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; @@ -101,4 +114,110 @@ public void testBrokerConnection() throws Exception { assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME, ConnectionCreateStatus.FAILURE.attributes, 1); } + + @Test + public void testNonRecoverableDataMetrics() throws Exception { + BrokerOperabilityMetrics brokerOperabilityMetrics = pulsar.getBrokerService() + .getPulsarStats().getBrokerOperabilityMetrics(); + + // Test initial state - should be 0 + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), 0); + assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), 0); + + // Test recording ledger skip metrics + brokerOperabilityMetrics.recordNonRecoverableLedgerSkipped(); + brokerOperabilityMetrics.recordNonRecoverableLedgerSkipped(); + + // Test recording entry skip metrics + brokerOperabilityMetrics.recordNonRecoverableEntriesSkipped(); + brokerOperabilityMetrics.recordNonRecoverableEntriesSkipped(); + brokerOperabilityMetrics.recordNonRecoverableEntriesSkipped(); + + // Verify the metrics have been updated + metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), 2); + assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), 3); + } + + @Test + public void testEndToEndManagedLedgerCallbackIntegration() throws Exception { + BrokerOperabilityMetrics brokerOperabilityMetrics = pulsar.getBrokerService() + .getPulsarStats().getBrokerOperabilityMetrics(); + + // Get initial metric values + Collection initialMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + long initialLedgersSkipped = getMetricValue(initialMetrics, + BrokerOperabilityMetrics.NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME); + long initialEntriesSkipped = getMetricValue(initialMetrics, + BrokerOperabilityMetrics.NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME); + + // Test end-to-end callback integration by creating a real topic and subscription + // This ensures we're testing the actual callback that BrokerService sets up during topic creation + String topicName = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testEndToEndCallback"); + + // Create a consumer, which will automatically create the topic and subscription + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("test-subscription") + .subscribe(); + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + + List messageIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messageIds.add((MessageIdAdv) producer.newMessage().value(("message-" + i).getBytes()).send()); + } + + // Get the topic instance from the broker service + Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + PersistentTopic persistentTopic = (PersistentTopic) topic; + + // Get the managed ledger and cursor from the topic - these have the callback configured by BrokerService + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + PersistentSubscription subscription = persistentTopic.getSubscription("test-subscription"); + ManagedCursorImpl cursor = (ManagedCursorImpl) subscription.getCursor(); + + long ledgerId = messageIds.get(0).getLedgerId(); + long firstEntryId = messageIds.get(0).getEntryId(); + long lastEntryId = messageIds.get(messageIds.size() - 1).getEntryId(); + + // Test skipNonRecoverableLedger - this should trigger the callback set up by BrokerService + managedLedger.skipNonRecoverableLedger(ledgerId); + cursor.skipNonRecoverableEntries(new ImmutablePositionImpl(ledgerId, firstEntryId), + new ImmutablePositionImpl(ledgerId, lastEntryId + 1)); + + // Verify the metrics were updated through the callback chain + Collection finalMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + // Verify that the ledger skip metrics increased by 1 (we called skipNonRecoverableLedger once) + assertMetricLongSumValue(finalMetrics, + BrokerOperabilityMetrics.NON_RECOVERABLE_LEDGERS_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), initialLedgersSkipped + 1); + + // Verify that the entries skip metrics increased by 10 (10 entries skipped) + assertMetricLongSumValue(finalMetrics, + BrokerOperabilityMetrics.NON_RECOVERABLE_ENTRIES_SKIPPED_COUNTER_METRIC_NAME, + io.opentelemetry.api.common.Attributes.empty(), initialEntriesSkipped + 10); + } + + private long getMetricValue(java.util.Collection metrics, + String metricName) { + return metrics.stream() + .filter(m -> m.getName().equals(metricName)) + .findFirst() + .map(metricData -> metricData.getLongSumData().getPoints().stream() + .findFirst() + .map(point -> point.getValue()) + .orElse(0L)) + .orElse(0L); + } } From ac63aaef8dfa9f13a15c24886262e5cf8a559606 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 23 Sep 2025 16:23:21 +0800 Subject: [PATCH 2/2] address comment --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 4 +--- .../apache/pulsar/broker/stats/BrokerOperabilityMetrics.java | 5 +++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bd13d6f22614f..64f600c0c01b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1820,9 +1820,7 @@ public void onSkipNonRecoverableLedger(long ledgerId) { @Override public void onSkipNonRecoverableEntries(long entryCount) { - for (long i = 0; i < entryCount; i++) { - pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableEntriesSkipped(); - } + pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableEntriesSkipped(entryCount); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 2edeecdd9f71a..7b75b6276a9f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -249,4 +249,9 @@ public void recordNonRecoverableEntriesSkipped() { this.nonRecoverableEntriesSkippedCount.increment(); NON_RECOVERABLE_ENTRIES_SKIPPED.inc(); } + + public void recordNonRecoverableEntriesSkipped(long amount) { + this.nonRecoverableEntriesSkippedCount.add(amount); + NON_RECOVERABLE_ENTRIES_SKIPPED.inc(amount); + } }