From 63068384f97800c64204917919b5983c299c3e84 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Fri, 12 Jun 2026 00:46:22 +0800 Subject: [PATCH] [fix][broker] Release replicator permit on publish failure --- .../persistent/PersistentReplicator.java | 3 ++ .../PersistentReplicatorInflightTaskTest.java | 32 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8433d9b9c7ba2..c599920f727b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -408,6 +408,9 @@ public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) { // cursor should be rewound since it was incremented when readMoreEntries replicator.beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing); replicator.doRewindCursor(false); + // The failed send has completed from the producer queue perspective. The cursor rewind + // makes the entry readable again, so this in-flight task must release its permit. + inFlightTask.incCompletedEntries(); } else { replicator.log.debug() .exception(exception) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index 478437ffde015..6295f14022dd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; @@ -43,8 +44,11 @@ import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker; import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator.ProducerSendCallback; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator.ReasonOfWaitForCursorRewinding; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; import org.mockito.invocation.InvocationOnMock; @@ -167,6 +171,34 @@ public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated( } } + @Test + public void testFailedPublishCompletesInFlightTask() throws Exception { + PersistentReplicator replicator = spy(getReplicator(topicName)); + doNothing().when(replicator).beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing); + doNothing().when(replicator).doRewindCursor(false); + doNothing().when(replicator).readMoreEntries(); + + LinkedList inFlightTasks = replicator.inFlightTasks; + List originalTasks = new ArrayList<>(inFlightTasks); + inFlightTasks.clear(); + + try { + InFlightTask task = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId()); + task.setEntries(Collections.singletonList(mock(Entry.class))); + inFlightTasks.add(task); + assertEquals(replicator.getPermitsIfNoPendingRead(), 999); + + ProducerSendCallback callback = ProducerSendCallback.create(replicator, mock(Entry.class), null, task); + callback.sendComplete(new PulsarClientException.ProducerBlockedQuotaExceededException("mocked"), null); + + assertTrue(task.isDone()); + assertEquals(replicator.getPermitsIfNoPendingRead(), 1000); + } finally { + inFlightTasks.clear(); + inFlightTasks.addAll(originalTasks); + } + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");