diff --git a/CODING.md b/CODING.md index 6623e342e75b8..cf82b1e3f4244 100644 --- a/CODING.md +++ b/CODING.md @@ -76,8 +76,9 @@ Pulsar relies heavily on `CompletableFuture`; prefer it over `ListenableFuture` return firstAsync(arg).thenCompose(v -> secondAsync(v)); ``` - **Converting a synchronous-throwing method to a failed future is not mechanical** — some callers rely - on the throw happening *before* the async work starts, so evaluate each call site. Use a shared - `checkArgumentAsync` helper (in `FutureUtil`) to validate without duplicating try/catch. + on the throw happening *before* the async work starts, so evaluate each call site. Use shared + `FutureUtil` helpers, such as `supplySafely` for invoking a `Supplier>`, + instead of duplicating try/catch or null-future handling. - **Limit concurrency and handle backpressure.** Firing many async operations at once can overwhelm the system. Options: diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index f5ad77a71d8c4..ecd52d9ce3b1a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -75,7 +75,7 @@ public static CompletableFuture executeWithRetry(Supplier needRetryExceptionClass, int maxRetryTimes) { CompletableFuture resultFuture = new CompletableFuture<>(); - op.get().whenComplete((res, ex) -> { + FutureUtil.supplySafely(op).whenComplete((res, ex) -> { if (ex == null) { resultFuture.complete(res); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java new file mode 100644 index 0000000000000..d35082912ad4c --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/FuturesTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import static org.testng.Assert.assertEquals; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.testng.annotations.Test; + +public class FuturesTest { + + @Test + public void testExecuteWithRetryHandlesSynchronousFailure() throws Exception { + AtomicInteger attempts = new AtomicInteger(); + + CompletableFuture result = Futures.executeWithRetry(() -> { + if (attempts.incrementAndGet() == 1) { + throw new CompletionException(new ManagedLedgerException.MetaStoreException("sync fail")); + } + return CompletableFuture.completedFuture("ok"); + }, ManagedLedgerException.MetaStoreException.class, 1); + + assertEquals(result.get(2, TimeUnit.SECONDS), "ok"); + assertEquals(attempts.get(), 2); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 4c78895fa385b..79e2aff30d7eb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -200,6 +200,31 @@ public static CompletableFuture failedFuture(Throwable t) { return future; } + /** + * Invokes a supplier that is expected to return a {@link CompletableFuture}, converting synchronous failures into + * a failed future. + * + * @param supplier the supplier to invoke + * @param the result type of the returned future + * @return the future returned by the supplier, or a failed future if the supplier is {@code null}, throws, + * or returns {@code null} + */ + public static CompletableFuture supplySafely(Supplier> supplier) { + if (supplier == null) { + return failedFuture(new NullPointerException("Expected Supplier should not be null")); + } + CompletableFuture future; + try { + future = supplier.get(); + } catch (Throwable t) { + return failedFuture(t); + } + if (future == null) { + return failedFuture(new NullPointerException("The given supplier returned null, supplier=" + supplier)); + } + return future; + } + public static Throwable unwrapCompletionException(Throwable ex) { if (ex instanceof CompletionException) { return unwrapCompletionException(ex.getCause()); @@ -227,8 +252,9 @@ public static Sequencer create() { } /** - * @return a {@link CompletableFuture} representing the newly scheduled task, - * or one completed exceptionally with {@link NullPointerException} if param is null. + * @return a {@link CompletableFuture} representing the newly scheduled task, or the current failed chain when + * exceptions are allowed to break the chain. Returns a failed future if the supplier is {@code null}, throws, + * or returns {@code null}. */ public synchronized CompletableFuture sequential(Supplier> newTask) { if (newTask == null) { @@ -238,11 +264,11 @@ public synchronized CompletableFuture sequential(Supplier newTask.get()) - : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get()); + ? sequencerFuture.thenCompose(__ -> supplySafely(newTask)) + : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> supplySafely(newTask)); } } @@ -285,8 +311,8 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f /** * @return a {@link CompletableFuture} representing the asynchronous composition. - * The returned future is completed exceptionally with {@link NullPointerException} if one of params is null, - * or with {@link RejectedExecutionException} if the task cannot be accepted for execution. + * The returned future is completed exceptionally if one of the params is {@code null}, if the supplier throws or + * returns {@code null}, or if the executor rejects the task. */ public static @NonNull CompletableFuture composeAsync(Supplier> futureSupplier, Executor executor) { @@ -298,13 +324,15 @@ public static CompletableFuture addTimeoutHandling(CompletableFuture f } final CompletableFuture future = new CompletableFuture<>(); try { - executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - future.complete(result); - })); + executor.execute(() -> { + supplySafely(futureSupplier).whenComplete((result, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + future.complete(result); + }); + }); } catch (RejectedExecutionException ex) { future.completeExceptionally(ex); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java index ee1483c3a9a89..81c649b600ad9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.PrintWriter; @@ -34,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.Cleanup; import org.assertj.core.util.Lists; @@ -252,4 +254,88 @@ public void testSequencer() { Assert.assertEquals(list3.get(i), (Integer) i); } } -} \ No newline at end of file + + @Test + public void testSequencerReturnsFailedFutureWhenTaskThrowsSynchronously() throws Exception { + FutureUtil.Sequencer sequencer = FutureUtil.Sequencer.create(); + + CompletableFuture future = sequencer.sequential(() -> { + throw new IllegalStateException("sync fail"); + }); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + assertEquals(e.getCause().getMessage(), "sync fail"); + } + } + + @Test + public void testSupplySafelyReturnsSupplierFuture() throws Exception { + CompletableFuture expected = CompletableFuture.completedFuture("ok"); + CompletableFuture future = FutureUtil.supplySafely(() -> expected); + + assertSame(future, expected); + assertEquals(future.get(2, TimeUnit.SECONDS), "ok"); + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierIsNull() throws Exception { + CompletableFuture future = FutureUtil.supplySafely(null); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception { + RuntimeException expected = new IllegalStateException("sync fail"); + + CompletableFuture future = FutureUtil.supplySafely(() -> { + throw expected; + }); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(e.getCause(), expected); + } + } + + @Test + public void testSupplySafelyReturnsFailedFutureWhenSupplierReturnsNull() throws Exception { + CompletableFuture future = FutureUtil.supplySafely(() -> null); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + } + + @Test + public void testComposeAsyncReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception { + @Cleanup("shutdownNow") + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + CompletableFuture future = FutureUtil.composeAsync(() -> { + throw new IllegalStateException("sync fail"); + }, executor); + + try { + future.get(2, TimeUnit.SECONDS); + fail("Should have failed."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + assertEquals(e.getCause().getMessage(), "sync fail"); + } + } +}