From a92e92cc980857fb0c3350b0d3d36c65ea9b71c8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 19:25:41 +0800 Subject: [PATCH 1/8] [pip] PIP-429: Client-Side Computation of Last Compacted Message ID --- pip/pip-429.md | 229 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 pip/pip-429.md diff --git a/pip/pip-429.md b/pip/pip-429.md new file mode 100644 index 0000000000000..6de0e9a552250 --- /dev/null +++ b/pip/pip-429.md @@ -0,0 +1,229 @@ +# PIP-429: Client-Side Computation of Last Compacted Message ID + +# Background knowledge + +The typical reader's work flow looks like: + +```java +while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(); + handleMessage(msg); +} +``` + +`hasMessageAvailable` could perform the `GetLastMessageId` RPC to get the last message ID from broker. However, when the reader is a compacted reader, i.e. `readCompacted(true)` is configured when creating the reader, the server side could compute the message ID from the last entry in the compaction service. + +Generally, with the built-in compaction service, when the entry represents a batch of messages, the compacted entry buffer consists of: + +1. Serialized `MessageMetadata` +2. Serialized payload buffer, which can be compressed or encrypted. The uncompressed payload buffer consists of a list of `SingleMessageMetadata` and value buffers. 
+ +Take a typical example, when a producer that configures `LZ4` as the compression type sends the following messages in a batch: + +```java +producer.newMessage().key("k0").value("v0").sendAsync(); +producer.newMessage().key("k0").value("v1").sendAsync(); +producer.newMessage().key("k1").value("v0").sendAsync(); +producer.newMessage().key("k1").value(null).sendAsync(); +``` + +After the compaction, the compacted entry buffer could be represented as follows: + +```yaml +metadata: # MessageMetadata + num_messages_in_batch: 4 + compression: LZ4 +payload: + - singleMetadata: # SingleMessageMetadata + key: k0 + compactedOut: true + value: v1 + - singleMetadata: + key: k0 + compactedOut: false + value: v1 + - singleMetadata: + key: k1 + compactedOut: true + value: v0 + - singleMetadata: + key: k1 + compactedOut: true + nullValue: true + value: "" +``` + +- For a given key, only the latest value will be retained, so `k0 => v0` will be compacted out. +- A null value means the key will be removed, so `k1 => v0` and `k1 => null` will be compacted out. + +Prior to [#18877](https://github.com/apache/pulsar/pull/18877), the `hasMessageAvailable` and `readNext` loop might encounter issues because the `GetLastMessageId` RPC returns `{ledger, entry, batchIndex=3}` as the last message ID, which represents `k1 => null`. + +The issue occurs because the batch index of the last message ID is calculated as `num_messages_in_batch - 1` without considering certain edge cases. [#18877](https://github.com/apache/pulsar/pull/18877) resolves this problem by uncompressing the compacted entry buffer on the broker side and filtering out messages where the individual metadata has `compactedOut` set to `true`. This ensures that only valid messages are considered when determining the last message ID. 
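The filtering step described above can be sketched as follows. This is a minimal illustration only, not the broker code from #18877: the class `LastRetainedIndex` and the method `lastRetainedBatchIndex` are hypothetical names, and the `boolean[]` stands in for the `compactedOut` flag of each `SingleMessageMetadata` in the batch.

```java
public class LastRetainedIndex {

    /**
     * Returns the batch index of the last message that was not compacted out,
     * or -1 if every message in the batch was compacted out.
     *
     * compactedOut[i] is assumed to mirror the compactedOut flag of the i-th
     * SingleMessageMetadata in the decompressed payload (hypothetical input).
     */
    public static int lastRetainedBatchIndex(boolean[] compactedOut) {
        // Scan from the end so the first retained message found is the last one.
        for (int i = compactedOut.length - 1; i >= 0; i--) {
            if (!compactedOut[i]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Flags for the example batch: k0=>v0 (out), k0=>v1 (kept),
        // k1=>v0 (out), k1=>null (out).
        boolean[] flags = {true, false, true, true};
        System.out.println(lastRetainedBatchIndex(flags));
    }
}
```

For the example batch, the naive `num_messages_in_batch - 1` yields index 3, while scanning the flags yields index 1, which is the only message a compacted reader can actually receive.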
+ +# Motivation + +Currently, if the last message of a topic is stored in the compacted ledger, each `GetLastMessageId` RPC must uncompress the compacted entry buffer and deserialize both the `MessageMetadata` and `SingleMessageMetadata` to compute the last message ID. This process can become resource-intensive, especially when handling a large number of such topics. + +Additionally, if the messages are encrypted, they cannot be uncompressed because the public key is required for decryption. This limitation also affects the current compaction service, as encrypted messages cannot be compacted. As a result, the `GetLastMessageId` RPC will fail with an error similar to the following: + +``` +org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription sub of the topic gets the last message id was failed +{"errorMsg":"Failed to read last entry of the compacted Ledger Invalid unknown tag type: 3","reqId":3317275583068061944, "remote":"localhost/127.0.0.1:50818", "local":"/127.0.0.1:50823"} +``` + +Instead, the expected behavior is to return the last message ID (e.g., `k1 => null` in the previous example). + +Another issue arises from the assumption made by the `GetLastMessageId` RPC that the compacted entry's payload buffer must always contain a `SingleMessageMetadata` list. However, this is not always the case. For instance, a custom topic compaction service might write a payload buffer that omits the `SingleMessageMetadata`. In such cases, the `compactedOut` information could instead be stored in the properties of the `MessageMetadata`, but the `GetLastMessageId` RPC will always fail. + +# Goals + +## In Scope + +Shift the logic for computing the last message ID from the broker side to the client side. This change will enable the client to compute the last message ID without requiring the uncompression or decryption of the compacted entry buffer. 
+ +## Out of Scope + +The topic compaction service's components are complex and not optimally designed. While this PIP will make minimal modifications to the visibility levels of certain methods, such changes will only be made when absolutely necessary. For example, to allow a derived class to customize a `RawReader`. + +# High Level Design + +The client will have a newer protocol version. When the broker receives a `GetLastMessageId` request and validates the protocol version supports this feature, it will skip decompressing the entry buffer. Instead, it will send the payload buffer after the `GetLastMessageId` response. The client will then handle the payload buffer to compute the last message ID. + +To allow a different format of the compacted entry buffer, a configurable function will be used to compute a message id from the payload buffer. + +# Detailed Design + +## Design & Implementation Details + +In the `ServerCnx#handleLastMessageIdFromCompactionService` method, the computation logic will be bypassed if the remote protocol version is v22 or later. This ensures compatibility with newer protocol versions, where the client is expected to handle the relevant processing independently. + +```java +persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { + if (entry != null) { + if (getRemoteEndpointProtocolVersion() >= v22.getValue()) { + sendGetLastMessageIdResponseWithBuffer(requestId, partitionIndex, entry, markDeletePosition); + return; + } +``` + +The `sendGetLastMessageIdResponseWithBuffer` method will serialize the `CommandGetLastMessageIdResponse` object and the entry buffer into a `ByteBufPair`. 
+ +```java +public static ByteBufPair newGetLastMessageIdResponse(long requestId, long lastPositionLedgerId, + long lastPositionEntryId, int partitionIndex, + long markDeleteLedgerId, long markDeleteEntryId, + ByteBuf lastEntryBuffer) { +``` + +The consumer or reader will handle the payload processing by first decompressing it based on the compression type specified in the `MessageMetadata`. After decompression, the payload will be decrypted using the configured crypto key reader. This ensures that the payload is properly prepared for further processing, adhering to the specified compression and encryption settings. + +## Public-facing Changes + +```java +interface LastEntry { + + long getLedgerId(); + + long getEntryId(); + + int getPartitionIndex(); + + /** + * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` + */ + ByteBuffer getMetadataBuffer(); + + /** + * @return the uncompressed and unencrypted payload buffer of the last entry + */ + ByteBuffer getPayloadBuffer(); +} + +interface PayloadToMessageIdConverter { + + MessageId convert(LastEntry lastEntry) throws IOException; +} +``` + +The `LastEntry` interface offers two distinct methods to retrieve the metadata buffer and the payload buffer of the last entry. This design ensures that the consumer has the flexibility to handle the payload appropriately. Specifically, the consumer is responsible for decompressing and decrypting the payload buffer before passing it to the `PayloadToMessageIdConverter`. It is important to note that if the payload buffer has been compressed or encrypted, the metadata buffer and the payload buffer will differ. + +A new configuration option will be introduced for `Consumer`, `Reader`, and `RawReader` to enhance their flexibility and functionality. 
Additionally, this proposal addresses a previously missing feature (see [PIP-96](https://github.com/apache/pulsar/wiki/PIP-96%3A-Message-payload-processor-for-Pulsar-client)) by adding the `messagePayloadProcessor` configuration to the `Reader`. This enhancement enables the `Reader` to process entry buffers in various formats, ensuring better adaptability to different use cases. + +```java +/** + * Configures a custom `PayloadToMessageIdConverter` to handle the parsing of the last entry's buffer when + * {@link ConsumerBuilder#readCompacted(boolean)} is set to `true`. + * + * When compaction is enabled, the `GetLastMessageId` response may include the buffer of the last entry from the + * compaction service. In such cases, the last message's message ID must be extracted from the buffer, as the entry + * may contain messages that have been compacted out and will not be delivered to the consumer. + * + * If the broker's topic compaction service uses the built-in implementation, users do not need to configure this + * explicitly, as the default conversion function handles the parsing correctly. However, if the broker is configured + * with a custom topic compaction service, you must provide a `converter` with an appropriate function to parse the + * buffer correctly based on the behavior of the custom compaction service. + * + * If the provided `converter` throws an exception during parsing, the corresponding result of + * {@link Consumer#getLastMessageIdsAsync()} will fail with that exception. + * + * @param converter The custom `PayloadToMessageIdConverter` to parse the last entry's buffer. + * @return The updated `ConsumerBuilder` instance. 
+ */ +ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); +``` + +```java +/** + * @see ConsumerBuilder#messagePayloadProcessor + */ +ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); + +/** + * @see ConsumerBuilder#payloadToMessageIdConverter + */ +ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter); +``` + +A new constructor will be added to `RawReaderImpl`: + +```java +public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors, + ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter) { +``` + +and the `Compactor` will have a protected method to allow derived classes to customize the `RawReader`: + +```java +protected CompletableFuture createRawReader(String topic) { + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false); +} +``` + +### Binary protocol + +Add a new protocol version: + +```protobuf + v22 = 22; // Send the last entry's payload buffer to the consumer if the consumer's read_compacted field is true +``` + +Older clients will continue to rely on the broker's logic to compute the last message ID, while newer clients will handle this computation on the client side. + +# Backward & Forward Compatibility + +- **Scenario 1: Older Client → Newer Broker** + When an older client sends a `GetLastMessageId` request to a newer broker, the broker computes the last message ID using its standard process. To ensure backward compatibility, the broker excludes the payload buffer from the response, aligning with the expectations of older clients. + +- **Scenario 2: Newer Client → Older Broker** + When a newer client sends a `GetLastMessageId` request to an older broker, the broker does not include the payload buffer in the response, as it lacks support for this feature. 
If the client receives an empty entry buffer, it seamlessly extracts the message ID directly from the response, ensuring compatibility and uninterrupted functionality across versions. + +# Alternatives + +[#24431](https://github.com/apache/pulsar/pull/24431) proposed a solution to add a property to the `MessageMetadata` to indicate the last available message's batch index in the batch. However, it's not flexiable and might have conflicts with user provided properties. + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From e7621a218965b962f508d7220596c9f1c463febd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 21 Jun 2025 19:39:00 +0800 Subject: [PATCH 2/8] Add discussion thread link and fix the doc --- pip/pip-429.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pip/pip-429.md b/pip/pip-429.md index 6de0e9a552250..a86bab6d3270e 100644 --- a/pip/pip-429.md +++ b/pip/pip-429.md @@ -37,7 +37,7 @@ payload: - singleMetadata: # SingleMessageMetadata key: k0 compactedOut: true - value: v1 + value: "" - singleMetadata: key: k0 compactedOut: false @@ -45,7 +45,7 @@ payload: - singleMetadata: key: k1 compactedOut: true - value: v0 + value: "" - singleMetadata: key: k1 compactedOut: true @@ -225,5 +225,5 @@ Older clients will continue to rely on the broker's logic to compute the last me # Links -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/4jghjg7yd36cl8yonbsvk5njmm96vy0k * Mailing List voting thread: From de1a5dc0e1e8999a446cefde41f6637d3fab861b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 23 Jun 2025 23:15:56 +0800 Subject: [PATCH 3/8] Update the proposal --- pip/pip-429.md | 168 +++++++++++++++---------------------------------- 1 file changed, 50 insertions(+), 118 deletions(-) diff --git a/pip/pip-429.md b/pip/pip-429.md index a86bab6d3270e..7ec1a19eb2c06 100644 --- a/pip/pip-429.md +++ b/pip/pip-429.md @@ -1,4 +1,4 @@ -# PIP-429: Client-Side Computation 
of Last Compacted Message ID +# PIP-429: Introducing an Efficient Compacted Batch Buffer Format # Background knowledge @@ -60,11 +60,13 @@ Prior to [#18877](https://github.com/apache/pulsar/pull/18877), the `hasMessageA The issue occurs because the batch index of the last message ID is calculated as `num_messages_in_batch - 1` without considering certain edge cases. [#18877](https://github.com/apache/pulsar/pull/18877) resolves this problem by uncompressing the compacted entry buffer on the broker side and filtering out messages where the individual metadata has `compactedOut` set to `true`. This ensures that only valid messages are considered when determining the last message ID. +The `compacted_out` field was first introduced in the early stages of development through [#1361](https://github.com/apache/pulsar/pull/1361). However, as part of the overall payload buffer, parsing a `SingleMessageMetadata` currently requires decompressing the compacted entry buffer. This process can be resource-intensive, particularly when handling large topics or encrypted messages, leading to potential performance bottlenecks. + # Motivation -Currently, if the last message of a topic is stored in the compacted ledger, each `GetLastMessageId` RPC must uncompress the compacted entry buffer and deserialize both the `MessageMetadata` and `SingleMessageMetadata` to compute the last message ID. This process can become resource-intensive, especially when handling a large number of such topics. +Decompressing the payload buffer solely to check whether individual messages have the `compacted_out` field set is both inefficient and restrictive, as it imposes constraints on the payload buffer format. Furthermore, when using a custom topic compaction service, the entry buffer in the compacted ledger may not include a `SingleMessageMetadata` for every single message, adding further complexity to the process. 
-Additionally, if the messages are encrypted, they cannot be uncompressed because the public key is required for decryption. This limitation also affects the current compaction service, as encrypted messages cannot be compacted. As a result, the `GetLastMessageId` RPC will fail with an error similar to the following: +This challenge is exacerbated when messages are encrypted, as decompression is not possible without the public key required for decryption. This limitation also impacts the current compaction service, as encrypted messages cannot be compacted. Consequently, operations such as the `GetLastMessageId` RPC will fail, resulting in an error similar to the following: ``` org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription sub of the topic gets the last message id was failed @@ -75,153 +77,83 @@ Instead, the expected behavior is to return the last message ID (e.g., `k1 => nu Another issue arises from the assumption made by the `GetLastMessageId` RPC that the compacted entry's payload buffer must always contain a `SingleMessageMetadata` list. However, this is not always the case. For instance, a custom topic compaction service might write a payload buffer that omits the `SingleMessageMetadata`. In such cases, the `compactedOut` information could instead be stored in the properties of the `MessageMetadata`, but the `GetLastMessageId` RPC will always fail. +The custom topic compaction service has the flexibility to serialize and deserialize the payload buffer in a different format. However, it still depends on the `GetLastMessageId` RPC in the `hasMessageAvailableAsync` and `getLastMessageIdAsync` methods of the `RawReader` to compute the last message ID. This reliance creates a compatibility issue, as the `GetLastMessageId` RPC will fail when working with a payload buffer in a non-standard format, breaking the functionality of these methods. 
+ # Goals ## In Scope -Shift the logic for computing the last message ID from the broker side to the client side. This change will enable the client to compute the last message ID without requiring the uncompression or decryption of the compacted entry buffer. +Instead of relying on the `compacted_out` field in `SingleMessageMetadata`, this PIP proposes to use `MessageMetadata` to determine the last message ID in the last compacted entry. Since `compacted_out` is no longer used, the payload buffer's format could be improved as well. ## Out of Scope -The topic compaction service's components are complex and not optimally designed. While this PIP will make minimal modifications to the visibility levels of certain methods, such changes will only be made when absolutely necessary. For example, to allow a derived class to customize a `RawReader`. - # High Level Design -The client will have a newer protocol version. When the broker receives a `GetLastMessageId` request and validates the protocol version supports this feature, it will skip decompressing the entry buffer. Instead, it will send the payload buffer after the `GetLastMessageId` response. The client will then handle the payload buffer to compute the last message ID. - -To allow a different format of the compacted entry buffer, a configurable function will be used to compute a message id from the payload buffer. +To enhance efficiency and simplify the handling of compacted entries, a new field will be added to `MessageMetadata` to record the batch indexes of all retained single messages. This change allows the server to determine the last message ID directly from the new field in `MessageMetadata` when processing a `GetLastMessageId` request, rather than relying on the `compacted_out` field in `SingleMessageMetadata`. -# Detailed Design +With this update, the `compacted_out` field will no longer be used, and only the retained messages will be included in the payload buffer. 
For example, the previous representation: -## Design & Implementation Details - -In the `ServerCnx#handleLastMessageIdFromCompactionService` method, the computation logic will be bypassed if the remote protocol version is v22 or later. This ensures compatibility with newer protocol versions, where the client is expected to handle the relevant processing independently. - -```java -persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { - if (entry != null) { - if (getRemoteEndpointProtocolVersion() >= v22.getValue()) { - sendGetLastMessageIdResponseWithBuffer(requestId, partitionIndex, entry, markDeletePosition); - return; - } +```yaml +metadata: # MessageMetadata + num_messages_in_batch: 4 + compression: LZ4 + compacted_batch_indexes: [1] # Retained messages' batch indexes ``` -The `sendGetLastMessageIdResponseWithBuffer` method will serialize the `CommandGetLastMessageIdResponse` object and the entry buffer into a `ByteBufPair`. - -```java -public static ByteBufPair newGetLastMessageIdResponse(long requestId, long lastPositionLedgerId, - long lastPositionEntryId, int partitionIndex, - long markDeleteLedgerId, long markDeleteEntryId, - ByteBuf lastEntryBuffer) { -``` +The improvements are: +1. **Reduced Payload Size**: + - In the new format, only the retained messages are included in the payload buffer. + - For the example above, the new format contains just 1 pair of `SingleMessageMetadata` and its corresponding value buffer, compared to 4 pairs in the original format. -The consumer or reader will handle the payload processing by first decompressing it based on the compression type specified in the `MessageMetadata`. After decompression, the payload will be decrypted using the configured crypto key reader. This ensures that the payload is properly prepared for further processing, adhering to the specified compression and encryption settings. +2. 
**Efficient Batch Index Retrieval**: + - The actual batch index of retained messages can now be retrieved from the `compacted_batch_indexes` field in `MessageMetadata`. + - In this example, the retained message's batch index is `compacted_batch_indexes[0] = 1`. ## Public-facing Changes -```java -interface LastEntry { - - long getLedgerId(); - - long getEntryId(); - - int getPartitionIndex(); - - /** - * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` - */ - ByteBuffer getMetadataBuffer(); - - /** - * @return the uncompressed and unencrypted payload buffer of the last entry - */ - ByteBuffer getPayloadBuffer(); -} +### Binary protocol -interface PayloadToMessageIdConverter { +A new field will be added to `MessageMetadata`: - MessageId convert(LastEntry lastEntry) throws IOException; -} +```protobuf + // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted, + // some messages may be removed (compacted out). For example, if the original batch contains: + // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`. + // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages. + // + // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches, + // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted messages + // (e.g., `k1 => v1` and `k1 => null` in the example above). + repeated int32 compacted_batch_indexes = 31; ``` -The `LastEntry` interface offers two distinct methods to retrieve the metadata buffer and the payload buffer of the last entry. This design ensures that the consumer has the flexibility to handle the payload appropriately. 
Specifically, the consumer is responsible for decompressing and decrypting the payload buffer before passing it to the `PayloadToMessageIdConverter`. It is important to note that if the payload buffer has been compressed or encrypted, the metadata buffer and the payload buffer will differ. - -A new configuration option will be introduced for `Consumer`, `Reader`, and `RawReader` to enhance their flexibility and functionality. Additionally, this proposal addresses a previously missing feature (see [PIP-96](https://github.com/apache/pulsar/wiki/PIP-96%3A-Message-payload-processor-for-Pulsar-client)) by adding the `messagePayloadProcessor` configuration to the `Reader`. This enhancement enables the `Reader` to process entry buffers in various formats, ensuring better adaptability to different use cases. +# Backward & Forward Compatibility -```java -/** - * Configures a custom `PayloadToMessageIdConverter` to handle the parsing of the last entry's buffer when - * {@link ConsumerBuilder#readCompacted(boolean)} is set to `true`. - * - * When compaction is enabled, the `GetLastMessageId` response may include the buffer of the last entry from the - * compaction service. In such cases, the last message's message ID must be extracted from the buffer, as the entry - * may contain messages that have been compacted out and will not be delivered to the consumer. - * - * If the broker's topic compaction service uses the built-in implementation, users do not need to configure this - * explicitly, as the default conversion function handles the parsing correctly. However, if the broker is configured - * with a custom topic compaction service, you must provide a `converter` with an appropriate function to parse the - * buffer correctly based on the behavior of the custom compaction service. - * - * If the provided `converter` throws an exception during parsing, the corresponding result of - * {@link Consumer#getLastMessageIdsAsync()} will fail with that exception. 
- * - * @param converter The custom `PayloadToMessageIdConverter` to parse the last entry's buffer. - * @return The updated `ConsumerBuilder` instance. - */ -ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); -``` +# Alternatives -```java -/** - * @see ConsumerBuilder#messagePayloadProcessor - */ -ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); - -/** - * @see ConsumerBuilder#payloadToMessageIdConverter - */ -ReaderBuilder payloadToMessageIdConverter(ConsumerBuilder.PayloadToMessageIdConverter converter); -``` +### Using a Property in `MessageMetadata` -A new constructor will be added to `RawReaderImpl`: +[#24431](https://github.com/apache/pulsar/pull/24431) proposed a solution to add a property to the `MessageMetadata` to indicate the last available message's batch index in the batch. However, it's not flexible and might have conflicts with user-provided properties. -```java -public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, - CompletableFuture> consumerFuture, - boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors, - ConsumerBuilder.PayloadToMessageIdConverter payloadToMessageIdConverter) { -``` +### Client-Side Computation of Last Compacted Message ID -and the `Compactor` will have a protected method to allow derived classes to customize the `RawReader`: +The previous solution this proposal is to pass the last compacted entry's buffer as well as the `GetLastMessageId` response, so the client can compute the last compacted message ID by itself. It can even handle the encrypted messages because the client side should have the public key to decrypt the message. -```java -protected CompletableFuture createRawReader(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false); -} -``` +However, it's unnecessarily complicated that the whole entry buffer will be sent to the client for each `GetLastMessageId` RPC. 
-### Binary protocol +### Payload Buffer Format Improvement -Add a new protocol version: +Actually, we can reduce the payload buffer size by removing the `SingleMessageMetadata` and empty value buffers for messages that are compacted out. -```protobuf - v22 = 22; // Send the last entry's payload buffer to the consumer if the consumer's read_compacted field is true +```yaml +payload: + - singleMetadata: + key: k0 + compactedOut: false + value: v1 ``` -Older clients will continue to rely on the broker's logic to compute the last message ID, while newer clients will handle this computation on the client side. - -# Backward & Forward Compatibility - -- **Scenario 1: Older Client → Newer Broker** - When an older client sends a `GetLastMessageId` request to a newer broker, the broker computes the last message ID using its standard process. To ensure backward compatibility, the broker excludes the payload buffer from the response, aligning with the expectations of older clients. - -- **Scenario 2: Newer Client → Older Broker** - When a newer client sends a `GetLastMessageId` request to an older broker, the broker does not include the payload buffer in the response, as it lacks support for this feature. If the client receives an empty entry buffer, it seamlessly extracts the message ID directly from the response, ensuring compatibility and uninterrupted functionality across versions. - -# Alternatives - -[#24431](https://github.com/apache/pulsar/pull/24431) proposed a solution to add a property to the `MessageMetadata` to indicate the last available message's batch index in the batch. However, it's not flexiable and might have conflicts with user provided properties. +However, it would bring a compatibility issue that older clients would not be able to parse the new format unless a `MessagePayloadProcessor` is configured. 
+ # Links From 707fb7700daa556ec7bf96292a586579aa20ae65 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 23 Jun 2025 23:36:49 +0800 Subject: [PATCH 4/8] Update proposal --- pip/pip-429.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pip/pip-429.md b/pip/pip-429.md index 7ec1a19eb2c06..3fdf84794b639 100644 --- a/pip/pip-429.md +++ b/pip/pip-429.md @@ -1,4 +1,4 @@ -# PIP-429: Introducing an Efficient Compacted Batch Buffer Format +# PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing # Background knowledge @@ -83,7 +83,7 @@ The custom topic compaction service has the flexibility to serialize and deseria ## In Scope -Instead of relying on the `compacted_out` field in `SingleMessageMetadata`, this PIP proposes to use `MessageMetadata` to determine the last message ID in the last compacted entry. Since `compacted_out` is no longer used, the payload buffer's format could be improved as well. +Instead of relying on the `compacted_out` field in `SingleMessageMetadata`, this PIP proposes to use `MessageMetadata` to determine the last message ID in the compacted last entry. Since `compacted_out` is no longer used, the payload buffer's format could be improved as well. ## Out of Scope @@ -137,7 +137,7 @@ A new field will be added to `MessageMetadata`: ### Client-Side Computation of Last Compacted Message ID -The previous solution this proposal is to pass the last compacted entry's buffer as well as the `GetLastMessageId` response, so the client can compute the last compacted message ID by itself. It can even handle the encrypted messages because the client side should have the public key to decrypt the message. +The previous version of this proposal passed the compacted last entry's buffer along with the `GetLastMessageId` response, so that the client could compute the last message ID by itself. It could even handle encrypted messages because the client side should have the public key to decrypt the message. 
 However, it's unnecessarily complicated that the whole entry buffer will be sent to the client for each `GetLastMessageId` RPC.

From 098efa7f158e95c0cce38daf4639772924a7c000 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Tue, 24 Jun 2025 12:46:03 +0800
Subject: [PATCH 5/8] Illustrate compatibility

---
 pip/pip-429.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pip/pip-429.md b/pip/pip-429.md
index 3fdf84794b639..5f93fbb3d6e0b 100644
--- a/pip/pip-429.md
+++ b/pip/pip-429.md
@@ -129,6 +129,10 @@ A new field will be added to `MessageMetadata`:

 # Backward & Forward Compatibility

+For entry buffers written by old version brokers, there is no `compacted_batch_indexes` field in the `MessageMetadata`. In this case, the `GetLastMessageId` RPC will still work as before, relying on the `compacted_out` field in `SingleMessageMetadata` to determine the last message ID.
+
+Downgrading remains safe because the `compacted_batch_indexes` field, which is unknown to older versions, will simply be ignored when parsing `MessageMetadata`. Additionally, this proposal ensures backward compatibility by retaining the `compacted_out` field in `SingleMessageMetadata`.
+
 # Alternatives

 ### Using a Property in `MessageMetadata`

From dbe31264ab8bf16c969a0e3e38bc5c3968a54943 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Thu, 26 Jun 2025 11:46:07 +0800
Subject: [PATCH 6/8] Update pip/pip-429.md

Co-authored-by: Penghui Li
---
 pip/pip-429.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-429.md b/pip/pip-429.md
index 5f93fbb3d6e0b..88d0e7f1df3be 100644
--- a/pip/pip-429.md
+++ b/pip/pip-429.md
@@ -66,7 +66,7 @@ The `compacted_out` field was first introduced in the early stages of developmen

 Decompressing the payload buffer solely to check whether individual messages have the `compacted_out` field set is both inefficient and restrictive, as it imposes constraints on the payload buffer format. Furthermore, when using a custom topic compaction service, the entry buffer in the compacted ledger may not include a `SingleMessageMetadata` for every single message, adding further complexity to the process.

-This challenge is exacerbated when messages are encrypted, as decryption is not possible without the public key required for decryption. This limitation also impacts the current compaction service, as encrypted messages cannot be compacted. Consequently, operations such as the `GetLastMessageId` RPC will fail, resulting in an error similar to the following:

 ```
 org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription sub of the topic gets the last message id was failed

From d89c1ca018a6a8a7e56cd6a80ce91062682f63ca Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Thu, 26 Jun 2025 19:04:50 +0800
Subject: [PATCH 7/8] Add changes to TopicCompactionService

---
 pip/pip-429.md | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/pip/pip-429.md b/pip/pip-429.md
index 88d0e7f1df3be..8ca018a02aab2 100644
--- a/pip/pip-429.md
+++ b/pip/pip-429.md
@@ -111,6 +111,42 @@ The improvements are:

 ## Public-facing Changes

+### Public API
+
+Remove `readLastCompactedEntry` and `findEntryByEntryIndex` methods from `TopicCompactionService` interface.
+- `findEntryByEntryIndex`: it's never used other than in tests
+- `readLastCompactedEntry`: it exposes the `Entry` to the caller, while the caller should only care about the position of the last message.
+
+Change the return value of `findEntryByPublishTime` from `CompletableFuture` to `CompletableFuture`. This change is made because the caller only needs the position of the last message, not the entire entry.
+
+Add the following new methods:
+
+```java
+/**
+ * Retrieve the position of the last message before compaction.
+ *
+ * @return A future that completes with the position of the last message before compaction, or
+ * {@link MessagePosition#EARLIEST} if no such message exists.
+ */
+CompletableFuture<MessagePosition> getLastMessagePosition();
+
+/**
+ * Represents the position of a message.
+ *
+ * The `ledgerId` and `entryId` together specify the exact entry to which the message belongs. For batched messages,
+ * the `batchIndex` field indicates the index of the message within the batch. If the message is not part of a
+ * batch, the `batchIndex` field is set to -1. The `publishTime` field corresponds to the publishing time of the
+ * entry's metadata, providing a timestamp for when the entry was published.
+ *
+ */
+record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) {
+
+    public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, 0, 0);
+}
+```
+
+After this change, the processing of the entry buffer will be handled within the `TopicCompactionService` instead of being managed on the caller side (`ServerCnx`). This adjustment enhances the flexibility of the implementation, enabling more advanced use cases. For example, it allows the position to be stored in an external metadata service, providing greater scalability and modularity.
+
 ### Binary protocol

 A new field will be added to `MessageMetadata`:

From d153d3cc39e4c9627c4213220d6f6de6b896ec84 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Wed, 16 Jul 2025 16:07:22 +0800
Subject: [PATCH 8/8] Update VOTE thread

---
 pip/pip-429.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-429.md b/pip/pip-429.md
index 8ca018a02aab2..8d56455ced094 100644
--- a/pip/pip-429.md
+++ b/pip/pip-429.md
@@ -198,4 +198,4 @@ However, it would bring a compatibility issue that older clients would not be ab
 # Links

 * Mailing List discussion thread: https://lists.apache.org/thread/4jghjg7yd36cl8yonbsvk5njmm96vy0k
-* Mailing List voting thread:
+* Mailing List voting thread: https://lists.apache.org/thread/04wormrz3qpcl4w0vpcrm7401wmp4q3w