Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -3128,6 +3128,29 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean transactionCoordinatorEnabled = false;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Max number of txnMeta of aborted transaction to persist in broker."
+ "If the number of aborted transaction is greater than this value,"
+ " the oldest aborted transaction will be "
+ "removed from the cache and persisted in the store."
+ "default value is 0, disable persistence of aborted transaction."
)
private int transactionMetaPersistCount = 0;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver."
)
private long transactionMetaPersistTimeInHour = 72;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Interval in seconds to check the expired transaction in TransactionMetadataPreserver."
)
private long transactionMetaExpireCheckIntervalInSecond = 300;


@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU
return ServerError.TransactionConflict;
} else if (t instanceof CoordinatorException.TransactionNotFoundException) {
return ServerError.TransactionNotFound;
} else if (t instanceof CoordinatorException.PreserverClosedException) {
return ServerError.TransactionPreserverClosed;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2626,6 +2626,12 @@ private Throwable handleTxnException(Throwable ex, String op, long requestId) {
return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage());

}
if (cause instanceof CoordinatorException.PreserverClosedException) {
if (log.isDebugEnabled()) {
log.debug("The transaction metadata preserver was closed for the request {}", op);
}
return cause;
}
log.error("Send response error for {} request {}.", op, requestId, cause);
return cause;
}
Expand All @@ -2634,6 +2640,12 @@ protected void handleNewTxn(CommandNewTxn command) {
checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
final String clientName;
if (command.hasClientName()) {
clientName = command.getClientName();
} else {
clientName = null;
}
if (log.isDebugEnabled()) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.",
requestId, tcId, remoteAddress);
Expand All @@ -2646,7 +2658,7 @@ protected void handleNewTxn(CommandNewTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
final String owner = getPrincipal();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner)
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner, clientName)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2742,8 +2754,18 @@ protected void handleEndTxn(CommandEndTxn command) {
checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final int txnAction = command.getTxnAction().getValue();
final String clientName;
if (command.hasClientName()) {
clientName = command.getClientName();
} else {
clientName = null;
}
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
if (log.isDebugEnabled()) {
log.debug("Receive end txn request {} to transaction meta store {} for txnId:{}.",
requestId, tcId, txnID);
}

if (!checkTransactionEnableAndSendError(requestId)) {
return;
Expand All @@ -2752,12 +2774,12 @@ protected void handleEndTxn(CommandEndTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

verifyTxnOwnership(txnID)
verifyTxnOwnership(txnID, clientName)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
return transactionMetadataStoreService.endTransaction(txnID, txnAction, false, clientName);
})
.whenComplete((v, ex) -> {
if (ex == null) {
Expand Down Expand Up @@ -2792,9 +2814,13 @@ private CompletableFuture<Boolean> isSuperUser() {
}

private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
return verifyTxnOwnership(txnID, null);
}

private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String clientName) {
assert ctx.executor().inEventLoop();
return service.pulsar().getTransactionMetadataStoreService()
.verifyTxnOwnership(txnID, getPrincipal())
.verifyTxnOwnership(txnID, getPrincipal(), clientName)
.thenComposeAsync(isOwner -> {
if (isOwner) {
return CompletableFuture.completedFuture(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.broker.stats.prometheus;

public class AggregatedTransactionCoordinatorStats {
public long tcRecoveryTime;

public long preserverRecoveryTime;

public int actives;

Expand All @@ -35,6 +38,8 @@ public class AggregatedTransactionCoordinatorStats {
public long[] executionLatency;

public void reset() {
tcRecoveryTime = 0;
preserverRecoveryTime = 0;
actives = 0;
committedCount = 0;
abortedCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream
transactionCoordinatorStats.reset();
TransactionMetadataStoreStats transactionMetadataStoreStats =
transactionMetadataStore.getMetadataStoreStats();
transactionCoordinatorStats.tcRecoveryTime =
transactionMetadataStoreStats.getTcRecoverTime();
transactionCoordinatorStats.preserverRecoveryTime =
transactionMetadataStoreStats.getPreserverRecoverTime();
transactionCoordinatorStats.actives =
transactionMetadataStoreStats.getActives();
transactionCoordinatorStats.committedCount =
Expand Down Expand Up @@ -239,6 +243,10 @@ private static void printManageLedgerStats(PrometheusMetricStreams stream, Strin
static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster,
AggregatedTransactionCoordinatorStats stats,
long coordinatorId) {
writeMetric(stream, "pulsar_txn_tc_recovery_time_ms", stats.tcRecoveryTime, cluster,
coordinatorId);
writeMetric(stream, "pulsar_txn_preserver_recovery_time_ms", stats.preserverRecoveryTime,
cluster, coordinatorId);
writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
coordinatorId);
writeMetric(stream, "pulsar_txn_committed_total", stats.committedCount, cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,33 @@ public void handleOpenStatusTransaction(long sequenceId, long timeout) {

@Override
public void appendOpenTransactionToTimeoutTracker() {
if (log.isDebugEnabled()) {
log.debug("Append open transaction to timeout tracker, tcId: {}, openTransactions: {}",
tcId, openTransactions);
}
openTransactions.forEach(timeoutTracker::replayAddTransaction);
}

@Override
public void handleCommittingAndAbortingTransaction() {
committingTransactions.forEach(k ->
transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE,
false));
if (log.isDebugEnabled()) {
log.debug("Handle committing and aborting transaction, tcId: {}, committingTransactions: {}, "
+ "abortingTransactions: {}", tcId, committingTransactions, abortingTransactions);
}
committingTransactions.forEach(k -> {
TxnID txnID = new TxnID(tcId, k);
transactionMetadataStoreService.getTxnMeta(txnID)
.thenAccept(txnMeta ->
transactionMetadataStoreService.endTransaction(txnID, TxnAction.COMMIT_VALUE,
false, txnMeta.getClientName()));
});

abortingTransactions.forEach(k ->
transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE,
false));
abortingTransactions.forEach(k -> {
TxnID txnID = new TxnID(tcId, k);
transactionMetadataStoreService.getTxnMeta(txnID)
.thenAccept(txnMeta ->
transactionMetadataStoreService.endTransaction(txnID, TxnAction.ABORT_VALUE,
false, txnMeta.getClientName()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ public void testIsSystemTopic() {
assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot")));
assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0")));
assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1")));
assertTrue(brokerService.isSystemTopic(TopicName.get("persistent://pulsar/system/__terminated_txn_state_0")));
assertTrue(brokerService.isSystemTopic(TopicName
.get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack")));
assertTrue(brokerService.isSystemTopic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3738,7 +3738,7 @@ public void sendEndTxnResponse() throws Exception {
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
TxnAction.COMMIT));
TxnAction.COMMIT, null));
channel.writeInbound(clientCommand);
CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();

Expand All @@ -3763,7 +3763,7 @@ public void sendEndTxnResponseFailed() throws Exception {
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
TxnAction.COMMIT));
TxnAction.COMMIT, null));
channel.writeInbound(clientCommand);
CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();

Expand Down
Loading