From f13a0e8b271944dec464e2b237b63e9264f6ed59 Mon Sep 17 00:00:00 2001
From: fengyubiao
Date: Sun, 31 May 2026 02:28:58 +0800
Subject: [PATCH 1/5] pip

(cherry picked from commit a49c9e5a7c49c07eab1610e11c05348c393cce91)
---
 pip/pip-476.md | 140 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)
 create mode 100644 pip/pip-476.md

diff --git a/pip/pip-476.md b/pip/pip-476.md
new file mode 100644
index 0000000000000..2d101f075e0d6
--- /dev/null
+++ b/pip/pip-476.md
@@ -0,0 +1,140 @@
+# PIP-476: Retain Ended Transaction Status for Precise Error Reporting
+
+# Background knowledge
+
+Pulsar Transaction Coordinator (TC) manages the lifecycle of transactions. Each transaction transitions through states: OPEN -> COMMITTING/ABORTING -> COMMITTED/ABORTED. Once a transaction reaches a final state (COMMITTED or ABORTED), its in-memory metadata (`TxnMeta`) and transaction logs are cleaned up. After cleanup, any query for that transaction's metadata returns a `TransactionNotFoundException`, making it impossible for the caller to determine whether the transaction never existed or had already finished with a known outcome.
+
+In real production scenarios, clients often encounter "commit transaction" request timeouts. In this case, the client cannot determine whether the transaction was actually committed successfully: the request may have reached the TC, but the response was lost before being returned. Without this feature, the client can only retry the commit, but receives a `TransactionNotFoundException` and still cannot determine the actual result.
+
+Additionally, **Flink-connector-Pulsar** depends heavily on this feature. When a Flink checkpoint partially succeeds (some operators committed the transaction while others failed), Flink will try to re-commit the already-completed transaction. This feature can clearly inform Flink that the transaction has already been committed successfully during the retry, thus avoiding data duplication or loss.
+
+# Motivation
+
+When a producer/consumer or admin operation queries a transaction's status after it has ended, the TC only throws a generic `TransactionNotFoundException`. This forces clients to rely on timeouts or heuristic retries. For example:
+
+1. A commit request succeeds on the TC, but the response is lost due to network issues.
+2. The client retries the commit operation.
+3. The TC no longer holds the transaction metadata and returns `TransactionNotFoundException`.
+4. The client cannot tell whether the commit actually succeeded.
+
+This behavior leads to uncertainty and forces clients to adopt overly conservative retry logic. Retaining the ended transaction status within a configurable time window allows the TC to return `TransactionAlreadyCommittedException` (or `TransactionAlreadyAbortedException`, `TransactionAlreadyTimedOutException`), providing unambiguous feedback to the client.
+
+# Goals
+
+## In Scope
+
+- Retain the final status (COMMITTED, ABORTED, TIMEOUT) in memory after a transaction is removed from the active store.
+- Introduce three new exception subtypes for precise error reporting: the three final statuses (COMMITTED, ABORTED, TIMEOUT) will each correspond to a different error code returned to the client.
+- Provide configuration knobs for retention time and maximum cached record count.
+- Persist the `isTimeout` flag in the transaction log, allowing users to determine "whether the client was too slow" when investigating failed transactions.
+
+## Out of Scope
+
+N/A
+
+# High Level Design
+
+When a transaction reaches its final state, instead of deleting all related transaction logs as before, the new behavior is:
+- **Immediately delete**: transaction logs representing the "start" and "intermediate" states of the transaction.
+- **Retain**: the transaction log representing the "end" state (the COMMITTED/ABORTED entry).
+
+This end log is retained until it reaches the `transactionEndedStatusRetentionTimeMs` or `transactionEndedStatusMaxRecordCount` limit, at which point it is deleted. The transaction itself is removed from the active `txnMetaMap`, and its final status is written to the `EndedTxnStatusCache`.
+
+The benefit of this approach is that when the TC restarts, it can fully reconstruct the in-memory ended transaction state from before the restart by replaying the transaction logs, because the end logs still exist.
+
+When a query arrives and the transaction is not found in the active store, the cache is checked. If found, a specific exception (`TransactionAlreadyCommittedException`, etc.) is thrown. If not found, the original `TransactionNotFoundException` is returned.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+**`EndedTxnStatus` enum** (`coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java`):
+- `COMMITTED`, `ABORTED`, `TIMEOUT`
+
+**`EndedTxnStatusCache`** (`coordinator/impl/EndedTxnStatusCache.java`):
+- Wraps a Caffeine `Cache`.
+- `EndedTxnMetadata` is a record: `(TxnID txnID, EndedTxnStatus status, Position logPosition, long endedAtMs)`.
+- On creation:
+  - If `retentionTimeMs` or `maxRecordCount` is 0, the cache is disabled (all queries return null).
+  - If both `retentionTimeMs` and `maxRecordCount` are negative, an `IllegalArgumentException` is thrown (would cause unbounded growth -> OOM).
+- TTL-based expiry uses `Expiry.expireAfterCreate` to compute remaining time based on `endedAtMs + retentionTimeMs - now`.
+- Optional `removalListener` callback: when a cache entry is evicted due to reaching the `retentionTimeMs` or `maxRecordCount` limit, the corresponding end transaction log is deleted. This ensures the end log is retained for a configurable period, allowing the TC to recover ended transaction state through replay after a restart.
+
+**`TransactionMetadataStoreConfig`** (`coordinator/TransactionMetadataStoreConfig.java`):
+- Internal configuration class (not publicly exposed), containing two new config fields: `transactionEndedStatusRetentionTimeMs` and `transactionEndedStatusMaxRecordCount`.
+- Default retention time: 1 hour. Default max record count: 100,000.
+- Validation: retention and maxRecordCount cannot both be negative simultaneously.
+
+**Changes in `MLTransactionMetadataStore`:**
+
+- **During recovery (replay)**: when encountering a record with status COMMITTED or ABORTED, record it in the `EndedTxnStatusCache`.
+- **During normal operation (endTransaction)**: after appending the end log, `positionsToDelete` is taken from the previously accumulated list of old log positions (not including the current end log entry). Call `endedTxnStatusCache.record(...)`:
+  - If it returns true, the `EndedTxnStatusCache` has recorded the transaction's final status, and the end transaction log will be deleted by the `EndedTxnStatusCache` (when the cache entry expires or is evicted).
+  - If it returns false, the cache has not recorded this transaction, and the end transaction log must be deleted immediately.
+- **Comparison with the old logic**: the old code deleted all logs at once at the end of a transaction (start + intermediate + end). The new code only deletes the start and intermediate logs, while the end log is retained and deleted later according to the retention policy.
+
+**Protobuf change (`PulsarTransactionMetadata.proto`)**:
+- Added `optional bool is_timeout = 13 [default = false]` to `TransactionMetadataEntry`.
+- Used to accurately distinguish whether a transaction was aborted due to timeout (TIMEOUT) or explicitly aborted by the client (ABORTED) during TC restart replay. This field also provides more observable information, allowing users to easily investigate the proportion of "transaction commit timeouts".
+
+**New exception types** (`coordinator/exceptions/CoordinatorException.java`):
+- `TransactionAlreadyCommittedException(TxnID)`
+- `TransactionAlreadyAbortedException(TxnID)`
+- `TransactionAlreadyTimedOutException(TxnID)`
+- These exceptions extend `TransactionNotFoundException` to maintain backward compatibility (existing catch blocks still work).
+
+## Public-facing Changes
+
+### Configuration
+
+Two new fields added to `ServiceConfiguration`, `broker.conf`, and `standalone.conf`:
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `transactionCoordinatorEndedStatusRetentionTimeMs` | long | 3600000 (1 hour) | How long to retain ended transaction status in memory after the transaction ends. Set to 0 to disable. |
+| `transactionCoordinatorEndedStatusMaxRecordCount` | long | 100000 | Maximum number of ended transaction status records to retain. Set to 0 to disable. |
+
+### Feature Flag
+
+Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility.
+
+# Monitoring
+
+- A sudden increase in `TransactionAlreadyCommittedException` / `TransactionAlreadyAbortedException` / `TransactionAlreadyTimedOutException` suggests clients are frequently retrying requests to already-finished transactions. This may indicate network issues or overly aggressive retry policies.
+- Conversely, a high rate of `TransactionNotFoundException` for recently-ended transactions may indicate that the retention time or max record count is set too low.
+
+# Security Considerations
+
+No new security concerns. The existing authorization model still applies: a client must have the appropriate transaction coordinator permission to execute operations.
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+- New configuration fields default to 1 hour / 100,000, which is safe for existing deployments.
+- Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility.
+- The new protobuf field `is_timeout` is optional and defaults to `false`, so old transaction logs remain readable.
+- No changes to on-disk data format or broker-to-broker protocol.
+
+## Downgrade / Rollback
+
+- Roll back to the previous version. The new `EndedTxnStatusCache` will no longer exist, and all queries for ended transactions will return `TransactionNotFoundException`.
+- The end transaction logs retained by the new version will be treated as uncleaned logs by the old version. The old version will replay and handle these logs normally during startup (the `is_timeout` field is ignored by the old version).
+- After rollback, the old Broker will not recognize the `supportsTransactionEndStatusErrors` flag and will not return new error codes, maintaining compatibility with old clients.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+Geo-replication for transactions is not currently supported. No special considerations.
+
+# Alternatives
+
+N/A
+
+# General Notes
+
+N/A
+
+# Links
+
+* Mailing List discussion thread:
+* Mailing List voting thread:

From 9fddfb74e653c4646e705cd3953968119f5f8f87 Mon Sep 17 00:00:00 2001
From: fengyubiao
Date: Mon, 1 Jun 2026 10:41:01 +0800
Subject: [PATCH 2/5] PIP-481

---
 pip/{pip-476.md => pip-481.md} | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename pip/{pip-476.md => pip-481.md} (99%)

diff --git a/pip/pip-476.md b/pip/pip-481.md
similarity index 99%
rename from pip/pip-476.md
rename to pip/pip-481.md
index 2d101f075e0d6..10e1f8b8f959d 100644
--- a/pip/pip-476.md
+++ b/pip/pip-481.md
@@ -1,4 +1,4 @@
-# PIP-476: Retain Ended Transaction Status for Precise Error Reporting
+# PIP-481: Retain Ended Transaction Status for Precise Error Reporting
 
 # Background knowledge
 

From 14e89c9743601639f3926c4073d9deb0edb83691 Mon Sep 17 00:00:00 2001
From: fengyubiao <9947090@qq.com>
Date: Mon, 1 Jun 2026 11:07:14 +0800
Subject: [PATCH 3/5] Update pip-481.md

---
 pip/pip-481.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-481.md b/pip/pip-481.md
index 10e1f8b8f959d..f338d0e6f88b3 100644
--- a/pip/pip-481.md
+++ b/pip/pip-481.md
@@ -136,5 +136,5 @@ N/A
 
 # Links
 
-* Mailing List discussion thread:
+* Mailing List discussion thread: https://lists.apache.org/thread/3fmnpzzqo4040mrqtbptnf7kr48r8jb1
 * Mailing List voting thread:

From 157a66f2f05fb9a0804b35a9d14e49fb8627febe Mon Sep 17 00:00:00 2001
From: fengyubiao <9947090@qq.com>
Date: Mon, 1 Jun 2026 11:37:53 +0800
Subject: [PATCH 4/5] Update pip-481.md

---
 pip/pip-481.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pip/pip-481.md b/pip/pip-481.md
index f338d0e6f88b3..01fe3c9f0b059 100644
--- a/pip/pip-481.md
+++ b/pip/pip-481.md
@@ -91,8 +91,8 @@ Two new fields added to `ServiceConfiguration`, `broker.conf`, and `standalone.c
 
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
-| `transactionCoordinatorEndedStatusRetentionTimeMs` | long | 3600000 (1 hour) | How long to retain ended transaction status in memory after the transaction ends. Set to 0 to disable. |
-| `transactionCoordinatorEndedStatusMaxRecordCount` | long | 100000 | Maximum number of ended transaction status records to retain. Set to 0 to disable. |
+| `transactionEndedStatusRetentionTimeMs` | long | 3600000 (1 hour) | How long to retain ended transaction status in memory after the transaction ends. Set to 0 to disable. |
+| `transactionEndedStatusMaxRecordCount` | long | 100000 | Maximum number of ended transaction status records to retain. Set to 0 to disable. |
 
 ### Feature Flag
 

From 15128612aa196d01add002c188d60e0238b6a4aa Mon Sep 17 00:00:00 2001
From: fengyubiao <9947090@qq.com>
Date: Mon, 1 Jun 2026 11:38:52 +0800
Subject: [PATCH 5/5] Update pip-481.md

---
 pip/pip-481.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-481.md b/pip/pip-481.md
index 01fe3c9f0b059..efe86ce23d668 100644
--- a/pip/pip-481.md
+++ b/pip/pip-481.md
@@ -81,7 +81,7 @@ When a query arrives and the transaction is not found in the active store, the c
 - `TransactionAlreadyCommittedException(TxnID)`
 - `TransactionAlreadyAbortedException(TxnID)`
 - `TransactionAlreadyTimedOutException(TxnID)`
-- These exceptions extend `TransactionNotFoundException` to maintain backward compatibility (existing catch blocks still work).
+- On the coordinator side, these exceptions extend `TransactionNotFoundException`. On the client side, they are mapped to dedicated `TransactionCoordinatorClientException` types. Backward compatibility for old clients is maintained by the feature flag fallback described below.
 
 ## Public-facing Changes
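
Reviewer note on the series: the lookup path that pip-481.md describes (check the ended-status cache when a transaction is missing from the active store, then fall back to `TransactionNotFoundException` for clients that did not set `supportsTransactionEndStatusErrors`) might be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code from the patch. Every type is a simplified stand-in that only borrows names from the PIP; `EndedStatusCache`, `EndedRecord`, and `resolve` are hypothetical. Transaction IDs are plain strings, a `LinkedHashMap` replaces the Caffeine cache, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: every type here is a simplified stand-in,
// not the real Pulsar class of the same name.
class EndedTxnLookupSketch {

    enum EndedTxnStatus { COMMITTED, ABORTED, TIMEOUT }

    static class TransactionNotFoundException extends Exception {
        TransactionNotFoundException(String message) { super(message); }
    }
    static class TransactionAlreadyCommittedException extends TransactionNotFoundException {
        TransactionAlreadyCommittedException(String txnId) { super(txnId + " already committed"); }
    }
    static class TransactionAlreadyAbortedException extends TransactionNotFoundException {
        TransactionAlreadyAbortedException(String txnId) { super(txnId + " already aborted"); }
    }
    static class TransactionAlreadyTimedOutException extends TransactionNotFoundException {
        TransactionAlreadyTimedOutException(String txnId) { super(txnId + " already timed out"); }
    }

    // One retained end record: the final status and when the transaction ended.
    static final class EndedRecord {
        final EndedTxnStatus status;
        final long endedAtMs;
        EndedRecord(EndedTxnStatus status, long endedAtMs) {
            this.status = status;
            this.endedAtMs = endedAtMs;
        }
    }

    // Plain-JDK stand-in for the Caffeine-backed cache (assumption): bounded by count and age.
    static final class EndedStatusCache {
        private final long retentionMs;
        private final long maxRecords;
        private final Map<String, EndedRecord> records;

        EndedStatusCache(long retentionMs, long maxRecords) {
            if (retentionMs < 0 && maxRecords < 0) {
                throw new IllegalArgumentException("retention and max count cannot both be unbounded");
            }
            this.retentionMs = retentionMs;
            this.maxRecords = maxRecords;
            this.records = new LinkedHashMap<String, EndedRecord>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, EndedRecord> eldest) {
                    // Drop the oldest inserted record once the count limit is exceeded.
                    return maxRecords > 0 && size() > maxRecords;
                }
            };
        }

        // Returns false when the cache is disabled, so the caller knows nothing was retained.
        boolean record(String txnId, EndedTxnStatus status, long endedAtMs) {
            if (retentionMs == 0 || maxRecords == 0) {
                return false;
            }
            records.put(txnId, new EndedRecord(status, endedAtMs));
            return true;
        }

        // Returns the retained status, or null if it is unknown or older than the retention time.
        EndedTxnStatus get(String txnId, long nowMs) {
            EndedRecord r = records.get(txnId);
            if (r == null) {
                return null;
            }
            if (retentionMs > 0 && nowMs - r.endedAtMs >= retentionMs) {
                records.remove(txnId);
                return null;
            }
            return r.status;
        }
    }

    // Chooses the exception for a transaction that is absent from the active store.
    static TransactionNotFoundException resolve(EndedStatusCache cache, String txnId,
                                                long nowMs, boolean clientSupportsEndStatusErrors) {
        EndedTxnStatus status = cache.get(txnId, nowMs);
        if (status == null || !clientSupportsEndStatusErrors) {
            return new TransactionNotFoundException(txnId + " not found");
        }
        switch (status) {
            case COMMITTED: return new TransactionAlreadyCommittedException(txnId);
            case ABORTED:   return new TransactionAlreadyAbortedException(txnId);
            default:        return new TransactionAlreadyTimedOutException(txnId);
        }
    }

    public static void main(String[] args) {
        EndedStatusCache cache = new EndedStatusCache(3_600_000L, 100_000L);
        cache.record("txn-1", EndedTxnStatus.COMMITTED, 0L);
        System.out.println(resolve(cache, "txn-1", 5_000L, true).getClass().getSimpleName());
        System.out.println(resolve(cache, "txn-1", 5_000L, false).getClass().getSimpleName());
        System.out.println(resolve(cache, "txn-2", 5_000L, true).getClass().getSimpleName());
    }
}
```

Returning the exception instead of throwing it keeps the sketch easy to exercise. In this sketch the specific subtypes extend the generic one, mirroring the coordinator-side hierarchy the PIP describes, so a caller that only catches `TransactionNotFoundException` keeps working.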