Search before reporting
Motivation
I ran into this while adding pause/resume to the Go client (apache/pulsar-client-go#1507). The Go implementation mirrors the Java one, and during review I found that the same gap exists in the Java client. I already fixed it in that Go PR; this issue is to bring the same fix back to Java.
When you pause a consumer and later resume it, resume() is supposed to tell the broker "ok, start sending me messages again." It does this by calling increaseAvailablePermits(cnx(), 0):
// ConsumerImpl.resume()
public void resume() {
    if (paused) {
        paused = false;
        increaseAvailablePermits(cnx(), 0);
    }
}
The problem is that increaseAvailablePermits only actually sends permits once the owed count reaches half the receiver queue size:
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
    ...
    sendFlowPermitsToBroker(currentCnx, available);
    ...
}
That "half the queue" check is fine for normal running, since you don't want to send a flow command for every single message. But it's the wrong thing to do on resume. If, right when you resume, the consumer owes fewer permits than that threshold and the broker has no permits left, then resume sends nothing. And since no messages are coming in, the owed count never goes up, so a flow command is never sent. The consumer just sits there quietly, even though you resumed it. This isn't slow, it's stuck: there's no way for it to recover on its own.
When does this happen? Mostly when not every delivered message bumps the permit count by one (for example, duplicate or chunk fragments that get thrown away). In that case the owed count can end up below the threshold while the broker has already sent everything it was allowed to. Small receiverQueueSize values make it easier to hit. In the normal case the count lands back on a full queue size, so it usually works, which is why nobody noticed and the current tests don't catch it.
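To make the stall concrete, here is a minimal toy model of the threshold check. It is not the real ConsumerImpl: the class PermitStallDemo, its fields, and the counters are hypothetical stand-ins, and the loop only assumes the shape of the snippet quoted above. With a receiver queue size of 10 the threshold is 5, so resuming with 3 permits owed sends nothing, while resuming with 5 owed sends one flow command.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the threshold-gated permit logic. Hypothetical names, not Pulsar API. */
final class PermitStallDemo {
    private final AtomicInteger availablePermits;
    private final int receiverQueueSize;
    private volatile boolean paused = false;
    private int flowCommandsSent = 0; // how many flow commands the model "sent"
    private int permitsSent = 0;      // total permits carried by those commands

    PermitStallDemo(int receiverQueueSize, int owedPermits) {
        this.receiverQueueSize = receiverQueueSize;
        this.availablePermits = new AtomicInteger(owedPermits);
    }

    // Assumed shape of the existing check: only send once owed >= half the queue.
    void increaseAvailablePermits(int delta) {
        int available = availablePermits.addAndGet(delta);
        while (available >= receiverQueueSize / 2 && !paused) {
            if (availablePermits.compareAndSet(available, 0)) {
                flowCommandsSent++;
                permitsSent += available;
                break;
            }
            available = availablePermits.get();
        }
    }

    void pause() {
        paused = true;
    }

    // Same structure as the current resume(): delta 0, so the threshold still applies.
    void resume() {
        if (paused) {
            paused = false;
            increaseAvailablePermits(0);
        }
    }

    int flowCommandsSent() { return flowCommandsSent; }
    int permitsSent() { return permitsSent; }

    public static void main(String[] args) {
        PermitStallDemo stuck = new PermitStallDemo(10, 3); // 3 owed, threshold 5
        stuck.pause();
        stuck.resume();
        System.out.println("owed=3 -> flow commands sent: " + stuck.flowCommandsSent());

        PermitStallDemo ok = new PermitStallDemo(10, 5);    // 5 owed, threshold 5
        ok.pause();
        ok.resume();
        System.out.println("owed=5 -> flow commands sent: " + ok.flowCommandsSent());
    }
}
```

In the first case nothing else in the model can raise the owed count, which is the stuck state described above.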
Solution
Give resume its own way to send permits that ignores the "half the queue" check and just sends whatever is owed. Since it sends exactly what's owed and nothing more, it can never send too much. This is the same fix I used in the Go PR.
@Override
public void resume() {
    if (paused) {
        paused = false;
        flushAvailablePermitsToBroker(cnx());
    }
}

private void flushAvailablePermitsToBroker(ClientCnx currentCnx) {
    int available = AVAILABLE_PERMITS_UPDATER.get(this);
    while (available > 0 && !paused) {
        if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
            sendFlowPermitsToBroker(currentCnx, available);
            break;
        } else {
            available = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }
}
It uses the same compare-and-swap trick the existing code already uses, so it won't race or send twice. If the consumer happens to be disconnected, sendFlowPermitsToBroker does nothing and the reconnect path grants a fresh batch anyway, so that case is safe. And MultiTopicsConsumerImpl.resume() just calls resume on each child, so this one change covers partitioned and multi-topic consumers too.
For a test: the current testPauseAndResume drains the whole queue, so the owed count ends up back at a full queue size and the bug never shows. A real regression test needs to leave fewer permits owed than half the queue while the broker is at zero, then resume and check that a flow command actually goes out. A small unit test in ConsumerImplTest with a mocked connection is the easiest way to do that reliably.
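As a rough sketch of what such a regression check would assert, the model below records every flow command in a list instead of talking to a broker. It is self-contained and does not use ConsumerImplTest, Mockito, or any Pulsar class: ResumeFlushModel and its methods are hypothetical names, and the recording list stands in for the mocked connection. A real test would need to verify the same two things against the actual client: exactly one flow command goes out on resume, and it carries exactly the owed permits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a consumer with the proposed flush-on-resume. Hypothetical names. */
final class ResumeFlushModel {
    private final AtomicInteger availablePermits;
    private final List<Integer> flowCommands = new ArrayList<>(); // permits per "sent" flow command
    private volatile boolean paused = false;

    ResumeFlushModel(int owedPermits) {
        this.availablePermits = new AtomicInteger(owedPermits);
    }

    void pause() {
        paused = true;
    }

    // Proposed behaviour: on resume, send whatever is owed, ignoring the threshold.
    void resume() {
        if (paused) {
            paused = false;
            flushAvailablePermits();
        }
    }

    private void flushAvailablePermits() {
        int available = availablePermits.get();
        while (available > 0 && !paused) {
            // CAS to zero so the same permits cannot be sent twice.
            if (availablePermits.compareAndSet(available, 0)) {
                flowCommands.add(available);
                break;
            }
            available = availablePermits.get();
        }
    }

    List<Integer> flowCommands() { return flowCommands; }
    int owedPermits() { return availablePermits.get(); }

    private static void check(boolean ok, String message) {
        if (!ok) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Arrange: 3 permits owed, below the threshold of 5 for a queue size of 10.
        ResumeFlushModel consumer = new ResumeFlushModel(3);
        consumer.pause();
        consumer.resume();
        check(consumer.flowCommands().equals(List.of(3)), "resume should send exactly the owed permits");
        check(consumer.owedPermits() == 0, "owed count should be reset after the flush");

        // A second pause/resume with nothing owed must not send an empty flow command.
        consumer.pause();
        consumer.resume();
        check(consumer.flowCommands().size() == 1, "no extra flow command when nothing is owed");
        System.out.println("flow commands after resume: " + consumer.flowCommands());
    }
}
```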
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?