Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -471,6 +472,23 @@ public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions,
.attr("position", position)
.log("Cumulative ack on");
AckCallback callback = new AckCallback(previousMarkDeletePosition, future);
if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer singleConsumerDispatcher) {
// For compacted consumer, we should ignore the position that does not exist in the managed ledger,
// otherwise, the `asyncMarkDelete` call could jump the read position to the active ledger, which will
// skip all entries present in the compacted ledger but not present in the managed ledger.
final var consumer = singleConsumerDispatcher.getActiveConsumer();
final var ml = cursor.getManagedLedger();
if (consumer != null
&& consumer.readCompacted()
&& !cursor.isDurable()
&& ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) {
if (ml.getFirstPosition() == null || position.getLedgerId() > ml.getFirstPosition().getLedgerId()) {
log.warn("Received an ACK whose position is " + position + ", valid ledgers: "
+ ml.getLedgersInfo().keySet());
}
return CompletableFuture.completedFuture(null);
}
Comment thread
BewareMyPower marked this conversation as resolved.
}
cursor.asyncMarkDelete(position, mergeCursorProperties(properties),
callback, callback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
Expand All @@ -70,12 +71,15 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -87,6 +91,7 @@
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
Expand Down Expand Up @@ -124,6 +129,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
private PublishingOrderCompactor compactor;
private volatile java.util.function.Consumer<org.apache.pulsar.broker.service.Consumer> consumerCreated = __ -> {};

@Override
protected void doInitConf() throws Exception {
Expand All @@ -135,6 +141,14 @@ protected void doInitConf() throws Exception {
@Override
public void setup() throws Exception {
super.internalSetup();
pulsar.getBrokerService().setInterceptor(new MockBrokerInterceptor() {

@Override
public void consumerCreated(ServerCnx cnx, org.apache.pulsar.broker.service.Consumer consumer,
Map<String, String> metadata) {
consumerCreated.accept(consumer);
}
});

admin.clusters().createCluster(configClusterName,
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
Expand Down Expand Up @@ -164,6 +178,8 @@ public void beforeMethod() throws Exception {
admin.namespaces().removeRetention("my-tenant/my-ns");
AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
consumerCreated = __ -> {};
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);
}

protected long compact(String topic) throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic) throws Exception {
Awaitility.await().untilAsserted(() -> assertEquals(
admin.topics().compactionStatus(topic).status, LongRunningProcessStatus.Status.SUCCESS));
}

@Test
public void testReaderReadOnDeletedLedger() throws Exception {
final var topic = "persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
for (int i = 0; i < 3; i++) {
producer.newMessage().key("key-" + i).value("value-" + i).send();
}
}
// Trigger the ledger rollover
var ml = (ManagedLedgerImpl) ((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get()
.orElseThrow()).getManagedLedger();
ml.getConfig().setMaxEntriesPerLedger(1);
ml.getConfig().setMaxSizePerLedgerMb(0);
ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
ml.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> assertEquals(ml.getLedgersInfo().size(), 2));

final var subName = "sub-" + System.currentTimeMillis();
@Cleanup final var reader = pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
.subscriptionName(subName)
.startMessageId(MessageId.earliest).create();

// Slow down the pre-fetching
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);

// Receive 1 message so that the startMessageId will be reset to ledger_id:0 after reconnection
assertTrue(reader.hasMessageAvailable());
final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
assertNotNull(firstMsg);

triggerAndWaitCompaction(topic);

// Simulate the pending cumulative acknowledgment is flushed after the consumer is created
// We don't need such interception if we can support controlling the acknowledgment flush for reader.
final var firstTime = new AtomicBoolean(true);
consumerCreated = serverConsumer -> {
Comment thread
BewareMyPower marked this conversation as resolved.
final var subscription = serverConsumer.getSubscription();
if (subscription.getName().contains(subName) && firstTime.compareAndSet(true, false)) {
final var msgId = (MessageIdAdv) firstMsg.getMessageId();
subscription.acknowledgeMessageAsync(List.of(PositionFactory.create(msgId.getLedgerId(),
msgId.getEntryId())), CommandAck.AckType.Cumulative, Map.of());
}
};

// Trigger the reconnection and trim the first ledger.
admin.namespaces().unload("my-tenant/my-ns");
admin.lookups().lookupTopic(topic);
final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, true).get()
.orElseThrow();
final var trimFuture = new CompletableFuture<Void>();
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(trimFuture);
trimFuture.get();
assertEquals(persistentTopic.getManagedLedger().getLedgersInfo().size(), 1);

pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);

while (reader.hasMessageAvailable()) {
final var msg = reader.readNextAsync().get(3, TimeUnit.SECONDS);
log.info().attr("id", msg.getMessageId()).attr("key", msg.getKey())
.attr("value", msg.getValue()).log("read");
}

final var serverConsumer = persistentTopic.getSubscription(subName).getDispatcher().getConsumers().get(0);
assertEquals(((MessageIdAdv) serverConsumer.getStartMessageId()).getEntryId(), 0L);

final var emptyLedgerId = persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey();
assertEquals(persistentTopic.getTopicCompactionService().getLastCompactedPosition().get(),
PositionFactory.create(emptyLedgerId, -1L));
}
}
Loading