From a450ec1bf564c51657e8b7e01690ed91eb8dd982 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 19:24:31 +0800 Subject: [PATCH 01/13] Add a test for getLastMessageId when readCompacted is true and encryption is enabled --- .../GetLastMessageIdCompactedTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 252ee939aace3..3374933da823a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -39,12 +40,14 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.naming.TopicName; @@ -52,6 +55,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -516,4 +520,28 @@ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception assertNotEquals(message, null); } } + + @Test + public void testGetLastMessageIdForEncryptedMessage() throws Exception { + final var topic = BrokerTestUtil.newUniqueName("tp"); + final var keyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(Integer.MAX_VALUE) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(keyFile) + .create(); + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.newMessage().key("k1").value("v0").sendAsync(); + producer.newMessage().key("k1").value(null).sendAsync(); + producer.flush(); + triggerCompactionAndWait(topic); + + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .defaultCryptoKeyReader(keyFile).readCompacted(true).subscribe(); + final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0); + Assert.assertEquals(msgId.getEntryId(), 1); + } } From 9de5d2dd990ba62faaca1a9914993b7bf5127375 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 19:48:53 +0800 Subject: [PATCH 02/13] Fix checkstyle --- .../apache/pulsar/compaction/GetLastMessageIdCompactedTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 3374933da823a..25b0274e59b2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.naming.TopicName; From de99297470cf641e0af18cacd61d361486a95e33 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 21:31:21 +0800 Subject: [PATCH 03/13] Handle the last entry at client side --- .../pulsar/broker/service/ServerCnx.java | 36 ++++++++++++++++--- .../apache/pulsar/client/impl/ClientCnx.java | 11 +++--- .../pulsar/client/impl/ConsumerImpl.java | 6 +++- .../pulsar/common/protocol/Commands.java | 15 ++++++++ .../pulsar/common/protocol/PulsarDecoder.java | 4 +-- pulsar-common/src/main/proto/PulsarApi.proto | 1 + 6 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3aa365323ec7c..8124b2be4f6a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -27,6 +27,7 @@ import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; +import static org.apache.pulsar.common.api.proto.ProtocolVersion.v22; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; @@ -2283,7 +2284,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) .thenApply(lastPosition -> { int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - Position markDeletePosition = null; + Position markDeletePosition = PositionFactory.EARLIEST; if (consumer.getSubscription() instanceof PersistentSubscription) { markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() .getMarkDeletedPosition(); @@ -2344,8 +2345,7 @@ private void getLargestBatchIndexWhenPossible( } else { // if readCompacted is false, we need to return MessageId.earliest writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } return; } @@ -2404,8 +2404,7 @@ public String toString() { writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } }); }); @@ -2415,6 +2414,10 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent int partitionIndex, Position markDeletePosition) { persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { if (entry != null) { + if (getRemoteEndpointProtocolVersion() >= v22.getValue()) { + sendGetLastMessageIdResponseWithBuffer(requestId, partitionIndex, entry, markDeletePosition); + return; + } try { // in this case, all the data has been compacted, so return the last position // in the compacted ledger to the client @@ -2453,6 +2456,29 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent }); } + private void sendGetLastMessageIdResponseWithBuffer(long requestId, int partitionIndex, Entry entry, + Position markDeletePosition) { + try { + final var buffer = entry.getDataBuffer().retain(); + final ByteBufPair response; + try { + response = Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(), + entry.getEntryId(), partitionIndex, markDeletePosition.getLedgerId(), + markDeletePosition.getEntryId(), buffer); + } catch (Throwable throwable) { + buffer.release(); + entry.release(); + log.error("Unexpected exception when getLastMessageId for compacted consumer (request id: {})", + requestId, throwable); + writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, throwable.getMessage())); + return; + } + ctx.writeAndFlush(response, ctx.voidPromise()); + } finally { + entry.release(); + } + } + private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { int batchSize = metadata.getNumMessagesInBatch(); if (batchSize <= 1){ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 0563fa7e66667..2054ed346ac48 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -568,7 +568,7 @@ protected void handleSuccess(CommandSuccess success) { } @Override - protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) { + protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success, ByteBuf buf) { checkArgument(state == State.Ready); if (log.isDebugEnabled()) { @@ -576,10 +576,10 @@ protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse suc ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture requestFuture = - (CompletableFuture) pendingRequests.remove(requestId); + CompletableFuture> requestFuture = + (CompletableFuture>) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(new CommandGetLastMessageIdResponse().copyFrom(success)); + requestFuture.complete(Pair.of(new CommandGetLastMessageIdResponse().copyFrom(success), buf)); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); @@ -1062,7 +1062,8 @@ private CompletableFuture sendRequestAndHandleTimeout(ByteBuf requestMess return future; } - public CompletableFuture sendGetLastMessageId(ByteBuf request, long requestId) { + public CompletableFuture> sendGetLastMessageId( + ByteBuf request, long requestId) { return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId, true); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index dfc217cf574f4..a86676f0dd156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -113,6 +113,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; +import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse; import org.apache.pulsar.common.api.proto.CommandMessage; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CompressionType; @@ -2846,7 +2847,10 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, log.debug("[{}][{}] Get topic last message Id", topic, subscription); } - cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> { + cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(pair -> { + final CommandGetLastMessageIdResponse cmd = pair.getKey(); + // TODO: handle buf + final ByteBuf buf = pair.getValue(); MessageIdData lastMessageId = cmd.getLastMessageId(); MessageIdImpl markDeletePosition = null; if (cmd.hasConsumerMarkDeletePosition()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 58d58a3acef98..e50a8f6af1928 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -2100,4 +2100,19 @@ public static ProducerAccessMode convertProducerAccessMode( public static boolean peerSupportsBrokerMetadata(int peerVersion) { return peerVersion >= ProtocolVersion.v16.getValue(); } + + public static ByteBufPair newGetLastMessageIdResponse(long requestId, long lastPositionLedgerId, + long lastPositionEntryId, int partitionIndex, + long markDeleteLedgerId, long markDeleteEntryId, + ByteBuf lastEntryBuffer) { + BaseCommand cmd = localCmd(Type.GET_LAST_MESSAGE_ID_RESPONSE); + CommandGetLastMessageIdResponse response = cmd.setGetLastMessageIdResponse() + .setRequestId(requestId); + response.setLastMessageId().setLedgerId(lastPositionLedgerId).setEntryId(lastPositionEntryId) + .setPartition(partitionIndex).setBatchIndex(-1); + if (markDeleteLedgerId >= 0) { + response.setConsumerMarkDeletePosition().setLedgerId(markDeleteLedgerId).setEntryId(markDeleteEntryId); + } + return serializeCommandMessageWithSize(cmd, lastEntryBuffer); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index c05b1d796dfdd..a993dd5036b02 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -322,7 +322,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case GET_LAST_MESSAGE_ID_RESPONSE: checkArgument(cmd.hasGetLastMessageIdResponse()); - handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse()); + handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse(), buffer); break; case ACTIVE_CONSUMER_CHANGE: @@ -628,7 +628,7 @@ protected void handleTopicMigrated(CommandTopicMigrated commandMigratedTopic) { protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) { throw new UnsupportedOperationException(); } - protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) { + protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success, ByteBuf buf) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index eacec33169e34..851104743e172 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -265,6 +265,7 @@ enum ProtocolVersion { v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version + v22 = 22; // Send the last entry's payload buffer to the consumer if the consumer's read_compacted field is true } message CommandConnect { From 323af8cc2d7d8c4d993ec73e47a5d175a956d408 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 22:27:21 +0800 Subject: [PATCH 04/13] TODO: no compactedOut field --- .../GetLastMessageIdCompactedTest.java | 8 +- .../apache/pulsar/client/impl/ClientCnx.java | 3 +- .../pulsar/client/impl/ConsumerImpl.java | 89 ++++++++++++++++--- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 25b0274e59b2c..40bb9cd12deef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -523,13 +523,15 @@ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception @Test public void testGetLastMessageIdForEncryptedMessage() throws Exception { final var topic = BrokerTestUtil.newUniqueName("tp"); - final var keyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final var ecdsaPublickeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) .batchingMaxBytes(Integer.MAX_VALUE) .batchingMaxMessages(Integer.MAX_VALUE) .batchingMaxPublishDelay(1, TimeUnit.HOURS) .addEncryptionKey("client-ecdsa.pem") - .defaultCryptoKeyReader(keyFile) + .compressionType(CompressionType.LZ4) + .defaultCryptoKeyReader(ecdsaPublickeyFile) .create(); producer.newMessage().key("k0").value("v0").sendAsync(); producer.newMessage().key("k0").value("v1").sendAsync(); @@ -539,7 +541,7 @@ public void testGetLastMessageIdForEncryptedMessage() throws Exception { triggerCompactionAndWait(topic); @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") - .defaultCryptoKeyReader(keyFile).readCompacted(true).subscribe(); + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).readCompacted(true).subscribe(); final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0); Assert.assertEquals(msgId.getEntryId(), 1); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 2054ed346ac48..e92a253375515 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -579,7 +579,8 @@ protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse suc CompletableFuture> requestFuture = (CompletableFuture>) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(Pair.of(new CommandGetLastMessageIdResponse().copyFrom(success), buf)); + requestFuture.complete(Pair.of(new CommandGetLastMessageIdResponse().copyFrom(success), + buf.retainedDuplicate())); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a86676f0dd156..2ac04b14faeb0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -138,6 +138,7 @@ import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1417,6 +1418,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) { ackSet.add(cmdMessage.getAckSetAt(i)); } + } int redeliveryCount = cmdMessage.getRedeliveryCount(); MessageIdData messageId = cmdMessage.getMessageId(); @@ -2001,7 +2003,7 @@ public static DecryptResult discard() { private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata, - ByteBuf payload, ClientCnx currentCnx) { + ByteBuf payload, @Nullable ClientCnx currentCnx) { if (msgMetadata.getEncryptionKeysCount() == 0) { return DecryptResult.success(payload.retain()); @@ -2026,7 +2028,7 @@ private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeli return handleCryptoFailure(payload, messageId, currentCnx, redeliveryCount, batchSize, false); } - private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx, + private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, @Nullable ClientCnx currentCnx, int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) { switch (conf.getCryptoFailureAction()) { @@ -2078,7 +2080,7 @@ private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData message } private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx, boolean checkMaxMessageSize) { + @Nullable ClientCnx currentCnx, boolean checkMaxMessageSize) { CompressionType compressionType = msgMetadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); @@ -2130,15 +2132,18 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC stats.incrementNumReceiveFailed(); } - private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, + private void discardCorruptedMessage(MessageIdData messageId, @Nullable ClientCnx currentCnx, ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); discardMessage(messageId, currentCnx, validationError, 1); } - private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError, - int batchMessages) { + private void discardMessage(MessageIdData messageId, @Nullable ClientCnx currentCnx, + ValidationError validationError, int batchMessages) { + if (currentCnx == null) { + return; + } ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual, validationError, Collections.emptyMap(), -1); currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise()); @@ -2849,8 +2854,6 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(pair -> { final CommandGetLastMessageIdResponse cmd = pair.getKey(); - // TODO: handle buf - final ByteBuf buf = pair.getValue(); MessageIdData lastMessageId = cmd.getLastMessageId(); MessageIdImpl markDeletePosition = null; if (cmd.hasConsumerMarkDeletePosition()) { @@ -2862,12 +2865,8 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); } - MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 - ? new MessageIdImpl(lastMessageId.getLedgerId(), - lastMessageId.getEntryId(), lastMessageId.getPartition()) - : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), lastMessageId.getBatchIndex()); - + final ByteBuf buf = pair.getValue(); + final MessageId lastMsgId = getLastMessageIdFromResponse(lastMessageId, buf); future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); @@ -2900,6 +2899,68 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } } + private MessageId getLastMessageIdFromResponse(MessageIdData lastMessageId, ByteBuf buf) { + try { + if (buf.readableBytes() <= 0) { + if (lastMessageId.getBatchIndex() <= 0) { + return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition()); + } else { + return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + } + } + checkArgument(conf.isReadCompacted()); + + final MessageMetadata messageMetadata = Commands.parseMessageMetadata(buf); + int batchSize = messageMetadata.getNumMessagesInBatch(); + if (batchSize <= 1) { + return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition()); + } + + // Parse the correct batch index from buffer + if (!verifyChecksum(buf, lastMessageId)) { + throw new RuntimeException("checksum mismatch in the last message's buffer"); + } + DecryptResult decryptResult = decryptPayloadIfNeeded(lastMessageId, 0, messageMetadata, buf, null); + if (decryptResult.shouldDiscard() || !decryptResult.success) { + throw new RuntimeException("Failed to decrypt last message's buffer"); + } + final ByteBuf uncompressedBuf = uncompressPayloadIfNeeded(lastMessageId, messageMetadata, + decryptResult.payload, null, false); + decryptResult.payload.release(); + if (uncompressedBuf == null) { + throw new RuntimeException("Failed to uncompress last message's buffer"); + } + try { + // TODO: support payload processor + int batchIndex = -1; + for (int i = 0; i < batchSize; i++) { + final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + ByteBuf payload = Commands.deSerializeSingleMessageInBatch(uncompressedBuf, singleMessageMetadata, + i, batchSize); + try { + if (singleMessageMetadata.isCompactedOut()) { + continue; + } + batchIndex = i; + } finally { + payload.release(); + } + } + return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition(), batchIndex); + } finally { + uncompressedBuf.release(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + buf.release(); + } + } + private boolean isMessageUndecryptable(MessageMetadata msgMetadata) { return (msgMetadata.getEncryptionKeysCount() > 0 && conf.getCryptoKeyReader() == null && conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME); From 8e250e1d9a40d5db56f6249fca713b9f0999f6a9 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 22:44:18 +0800 Subject: [PATCH 05/13] Fix tests --- .../GetLastMessageIdCompactedTest.java | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 40bb9cd12deef..a13e44e5ea53c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -520,19 +520,31 @@ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception } } - @Test - public void testGetLastMessageIdForEncryptedMessage() throws Exception { + @DataProvider(name = "encryptionAndCompression") + public Object[][] encryptionAndCompression(){ + return new Object[][]{ + { true, CompressionType.NONE }, + { true, CompressionType.LZ4 }, + { false, CompressionType.NONE }, + { false, CompressionType.LZ4 }, + }; + } + + @Test(dataProvider = "encryptionAndCompression") + public void testGetLastMessageIdForEncryptedMessage(boolean encryption, CompressionType compressionType) + throws Exception { final var topic = BrokerTestUtil.newUniqueName("tp"); final var ecdsaPublickeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; - @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + final var producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic) .batchingMaxBytes(Integer.MAX_VALUE) .batchingMaxMessages(Integer.MAX_VALUE) .batchingMaxPublishDelay(1, TimeUnit.HOURS) - .addEncryptionKey("client-ecdsa.pem") - .compressionType(CompressionType.LZ4) - .defaultCryptoKeyReader(ecdsaPublickeyFile) - .create(); + .compressionType(compressionType); + if (encryption) { + producerBuilder.addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader(ecdsaPublickeyFile); + } + @Cleanup final var producer = producerBuilder.create(); producer.newMessage().key("k0").value("v0").sendAsync(); producer.newMessage().key("k0").value("v1").sendAsync(); producer.newMessage().key("k1").value("v0").sendAsync(); @@ -540,9 +552,18 @@ public void testGetLastMessageIdForEncryptedMessage() throws Exception { producer.flush(); triggerCompactionAndWait(topic); - @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") - .defaultCryptoKeyReader(ecdsaPrivateKeyFile).readCompacted(true).subscribe(); + final var consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .readCompacted(true); + if (encryption) { + consumerBuilder.defaultCryptoKeyReader(ecdsaPrivateKeyFile); + } + @Cleanup final var consumer = consumerBuilder.subscribe(); final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0); - Assert.assertEquals(msgId.getEntryId(), 1); + if (encryption) { + // Compaction does not work for encrypted messages + Assert.assertEquals(msgId.getBatchIndex(), 3); + } else { + Assert.assertEquals(msgId.getBatchIndex(), 1); + } } } From d25e7f40e236f3a6e9b8db7d34967442258986bb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 23:28:45 +0800 Subject: [PATCH 06/13] Add tests for last message id --- .../GetLastMessageIdCompactedTest.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index a13e44e5ea53c..64c2a9a68cb16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -60,6 +61,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker-impl") public class GetLastMessageIdCompactedTest extends ProducerConsumerBase { @@ -530,7 +532,7 @@ public Object[][] encryptionAndCompression(){ }; } - @Test(dataProvider = "encryptionAndCompression") + @Test(dataProvider = "encryptionAndCompression", timeOut = 30000) public void testGetLastMessageIdForEncryptedMessage(boolean encryption, CompressionType compressionType) throws Exception { final var topic = BrokerTestUtil.newUniqueName("tp"); @@ -565,5 +567,19 @@ public void testGetLastMessageIdForEncryptedMessage(boolean encryption, Compress } else { Assert.assertEquals(msgId.getBatchIndex(), 1); } + + final var readerBuilder = pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest) + .topic(topic).readCompacted(true); + if (encryption) { + readerBuilder.defaultCryptoKeyReader(ecdsaPrivateKeyFile); + } + @Cleanup final var reader = readerBuilder.create(); + MessageIdAdv readMsgId = (MessageIdAdv) MessageId.earliest; + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(); + log.info("Read key: {}, value: {}", msg.getKey(), Optional.ofNullable(msg.getValue()).orElse("(null)")); + readMsgId = (MessageIdAdv) msg.getMessageId(); + } + assertEquals(readMsgId, msgId); } } From 16bbf42e9bf2040e4e5d122a388e2194ebe91177 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 12:56:02 +0800 Subject: [PATCH 07/13] Make PayloadToMessageIdConverter configurable --- .../pulsar/client/api/ConsumerBuilder.java | 40 +++++ .../client/impl/ConsumerBuilderImpl.java | 6 + .../pulsar/client/impl/ConsumerImpl.java | 143 +++++++++++------- .../impl/conf/ConsumerConfigurationData.java | 4 + 4 files changed, 142 insertions(+), 51 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 142c474114912..a062d6672afb4 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -950,4 +952,42 @@ ConsumerBuilder topicConfiguration(String topicName, */ ConsumerBuilder topicConfiguration(Pattern topicsPattern, java.util.function.Consumer> builderConsumer); + + /** + * When {@link ConsumerBuilder#readCompacted(boolean)} is true, the GetLastMessageId response could include the + * last entry's buffer from the compaction service. In this case, the last message's message id should be parsed + * from the buffer because the entry could include messages that have been compacted out, which won't be received + * by the consumer. + * When the broker's topic compaction service is the built-in one, users don't need to configure it because the + * built-in convert function works well. But if the broker configures a customized topic compaction service, you + * must configure `converter` with a proper function to parse the buffer correctly according to the compaction + * service's behavior. + * If `converter` throws an exception, the corresponding {@link Consumer#getLastMessageIdsAsync()}'s result will + * fail with that exception. + */ + ConsumerBuilder payloadToMessageIdFunction(PayloadToMessageIdConverter converter); + + interface LastEntry { + + long getLedgerId(); + + long getEntryId(); + + int getPartitionIndex(); + + /** + * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` + */ + ByteBuffer getMetadataBuffer(); + + /** + * @return the uncompressed and unencrypted payload buffer of the last entry + */ + ByteBuffer getPayloadBuffer(); + } + + interface PayloadToMessageIdConverter { + + MessageId convert(LastEntry lastEntry) throws IOException; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ce602a0ec9f36..a1c7554a00b6a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -634,4 +634,10 @@ public ConsumerBuilder topicConfiguration(Pattern topicsPattern, builderConsumer.accept(topicConfiguration(topicsPattern)); return this; } + + @Override + public ConsumerBuilder payloadToMessageIdFunction(PayloadToMessageIdConverter converter) { + conf.setPayloadToMessageIdConverter(converter); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2ac04b14faeb0..9a47c22fd32e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.scurrilous.circe.checksum.Crc32cIntChecksum; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -75,6 +76,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.DeadLetterProducerBuilderContext; @@ -2866,8 +2868,14 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } final ByteBuf buf = pair.getValue(); - final MessageId lastMsgId = getLastMessageIdFromResponse(lastMessageId, buf); - future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); + try { + final MessageId lastMsgId = getLastMessageIdFromResponse(lastMessageId, buf); + future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); + } catch (IOException e) { + future.completeExceptionally(e); + } finally { + buf.release(); + } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( @@ -2899,65 +2907,98 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } } - private MessageId getLastMessageIdFromResponse(MessageIdData lastMessageId, ByteBuf buf) { + public static MessageId convertBufferToMessageId(ConsumerBuilder.LastEntry lastEntry) throws IOException { + final ByteBuf metadataBuf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); + final ByteBuf buf = Unpooled.wrappedBuffer(lastEntry.getPayloadBuffer()); try { - if (buf.readableBytes() <= 0) { - if (lastMessageId.getBatchIndex() <= 0) { - return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition()); - } else { - return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + final MessageMetadata metadata = Commands.parseMessageMetadata(metadataBuf); + final int batchSize = metadata.getNumMessagesInBatch(); + int batchIndex = -1; + for (int i = 0; i < batchSize; i++) { + final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + ByteBuf payload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadata, + i, batchSize); + try { + if (singleMessageMetadata.isCompactedOut()) { + continue; + } + batchIndex = i; + } finally { + payload.release(); } } - checkArgument(conf.isReadCompacted()); + return new BatchMessageIdImpl(lastEntry.getLedgerId(), lastEntry.getEntryId(), + lastEntry.getPartitionIndex(), batchIndex); + } finally { + metadataBuf.release(); + buf.release(); + } + } - final MessageMetadata messageMetadata = Commands.parseMessageMetadata(buf); - int batchSize = messageMetadata.getNumMessagesInBatch(); - if (batchSize <= 1) { + private MessageId getLastMessageIdFromResponse(MessageIdData lastMessageId, ByteBuf buf) throws IOException { + if (buf.readableBytes() <= 0) { + if (lastMessageId.getBatchIndex() <= 0) { return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), lastMessageId.getPartition()); + } else { + return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition(), lastMessageId.getBatchIndex()); } + } + checkArgument(conf.isReadCompacted()); - // Parse the correct batch index from buffer - if (!verifyChecksum(buf, lastMessageId)) { - throw new RuntimeException("checksum mismatch in the last message's buffer"); - } - DecryptResult decryptResult = decryptPayloadIfNeeded(lastMessageId, 0, messageMetadata, buf, null); - if (decryptResult.shouldDiscard() || !decryptResult.success) { - throw new RuntimeException("Failed to decrypt last message's buffer"); - } - final ByteBuf uncompressedBuf = uncompressPayloadIfNeeded(lastMessageId, messageMetadata, - decryptResult.payload, null, false); - decryptResult.payload.release(); - if (uncompressedBuf == null) { - throw new RuntimeException("Failed to uncompress last message's buffer"); - } - try { - // TODO: support payload processor - int batchIndex = -1; - for (int i = 0; i < batchSize; i++) { - final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); - ByteBuf payload = Commands.deSerializeSingleMessageInBatch(uncompressedBuf, singleMessageMetadata, - i, batchSize); - try { - if (singleMessageMetadata.isCompactedOut()) { - continue; - } - batchIndex = i; - } finally { - payload.release(); - } + int startReaderIndex = buf.readerIndex(); + final MessageMetadata messageMetadata = Commands.parseMessageMetadata(buf); + final ByteBuf metadataBuf = buf.slice(startReaderIndex, buf.readerIndex() - startReaderIndex); + int batchSize = messageMetadata.getNumMessagesInBatch(); + if (batchSize <= 1) { + return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition()); + } + + // Parse the correct batch index from buffer + if (!verifyChecksum(buf, lastMessageId)) { + throw new IOException("checksum mismatch in the last message's buffer"); + } + DecryptResult decryptResult = decryptPayloadIfNeeded(lastMessageId, 0, messageMetadata, buf, null); + if (decryptResult.shouldDiscard() || !decryptResult.success) { + throw new IOException("Failed to decrypt last message's buffer"); + } + final ByteBuf uncompressedBuf = uncompressPayloadIfNeeded(lastMessageId, messageMetadata, + decryptResult.payload, null, false); + decryptResult.payload.release(); + if (uncompressedBuf == null) { + throw new RuntimeException("Failed to uncompress last message's buffer"); + } + try { + return conf.getPayloadToMessageIdConverter().convert(new ConsumerBuilder.LastEntry() { + @Override + public long getLedgerId() { + return lastMessageId.getLedgerId(); } - return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), batchIndex); - } finally { - uncompressedBuf.release(); - } - } catch (IOException e) { - throw new RuntimeException(e); + + @Override + public long getEntryId() { + return lastMessageId.getEntryId(); + } + + @Override + public int getPartitionIndex() { + return lastMessageId.getPartition(); + } + + @Override + public ByteBuffer getMetadataBuffer() { + return metadataBuf.nioBuffer(); + } + + @Override + public ByteBuffer getPayloadBuffer() { + return uncompressedBuf.nioBuffer(); + } + }); } finally { - buf.release(); + uncompressedBuf.release(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 4be8c4ed73e90..21e7c167598b0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -37,6 +37,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.BatchReceivePolicy; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -51,6 +52,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; @Data @NoArgsConstructor @@ -414,6 +416,8 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; private List topicConfigurations = new ArrayList<>(); + private ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter = + ConsumerImpl::convertBufferToMessageId; public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { return topicConfigurations.stream() From 2f906f6c6d54330cb6a3e453841f8c12a55c33bd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 13:14:13 +0800 Subject: [PATCH 08/13] Apply this config to readers --- .../org/apache/pulsar/client/impl/RawReaderImpl.java | 10 ++++++++++ .../org/apache/pulsar/client/api/ConsumerBuilder.java | 2 +- .../org/apache/pulsar/client/api/ReaderBuilder.java | 4 ++++ .../apache/pulsar/client/impl/ConsumerBuilderImpl.java | 2 +- .../apache/pulsar/client/impl/ReaderBuilderImpl.java | 6 ++++++ .../java/org/apache/pulsar/client/impl/ReaderImpl.java | 1 + .../client/impl/conf/ConsumerConfigurationData.java | 6 ++++++ .../client/impl/conf/ReaderConfigurationData.java | 3 +++ 8 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 32f75d71dc332..7efca8d604f7f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; @@ -42,6 +43,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,13 @@ public class RawReaderImpl implements RawReader { public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { + this(client, topic, subscription, consumerFuture, createTopicIfDoesNotExist, retryOnRecoverableErrors, null); + } + + public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors, + ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); @@ -62,6 +71,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setReadCompacted(true); consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); + consumerConfiguration.setPayloadToMessageIdConverter(payloadToMessageIdConverter); consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist, retryOnRecoverableErrors); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index a062d6672afb4..c540e5d3d285c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -965,7 +965,7 @@ ConsumerBuilder topicConfiguration(Pattern topicsPattern, * If `converter` throws an exception, the corresponding {@link Consumer#getLastMessageIdsAsync()}'s result will * fail with that exception. */ - ConsumerBuilder payloadToMessageIdFunction(PayloadToMessageIdConverter converter); + ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); interface LastEntry { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index 4a313feba43d5..bd5a946879c90 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -376,4 +376,8 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit); + /** + * @see ConsumerBuilder#payloadToMessageIdConverter + */ + ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index a1c7554a00b6a..95caf7f88e9b5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -636,7 +636,7 @@ public ConsumerBuilder topicConfiguration(Pattern topicsPattern, } @Override - public ConsumerBuilder payloadToMessageIdFunction(PayloadToMessageIdConverter converter) { + public ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter) { conf.setPayloadToMessageIdConverter(converter); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index d0ab90068ed31..e4b0a1f2d44cd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -30,6 +30,7 @@ import lombok.Getter; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; @@ -280,4 +281,9 @@ public ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, Time return this; } + @Override + public ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter) { + conf.setPayloadToMessageIdConverter(converter); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 8760d69447a64..5c47dc9fbcd0d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -134,6 +134,7 @@ public void reachedEndOfTopic(Consumer consumer) { .ranges(readerConfiguration.getKeyHashRanges()) ); } + consumerConfiguration.setPayloadToMessageIdConverter(readerConfiguration.getPayloadToMessageIdConverter()); ConsumerInterceptors consumerInterceptors = ReaderInterceptorUtil.convertToConsumerInterceptors( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 21e7c167598b0..2385a952eac04 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -463,4 +463,10 @@ public ConsumerConfigurationData clone() { public boolean isReplicateSubscriptionState() { return replicateSubscriptionState != null && replicateSubscriptionState; } + + public void setPayloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter) { + if (converter != null) { + this.payloadToMessageIdConverter = converter; + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index cd5aa4c12f5c3..d62da84013591 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Data; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; @@ -164,6 +165,8 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; + private ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter; + @JsonIgnore public String getTopicName() { if (topicNames.size() > 1) { From 228f749016ebbf73dfdaee3848573ad09a206654 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 15:26:33 +0800 Subject: [PATCH 09/13] Add tests for custom compaction service --- .../CustomCompactionServiceFactory.java | 156 ++++++++++++++++++ .../CustomCompactionServiceTest.java | 115 +++++++++++++ .../pulsar/client/api/ReaderBuilder.java | 5 + .../pulsar/client/impl/ReaderBuilderImpl.java | 7 + .../apache/pulsar/client/impl/ReaderImpl.java | 7 +- .../impl/conf/ReaderConfigurationData.java | 5 + 6 files changed, 294 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java new file mode 100644 index 0000000000000..c79dc94c04b77 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Optional; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.protocol.Commands; + +public class CustomCompactionServiceFactory extends PulsarCompactionServiceFactory { + + private static final String RETAINED_MESSAGE_INDEXES = "retained.message.indexes"; + + @Override + protected Compactor newCompactor() throws PulsarServerException { + return new CustomCompactor(getPulsarService()); + } + + public static MessageId convertPayloadToMessageId(ConsumerBuilder.LastEntry lastEntry) { + final var buf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); + try { + final var metadata = Commands.parseMessageMetadata(buf); + final int batchSize = metadata.getNumMessagesInBatch(); + final var property = metadata.getPropertiesList().stream() + .filter(__ -> __.getKey().equals(RETAINED_MESSAGE_INDEXES)).findAny().orElse(null); + final int batchIndex; + if (property == null) { + batchIndex = batchSize - 1; + } else { + final var indexes = Arrays.stream(property.getValue().split(",")).map(Integer::valueOf).toList(); + batchIndex = indexes.get(indexes.size() - 1); + } + return new BatchMessageIdImpl(lastEntry.getLedgerId(), lastEntry.getEntryId(), + lastEntry.getPartitionIndex(), batchIndex); + } finally { + buf.release(); + } + } + + public static class PayloadProcessor implements MessagePayloadProcessor { + + @Override + public void process(MessagePayload payload, MessagePayloadContext context, Schema schema, + Consumer> messageConsumer) throws Exception { + final var property = context.getProperty(RETAINED_MESSAGE_INDEXES); + if (property == null) { + MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer); + return; + } + final var numMessages = context.getNumMessages(); + final var indexes = Arrays.stream(property.split(",")).map(Integer::valueOf).collect(Collectors.toSet()); + for (int i = 0; i < numMessages; i++) { + final var msg = context.getMessageAt(i, numMessages, payload, true, schema); + if (indexes.contains(i)) { + messageConsumer.accept(msg); + } + } + } + } + + private static class CustomCompactor extends PublishingOrderCompactor { + + public CustomCompactor(PulsarService pulsarService) throws PulsarServerException { + super(pulsarService.getConfiguration(), pulsarService.getClient(), pulsarService.getBookKeeperClient(), + pulsarService.getCompactorExecutor()); + } + + // This is a simple implementation that assumes all messages are not compressed and have partition keys + @Override + protected Optional rebatchMessage(String topic, RawMessage msg, MessageMetadata metadata, + BiPredicate filter, boolean retainNullKey) + throws IOException { + final var payload = msg.getHeadersAndPayload(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(payload); + } else { + Commands.skipMessageMetadata(payload); + } + + final var batchSize = metadata.getNumMessagesInBatch(); + final var singleMessageMetadata = new SingleMessageMetadata(); + final var retainedMessageIndexes = new ArrayList(); + final var batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity()); + // The difference from the built-in compactor's behavior is that the compactedOut field is no longer set. + // Instead, the retained messages' indexes are serialized into a property. + for (int i = 0; i < batchSize; i++) { + final var singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, + singleMessageMetadata, 0, batchSize); + final var id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), + msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i); + if (singleMessageMetadata.isCompactedOut()) { + retainedMessageIndexes.add(Integer.toString(i)); + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER, + batchBuffer); + } else if (filter.test(singleMessageMetadata.getPartitionKey(), id) + && singleMessagePayload.readableBytes() > 0) { + retainedMessageIndexes.add(Integer.toString(i)); + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, + singleMessagePayload, batchBuffer); + } else { + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER, + batchBuffer); + } + singleMessagePayload.release(); + } + if (retainedMessageIndexes.isEmpty()) { + return Optional.empty(); + } + metadata.addProperty().setKey(RETAINED_MESSAGE_INDEXES) + .setValue(String.join(",", retainedMessageIndexes)); + final var metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, + metadata, batchBuffer); + try { + return Optional.of(new RawMessageImpl(msg.getMessageIdData(), metadataAndPayload)); + } finally { + metadataAndPayload.release(); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java new file mode 100644 index 0000000000000..c2c83f99ccbf0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class CustomCompactionServiceTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE); + conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE); + conf.setCompactionServiceFactoryClassName(CustomCompactionServiceFactory.class.getName()); + } + + @Test + public void testGetLastMessageIdAfterCompaction() throws Exception { + final var topic = BrokerTestUtil.newUniqueName("tp"); + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(Integer.MAX_VALUE) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create(); + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.newMessage().key("k1").value("v0").sendAsync(); + producer.newMessage().key("k1").value(null).sendAsync(); + producer.flush(); + + triggerCompactionAndWait(topic); + + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).topic(topic) + .startMessageId(MessageId.earliest).readCompacted(true) + .messagePayloadProcessor(new CustomCompactionServiceFactory.PayloadProcessor()) + .payloadToMessageIdConverter(CustomCompactionServiceFactory::convertPayloadToMessageId) + .create(); + final var messages = new ArrayList>(); + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(3, TimeUnit.SECONDS); + assertNotNull(msg); + messages.add(msg); + log.info("Read {} => {} {{}}", msg.getKey(), msg.getValue(), msg.getMessageId()); + } + Assert.assertEquals(messages.size(), 1); + final var msg = messages.get(0); + Assert.assertEquals(msg.getKey(), "k0"); + Assert.assertEquals(msg.getValue(), "v1"); + Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getBatchIndex(), 1); + final var lastMsgId = reader.getLastMessageIds().get(0); + log.info("Last msg id: {}", lastMsgId); + Assert.assertEquals(msg.getMessageId(), lastMsgId); + } + + private void triggerCompactionAndWait(String topicName) throws Exception { + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + persistentTopic.triggerCompaction(); + Awaitility.await().untilAsserted(() -> { + Position lastConfirmPos = persistentTopic.getManagedLedger().getLastConfirmedEntry(); + Position markDeletePos = persistentTopic + .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition(); + assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId()); + assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId()); + }); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index bd5a946879c90..a856e8d074337 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -376,6 +376,11 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit); + /** + * @see ConsumerBuilder#messagePayloadProcessor + */ + ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); + /** * @see ConsumerBuilder#payloadToMessageIdConverter */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index e4b0a1f2d44cd..e9ce3d39d9752 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -281,6 +282,12 @@ public ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, Time return this; } + @Override + public ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) { + conf.setPayloadProcessor(payloadProcessor); + return this; + } + @Override public ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter) { conf.setPayloadToMessageIdConverter(converter); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 5c47dc9fbcd0d..c4ad632138c0f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -134,7 +134,12 @@ public void reachedEndOfTopic(Consumer consumer) { .ranges(readerConfiguration.getKeyHashRanges()) ); } - consumerConfiguration.setPayloadToMessageIdConverter(readerConfiguration.getPayloadToMessageIdConverter()); + if (readerConfiguration.getPayloadProcessor() != null) { + consumerConfiguration.setPayloadProcessor(readerConfiguration.getPayloadProcessor()); + } + if (readerConfiguration.getPayloadToMessageIdConverter() != null) { + consumerConfiguration.setPayloadToMessageIdConverter(readerConfiguration.getPayloadToMessageIdConverter()); + } ConsumerInterceptors consumerInterceptors = ReaderInterceptorUtil.convertToConsumerInterceptors( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index d62da84013591..987a8f2a8b104 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.ReaderInterceptor; import org.apache.pulsar.client.api.ReaderListener; @@ -165,8 +166,12 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; + @JsonIgnore private ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter; + @JsonIgnore + private MessagePayloadProcessor payloadProcessor; + @JsonIgnore public String getTopicName() { if (topicNames.size() > 1) { From 4e4d3b316d0a94467ab39a0e1136d3d3e863f156 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 16:01:07 +0800 Subject: [PATCH 10/13] Fix checkstyle --- .../main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 7efca8d604f7f..c4be2eb3382fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -43,7 +43,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; -import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 8f879203073f76eb28adfb56b5b833baadcb6f5f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 19:48:44 +0800 Subject: [PATCH 11/13] Refactor and improve the docs --- .../pulsar/client/impl/RawReaderImpl.java | 4 +- .../CustomCompactionServiceFactory.java | 4 +- .../pulsar/client/api/ConsumerBuilder.java | 52 ++++++------------- .../api/PayloadToMessageIdConverter.java | 28 ++++++++++ .../pulsar/client/api/ReaderBuilder.java | 2 +- .../client/impl/ConsumerBuilderImpl.java | 1 + .../pulsar/client/impl/ConsumerImpl.java | 7 +-- .../pulsar/client/impl/ReaderBuilderImpl.java | 4 +- .../impl/conf/ConsumerConfigurationData.java | 6 +-- .../impl/conf/ReaderConfigurationData.java | 4 +- 10 files changed, 62 insertions(+), 50 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index c4be2eb3382fb..6a27b79edf776 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -27,8 +27,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; @@ -61,7 +61,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors, - ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter) { + PayloadToMessageIdConverter payloadToMessageIdConverter) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java index c79dc94c04b77..29cf1ee789c2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java @@ -28,12 +28,12 @@ import java.util.stream.Collectors; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessagePayload; import org.apache.pulsar.client.api.MessagePayloadContext; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -52,7 +52,7 @@ protected Compactor newCompactor() throws PulsarServerException { return new CustomCompactor(getPulsarService()); } - public static MessageId convertPayloadToMessageId(ConsumerBuilder.LastEntry lastEntry) { + public static MessageId convertPayloadToMessageId(PayloadToMessageIdConverter.LastEntry lastEntry) { final var buf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); try { final var metadata = Commands.parseMessageMetadata(buf); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index c540e5d3d285c..74d33b5015fbf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.client.api; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -954,40 +952,24 @@ ConsumerBuilder topicConfiguration(Pattern topicsPattern, java.util.function.Consumer> builderConsumer); /** - * When {@link ConsumerBuilder#readCompacted(boolean)} is true, the GetLastMessageId response could include the - * last entry's buffer from the compaction service. In this case, the last message's message id should be parsed - * from the buffer because the entry could include messages that have been compacted out, which won't be received - * by the consumer. - * When the broker's topic compaction service is the built-in one, users don't need to configure it because the - * built-in convert function works well. But if the broker configures a customized topic compaction service, you - * must configure `converter` with a proper function to parse the buffer correctly according to the compaction - * service's behavior. - * If `converter` throws an exception, the corresponding {@link Consumer#getLastMessageIdsAsync()}'s result will - * fail with that exception. + * Configures a custom `PayloadToMessageIdConverter` to handle the parsing of the last entry's buffer when + * {@link ConsumerBuilder#readCompacted(boolean)} is set to `true`. + * + * When compaction is enabled, the `GetLastMessageId` response may include the buffer of the last entry from the + * compaction service. In such cases, the last message's message ID must be extracted from the buffer, as the entry + * may contain messages that have been compacted out and will not be delivered to the consumer. + * + * If the broker's topic compaction service uses the built-in implementation, users do not need to configure this + * explicitly, as the default conversion function handles the parsing correctly. However, if the broker is + * configured with a custom topic compaction service, you must provide a `converter` with an appropriate function to + * parse the buffer correctly based on the behavior of the custom compaction service. + * + * If the provided `converter` throws an exception during parsing, the corresponding result of + * {@link Consumer#getLastMessageIdsAsync()} will fail with that exception. + * + * @param converter The custom `PayloadToMessageIdConverter` to parse the last entry's buffer. + * @return The updated `ConsumerBuilder` instance. */ ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); - interface LastEntry { - - long getLedgerId(); - - long getEntryId(); - - int getPartitionIndex(); - - /** - * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` - */ - ByteBuffer getMetadataBuffer(); - - /** - * @return the uncompressed and unencrypted payload buffer of the last entry - */ - ByteBuffer getPayloadBuffer(); - } - - interface PayloadToMessageIdConverter { - - MessageId convert(LastEntry lastEntry) throws IOException; - } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java new file mode 100644 index 0000000000000..31a187be1b157 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java @@ -0,0 +1,28 @@ +package org.apache.pulsar.client.api; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface PayloadToMessageIdConverter { + + MessageId convert(LastEntry lastEntry) throws IOException; + + interface LastEntry { + + long getLedgerId(); + + long getEntryId(); + + int getPartitionIndex(); + + /** + * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` + */ + ByteBuffer getMetadataBuffer(); + + /** + * @return the uncompressed and unencrypted payload buffer of the last entry + */ + ByteBuffer getPayloadBuffer(); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index a856e8d074337..c6057ac082b3d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -384,5 +384,5 @@ public interface ReaderBuilder extends Cloneable { /** * @see ConsumerBuilder#payloadToMessageIdConverter */ - ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter); + ReaderBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 95caf7f88e9b5..74f8d677748e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageListenerExecutor; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.RedeliveryBackoff; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9a47c22fd32e3..56667a74cf211 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -76,7 +76,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.DeadLetterProducerBuilderContext; @@ -86,6 +85,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; @@ -2907,7 +2907,8 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } } - public static MessageId convertBufferToMessageId(ConsumerBuilder.LastEntry lastEntry) throws IOException { + public static MessageId convertBufferToMessageId(PayloadToMessageIdConverter.LastEntry lastEntry) + throws IOException { final ByteBuf metadataBuf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); final ByteBuf buf = Unpooled.wrappedBuffer(lastEntry.getPayloadBuffer()); try { @@ -2971,7 +2972,7 @@ private MessageId getLastMessageIdFromResponse(MessageIdData lastMessageId, Byte throw new RuntimeException("Failed to uncompress last message's buffer"); } try { - return conf.getPayloadToMessageIdConverter().convert(new ConsumerBuilder.LastEntry() { + return conf.getPayloadToMessageIdConverter().convert(new PayloadToMessageIdConverter.LastEntry() { @Override public long getLedgerId() { return lastMessageId.getLedgerId(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index e9ce3d39d9752..c4a350b9e498c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -30,12 +30,12 @@ import lombok.Getter; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -289,7 +289,7 @@ public ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadP } @Override - public ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter) { + public ReaderBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter) { conf.setPayloadToMessageIdConverter(converter); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 2385a952eac04..beb8796dadfa0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -37,7 +37,6 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.BatchReceivePolicy; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -47,6 +46,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageListenerExecutor; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -416,7 +416,7 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; private List topicConfigurations = new ArrayList<>(); - private ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter = + private PayloadToMessageIdConverter payloadToMessageIdConverter = ConsumerImpl::convertBufferToMessageId; public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { @@ -464,7 +464,7 @@ public boolean isReplicateSubscriptionState() { return replicateSubscriptionState != null && replicateSubscriptionState; } - public void setPayloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter) { + public void setPayloadToMessageIdConverter(PayloadToMessageIdConverter converter) { if (converter != null) { this.payloadToMessageIdConverter = converter; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 987a8f2a8b104..6b52174b2f2ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -27,12 +27,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Data; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.ReaderInterceptor; import org.apache.pulsar.client.api.ReaderListener; @@ -167,7 +167,7 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; @JsonIgnore - private ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter; + private PayloadToMessageIdConverter payloadToMessageIdConverter; @JsonIgnore private MessagePayloadProcessor payloadProcessor; From 96d2c2a51a0043b09cef4b2cd05ded9b199d8487 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 19:52:03 +0800 Subject: [PATCH 12/13] Support customizing the RawReader --- .../main/java/org/apache/pulsar/compaction/Compactor.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index d37298757db9d..729cda8f1f1e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -56,8 +56,11 @@ public Compactor(ServiceConfiguration conf, } public CompletableFuture compact(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false).thenComposeAsync( - this::compactAndCloseReader, scheduler); + return createRawReader(topic).thenComposeAsync(this::compactAndCloseReader, scheduler); + } + + protected CompletableFuture createRawReader(String topic) { + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false); } private CompletableFuture compactAndCloseReader(RawReader reader) { From 03807300fc708a070026ad594aeb10bdb9ca1fd0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 20:03:51 +0800 Subject: [PATCH 13/13] Support customizing RawReader --- .../compaction/CustomCompactionServiceFactory.java | 12 ++++++++++++ .../compaction/CustomCompactionServiceTest.java | 5 +++++ .../compaction/GetLastMessageIdCompactedTest.java | 5 ++--- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java index 29cf1ee789c2b..2dce6c18c03be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -35,9 +36,12 @@ import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.client.impl.RawReaderImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -101,6 +105,14 @@ public CustomCompactor(PulsarService pulsarService) throws PulsarServerException pulsarService.getCompactorExecutor()); } + @Override + protected CompletableFuture createRawReader(String topic) { + final var future = new CompletableFuture>(); + final var reader = new RawReaderImpl((PulsarClientImpl) pulsar, topic, COMPACTION_SUBSCRIPTION, future, + false, false, CustomCompactionServiceFactory::convertPayloadToMessageId); + return future.thenApply(__ -> reader); + } + // This is a simple implementation that assumes all messages are not compressed and have partition keys @Override protected Optional rebatchMessage(String topic, RawMessage msg, MessageMetadata metadata, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java index c2c83f99ccbf0..a077bf93e56c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java @@ -99,6 +99,11 @@ public void testGetLastMessageIdAfterCompaction() throws Exception { final var lastMsgId = reader.getLastMessageIds().get(0); log.info("Last msg id: {}", lastMsgId); Assert.assertEquals(msg.getMessageId(), lastMsgId); + + // Trigger the RawReader#getLastMessageId function and ensure no exception will be thrown + admin.namespaces().unload("public/default"); + triggerCompactionAndWait(topic); + Assert.assertEquals(reader.getLastMessageIds().get(0), lastMsgId); } private void triggerCompactionAndWait(String topicName) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 64c2a9a68cb16..a99949607f702 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -55,7 +55,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -563,9 +562,9 @@ public void testGetLastMessageIdForEncryptedMessage(boolean encryption, Compress final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0); if (encryption) { // Compaction does not work for encrypted messages - Assert.assertEquals(msgId.getBatchIndex(), 3); + assertEquals(msgId.getBatchIndex(), 3); } else { - Assert.assertEquals(msgId.getBatchIndex(), 1); + assertEquals(msgId.getBatchIndex(), 1); } final var readerBuilder = pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest)