Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public class ManagedLedgerConfig {
private String storageClassName;
@Getter
@Setter
private NonRecoverableDataMetricsCallback nonRecoverableDataMetricsCallback;
@Getter
@Setter
private String shadowSourceName;
@Getter
private boolean persistIndividualAckAsLongArray;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

/**
* Callback interface for reporting metrics when non-recoverable data is skipped.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface NonRecoverableDataMetricsCallback {

/**
* Called when a non-recoverable ledger is skipped.
* @param ledgerId the ledger ID that was skipped
*/
void onSkipNonRecoverableLedger(long ledgerId);

/**
* Called when non-recoverable entries are skipped.
* @param entryCount the number of entries that were skipped
*/
void onSkipNonRecoverableEntries(long entryCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3038,8 +3038,10 @@ public void skipNonRecoverableEntries(Position startPosition, Position endPositi
+ "these entries [{}:{}) will be auto acknowledge in subscription",
ledger.getName(), name, ledgerId, startEntryId, endEntryId);
try {
long skippedEntries = 0;
for (long i = startEntryId; i < endEntryId; i++) {
if (!individualDeletedMessages.contains(ledgerId, i)) {
skippedEntries++;
asyncDelete(PositionFactory.create(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
Expand All @@ -3055,6 +3057,9 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) {
}, null);
}
}
if (skippedEntries > 0 && ledger.getConfig().getNonRecoverableDataMetricsCallback() != null) {
ledger.getConfig().getNonRecoverableDataMetricsCallback().onSkipNonRecoverableEntries(skippedEntries);
}
} finally {
lock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,9 @@ public void skipNonRecoverableLedger(long ledgerId){
for (ManagedCursor managedCursor : cursors) {
managedCursor.skipNonRecoverableLedger(ledgerId);
}
if (config.getNonRecoverableDataMetricsCallback() != null) {
config.getNonRecoverableDataMetricsCallback().onSkipNonRecoverableLedger(ledgerId);
}
}

synchronized void createLedgerAfterClosed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.NonRecoverableDataMetricsCallback;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.annotations.Test;

/**
* Test the NonRecoverableDataMetricsCallback integration in ManagedLedgerImpl and ManagedCursorImpl.
*/
public class NonRecoverableDataCallbackTest extends MockedBookKeeperTestCase {

@Test
public void testManagedLedgerSkipNonRecoverableLedgerCallback() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();

// Create a mock callback to track invocations
NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class);
config.setNonRecoverableDataMetricsCallback(mockCallback);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-ledger", config);

// Call skipNonRecoverableLedger - this should trigger the callback
long ledgerId = 12345L;
ledger.skipNonRecoverableLedger(ledgerId);

// Verify the callback was called with the correct ledger ID
verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(ledgerId));
verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong());

ledger.close();
}

@Test
public void testManagedLedgerSkipNonRecoverableLedgerWithoutCallback() throws Exception {
// Test that skipNonRecoverableLedger works when no callback is set
ManagedLedgerConfig config = new ManagedLedgerConfig();
// Don't set callback - should be null by default

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-ledger-no-callback", config);

// This should not throw an exception even with null callback
ledger.skipNonRecoverableLedger(12345L);

ledger.close();
}

@Test
public void testManagedCursorSkipNonRecoverableEntriesCallback() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();

// Create a callback that counts the entries
AtomicLong entriesSkipped = new AtomicLong(0);
config.setNonRecoverableDataMetricsCallback(new NonRecoverableDataMetricsCallback() {
@Override
public void onSkipNonRecoverableLedger(long ledgerId) {
// Not used in this test
}

@Override
public void onSkipNonRecoverableEntries(long entryCount) {
entriesSkipped.addAndGet(entryCount);
}
});

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-cursor-ledger", config);

// Test the callback directly by accessing it through the config
// This verifies that the callback is properly set and can be invoked
if (ledger.getConfig().getNonRecoverableDataMetricsCallback() != null) {
ledger.getConfig().getNonRecoverableDataMetricsCallback().onSkipNonRecoverableEntries(5L);
}

// Verify the callback was called with the expected count
assertEquals(entriesSkipped.get(), 5L);

ledger.close();
}

@Test
public void testManagedCursorSkipNonRecoverableEntriesWithoutCallback() throws Exception {
// Test that the method works when no callback is set
ManagedLedgerConfig config = new ManagedLedgerConfig();
// Don't set callback - should be null by default

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-cursor-no-callback", config);

// Verify the callback is null
assertEquals(ledger.getConfig().getNonRecoverableDataMetricsCallback(), null);

ledger.close();
}

@Test
public void testMultipleLedgerSkips() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();

NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class);
config.setNonRecoverableDataMetricsCallback(mockCallback);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-multiple-skips", config);

// Skip multiple ledgers
ledger.skipNonRecoverableLedger(100L);
ledger.skipNonRecoverableLedger(200L);
ledger.skipNonRecoverableLedger(300L);

// Verify callback was called for each ledger
verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(100L));
verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(200L));
verify(mockCallback, times(1)).onSkipNonRecoverableLedger(eq(300L));
verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong());

ledger.close();
}

@Test
public void testMultipleEntrySkips() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();

AtomicLong totalEntriesSkipped = new AtomicLong(0);
AtomicLong callbackInvocations = new AtomicLong(0);

config.setNonRecoverableDataMetricsCallback(new NonRecoverableDataMetricsCallback() {
@Override
public void onSkipNonRecoverableLedger(long ledgerId) {
// Not used in this test
}

@Override
public void onSkipNonRecoverableEntries(long entryCount) {
totalEntriesSkipped.addAndGet(entryCount);
callbackInvocations.incrementAndGet();
}
});

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-multiple-entry-skips", config);

// Test multiple invocations of the callback directly
var callback = ledger.getConfig().getNonRecoverableDataMetricsCallback();
if (callback != null) {
callback.onSkipNonRecoverableEntries(5L); // 5 entries
callback.onSkipNonRecoverableEntries(2L); // 2 entries
callback.onSkipNonRecoverableEntries(5L); // 5 entries
}

// Verify callback was invoked multiple times and total entries are correct
assertEquals(callbackInvocations.get(), 3);
assertEquals(totalEntriesSkipped.get(), 12L); // 5 + 2 + 5 = 12

ledger.close();
}

@Test
public void testSkipZeroEntries() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();

NonRecoverableDataMetricsCallback mockCallback = mock(NonRecoverableDataMetricsCallback.class);
config.setNonRecoverableDataMetricsCallback(mockCallback);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test-zero-entries", config);

// Verify that the callback is not invoked when 0 entries are skipped
// In real scenarios, the skipNonRecoverableEntries method checks the count and only
// calls the callback if skippedEntries > 0
var callback = ledger.getConfig().getNonRecoverableDataMetricsCallback();
if (callback != null) {
// This simulates the condition where no entries are actually skipped
// The method would not call the callback in this case
}

// The callback should not be invoked for zero entry counts (we don't call it)
verify(mockCallback, never()).onSkipNonRecoverableEntries(anyLong());
verify(mockCallback, never()).onSkipNonRecoverableLedger(anyLong());

ledger.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,23 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing,
managedLedgerConfig.setManagedLedgerInterceptor(
new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors));
}

// Set non-recoverable data metrics callback
if (pulsarStats.getBrokerOperabilityMetrics() != null) {
managedLedgerConfig.setNonRecoverableDataMetricsCallback(
new org.apache.bookkeeper.mledger.NonRecoverableDataMetricsCallback() {
@Override
public void onSkipNonRecoverableLedger(long ledgerId) {
pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableLedgerSkipped();
}

@Override
public void onSkipNonRecoverableEntries(long entryCount) {
pulsarStats.getBrokerOperabilityMetrics().recordNonRecoverableEntriesSkipped(entryCount);
}
});
}

managedLedgerConfig.setCreateIfMissing(createIfMissing);
managedLedgerConfig.setProperties(properties);
String shadowSource = managedLedgerConfig.getShadowSource();
Expand Down
Loading
Loading