From 7ec470fa8bfc412e843a5e3bf3992cf821f134ba Mon Sep 17 00:00:00 2001 From: hechengbo Date: Fri, 5 Jun 2026 11:49:52 +0800 Subject: [PATCH 1/5] [fix][common] Handle synchronous failures from future suppliers --- .../bookkeeper/mledger/util/Futures.java | 11 ++++- .../bookkeeper/mledger/util/FuturesTest.java | 45 +++++++++++++++++++ .../apache/pulsar/common/util/FutureUtil.java | 40 ++++++++++++----- .../pulsar/common/util/FutureUtilTest.java | 38 +++++++++++++++- 4 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index f5ad77a71d8c4..89e5f678589a8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -75,7 +75,16 @@ public static CompletableFuture executeWithRetry(Supplier needRetryExceptionClass, int maxRetryTimes) { CompletableFuture resultFuture = new CompletableFuture<>(); - op.get().whenComplete((res, ex) -> { + CompletableFuture opFuture; + try { + opFuture = op.get(); + } catch (Throwable ex) { + opFuture = FutureUtil.failedFuture(ex); + } + if (opFuture == null) { + opFuture = FutureUtil.failedFuture(new NullPointerException("Retry operation returned null future")); + } + opFuture.whenComplete((res, ex) -> { if (ex == null) { resultFuture.complete(res); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java new file mode 100644 index 0000000000000..d35082912ad4c --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import static org.testng.Assert.assertEquals; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.testng.annotations.Test; + +public class FuturesTest { + + @Test + public void testExecuteWithRetryHandlesSynchronousFailure() throws Exception { + AtomicInteger attempts = new AtomicInteger(); + + CompletableFuture result = Futures.executeWithRetry(() -> { + if (attempts.incrementAndGet() == 1) { + throw new CompletionException(new ManagedLedgerException.MetaStoreException("sync fail")); + } + return CompletableFuture.completedFuture("ok"); + }, ManagedLedgerException.MetaStoreException.class, 1); + + assertEquals(result.get(2, TimeUnit.SECONDS), "ok"); + assertEquals(attempts.get(), 2); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 4c78895fa385b..f20361f3294fd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -200,6 +200,20 @@ public static CompletableFuture failedFuture(Throwable t) { return future; } + private static CompletableFuture getFutureSafely(Supplier> supplier, + String nullFutureMessage) { + CompletableFuture future; + try { + future = supplier.get(); + } catch (Throwable t) { + return failedFuture(t); + } + if (future == null) { + return failedFuture(new NullPointerException(nullFutureMessage)); + } + return future; + } + public static Throwable unwrapCompletionException(Throwable ex) { if (ex instanceof CompletionException) { return unwrapCompletionException(ex.getCause()); @@ -238,11 +252,13 @@ public synchronized CompletableFuture sequential(Supplier newTask.get()) - : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get()); + ? sequencerFuture.thenCompose(__ -> getFutureSafely(newTask, + "Expected Supplier should not return null")) + : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> getFutureSafely(newTask, + "Expected Supplier should not return null")); } } @@ -298,13 +314,17 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f } final CompletableFuture future = new CompletableFuture<>(); try { - executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - future.complete(result); - })); + executor.execute(() -> { + CompletableFuture supplierFuture = + getFutureSafely(futureSupplier, "Expected Supplier should not return null"); + supplierFuture.whenComplete((result, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + future.complete(result); + }); + }); } catch (RejectedExecutionException ex) { future.completeExceptionally(ex); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java index ee1483c3a9a89..3a2125700b4db 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.Cleanup; import org.assertj.core.util.Lists; @@ -252,4 +253,39 @@ public void testSequencer() { Assert.assertEquals(list3.get(i), (Integer) i); } } -} \ No newline at end of file + + @Test + public void testSequencerReturnsFailedFutureWhenTaskThrowsSynchronously() throws Exception { + FutureUtil.Sequencer sequencer = FutureUtil.Sequencer.create(); + + CompletableFuture future = sequencer.sequential(() -> { + throw new IllegalStateException("sync fail"); + }); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + assertEquals(e.getCause().getMessage(), "sync fail"); + } + } + + @Test + public void testComposeAsyncReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception { + @Cleanup("shutdownNow") + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + CompletableFuture future = FutureUtil.composeAsync(() -> { + throw new IllegalStateException("sync fail"); + }, executor); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + assertEquals(e.getCause().getMessage(), "sync fail"); + } + } +} From ac6e61160a552889311b86aadac5cc84edf3abb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radiance=27=20=E6=B3=A2=E6=B3=A2?= <39521534+Radiancebobo@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:57:53 +0800 Subject: [PATCH 2/5] Update pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java Co-authored-by: Lari Hotari --- .../main/java/org/apache/pulsar/common/util/FutureUtil.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index f20361f3294fd..6c081879f6831 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -200,8 +200,7 @@ public static CompletableFuture failedFuture(Throwable t) { return future; } - private static CompletableFuture getFutureSafely(Supplier> supplier, - String nullFutureMessage) { + public static CompletableFuture supplySafely(Supplier> supplier) { CompletableFuture future; try { future = supplier.get(); From 4ab4d43c2df28c170955e33d9bb6b4762b0e88b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radiance=27=20=E6=B3=A2=E6=B3=A2?= <39521534+Radiancebobo@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:58:04 +0800 Subject: [PATCH 3/5] Update pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java Co-authored-by: Lari Hotari --- .../src/main/java/org/apache/pulsar/common/util/FutureUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 6c081879f6831..ffe6453444845 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -208,7 +208,7 @@ public static CompletableFuture supplySafely(Supplier getFutureSafely(newTask, - "Expected Supplier should not return null")) - : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> getFutureSafely(newTask, - "Expected Supplier should not return null")); + ? sequencerFuture.thenCompose(__ -> supplySafely(newTask)) + : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> supplySafely(newTask)); } } @@ -314,8 +323,7 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f final CompletableFuture future = new CompletableFuture<>(); try { executor.execute(() -> { - CompletableFuture supplierFuture = - getFutureSafely(futureSupplier, "Expected Supplier should not return null"); + CompletableFuture supplierFuture = supplySafely(futureSupplier); supplierFuture.whenComplete((result, error) -> { if (error != null) { future.completeExceptionally(error); From 99405831cfc23c3e370efcbd5949cc1a79b79246 Mon Sep 17 00:00:00 2001 From: hechengbo Date: Sat, 6 Jun 2026 08:13:15 +0800 Subject: [PATCH 5/5] [fix][common] Handle synchronous failures from future suppliers --- CODING.md | 5 +- .../bookkeeper/mledger/util/Futures.java | 11 +--- .../apache/pulsar/common/util/FutureUtil.java | 21 ++++---- .../pulsar/common/util/FutureUtilTest.java | 50 +++++++++++++++++++ 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/CODING.md b/CODING.md index 6623e342e75b8..cf82b1e3f4244 100644 --- a/CODING.md +++ b/CODING.md @@ -76,8 +76,9 @@ Pulsar relies heavily on `CompletableFuture`; prefer it over `ListenableFuture` return firstAsync(arg).thenCompose(v -> secondAsync(v)); ``` - **Converting a synchronous-throwing method to a failed future is not mechanical** — some callers rely - on the throw happening *before* the async work starts, so evaluate each call site. Use a shared - `checkArgumentAsync` helper (in `FutureUtil`) to validate without duplicating try/catch. + on the throw happening *before* the async work starts, so evaluate each call site. Use shared + `FutureUtil` helpers, such as `supplySafely` for invoking a `Supplier>`, + instead of duplicating try/catch or null-future handling. - **Limit concurrency and handle backpressure.** Firing many async operations at once can overwhelm the system. Options: diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index 89e5f678589a8..ecd52d9ce3b1a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -75,16 +75,7 @@ public static CompletableFuture executeWithRetry(Supplier needRetryExceptionClass, int maxRetryTimes) { CompletableFuture resultFuture = new CompletableFuture<>(); - CompletableFuture opFuture; - try { - opFuture = op.get(); - } catch (Throwable ex) { - opFuture = FutureUtil.failedFuture(ex); - } - if (opFuture == null) { - opFuture = FutureUtil.failedFuture(new NullPointerException("Retry operation returned null future")); - } - opFuture.whenComplete((res, ex) -> { + FutureUtil.supplySafely(op).whenComplete((res, ex) -> { if (ex == null) { resultFuture.complete(res); } else { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 85488e1912b46..79e2aff30d7eb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -204,14 +204,15 @@ public static CompletableFuture failedFuture(Throwable t) { * Invokes a supplier that is expected to return a {@link CompletableFuture}, converting synchronous failures into * a failed future. * - * If the supplier itself is {@code null}, the returned future is completed exceptionally with a - * {@link NullPointerException}. - * * @param supplier the supplier to invoke * @param the result type of the returned future - * @return the future returned by the supplier, or a failed future if the supplier throws or returns {@code null} + * @return the future returned by the supplier, or a failed future if the supplier is {@code null}, throws, + * or returns {@code null} */ public static CompletableFuture supplySafely(Supplier> supplier) { + if (supplier == null) { + return failedFuture(new NullPointerException("Expected Supplier should not be null")); + } CompletableFuture future; try { future = supplier.get(); @@ -251,8 +252,9 @@ public static Sequencer create() { } /** - * @return a {@link CompletableFuture} representing the newly scheduled task, - * or one completed exceptionally with {@link NullPointerException} if param is null. + * @return a {@link CompletableFuture} representing the newly scheduled task, or the current failed chain when + * exceptions are allowed to break the chain. Returns a failed future if the supplier is {@code null}, throws, + * or returns {@code null}. */ public synchronized CompletableFuture sequential(Supplier> newTask) { if (newTask == null) { @@ -309,8 +311,8 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f /** * @return a {@link CompletableFuture} representing the asynchronous composition. - * The returned future is completed exceptionally with {@link NullPointerException} if one of params is null, - * or with {@link RejectedExecutionException} if the task cannot be accepted for execution. + * The returned future is completed exceptionally if one of the params is {@code null}, if the supplier throws or + * returns {@code null}, or if the executor rejects the task. */ public static @NonNull CompletableFuture composeAsync(Supplier> futureSupplier, Executor executor) { @@ -323,8 +325,7 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f final CompletableFuture future = new CompletableFuture<>(); try { executor.execute(() -> { - CompletableFuture supplierFuture = supplySafely(futureSupplier); - supplierFuture.whenComplete((result, error) -> { + supplySafely(futureSupplier).whenComplete((result, error) -> { if (error != null) { future.completeExceptionally(error); return; diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java index 3a2125700b4db..81c649b600ad9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.PrintWriter; @@ -271,6 +272,55 @@ public void testSequencerReturnsFailedFutureWhenTaskThrowsSynchronously() throws } } + @Test + public void testSupplySafelyReturnsSupplierFuture() throws Exception { + CompletableFuture expected = CompletableFuture.completedFuture("ok"); + CompletableFuture future = FutureUtil.supplySafely(() -> expected); + + assertSame(future, expected); + assertEquals(future.get(2, TimeUnit.SECONDS), "ok"); + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierIsNull() throws Exception { + CompletableFuture future = FutureUtil.supplySafely(null); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception { + RuntimeException expected = new IllegalStateException("sync fail"); + + CompletableFuture future = FutureUtil.supplySafely(() -> { + throw expected; + }); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(e.getCause(), expected); + } + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierReturnsNull() throws Exception { + CompletableFuture future = FutureUtil.supplySafely(() -> null); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + } + @Test public void testComposeAsyncReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception { @Cleanup("shutdownNow")