Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CODING.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ Pulsar relies heavily on `CompletableFuture`; prefer it over `ListenableFuture`
return firstAsync(arg).thenCompose(v -> secondAsync(v));
```
- **Converting a synchronous-throwing method to a failed future is not mechanical** — some callers rely
on the throw happening *before* the async work starts, so evaluate each call site. Use a shared
`checkArgumentAsync` helper (in `FutureUtil`) to validate without duplicating try/catch.
on the throw happening *before* the async work starts, so evaluate each call site. Use shared
`FutureUtil` helpers, such as `supplySafely` for invoking a `Supplier<CompletableFuture<T>>`,
instead of duplicating try/catch or null-future handling.

- **Limit concurrency and handle backpressure.** Firing many async operations at once can overwhelm the
system. Options:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFutu
Class<? extends Exception> needRetryExceptionClass,
int maxRetryTimes) {
CompletableFuture<T> resultFuture = new CompletableFuture<>();
op.get().whenComplete((res, ex) -> {
FutureUtil.supplySafely(op).whenComplete((res, ex) -> {
if (ex == null) {
resultFuture.complete(res);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.util;

import static org.testng.Assert.assertEquals;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.testng.annotations.Test;

public class FuturesTest {

@Test
public void testExecuteWithRetryHandlesSynchronousFailure() throws Exception {
AtomicInteger attempts = new AtomicInteger();

CompletableFuture<String> result = Futures.executeWithRetry(() -> {
if (attempts.incrementAndGet() == 1) {
throw new CompletionException(new ManagedLedgerException.MetaStoreException("sync fail"));
}
return CompletableFuture.completedFuture("ok");
}, ManagedLedgerException.MetaStoreException.class, 1);

assertEquals(result.get(2, TimeUnit.SECONDS), "ok");
assertEquals(attempts.get(), 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,31 @@ public static <T> CompletableFuture<T> failedFuture(Throwable t) {
return future;
}

/**
* Invokes a supplier that is expected to return a {@link CompletableFuture}, converting synchronous failures into
* a failed future.
*
* @param supplier the supplier to invoke
* @param <T> the result type of the returned future
* @return the future returned by the supplier, or a failed future if the supplier is {@code null}, throws,
* or returns {@code null}
*/
public static <T> CompletableFuture<T> supplySafely(Supplier<CompletableFuture<T>> supplier) {
if (supplier == null) {
return failedFuture(new NullPointerException("Expected Supplier should not be null"));
}
CompletableFuture<T> future;
try {
future = supplier.get();
} catch (Throwable t) {
return failedFuture(t);
}
if (future == null) {
return failedFuture(new NullPointerException("The given supplier returned null, supplier=" + supplier));
}
return future;
}

public static Throwable unwrapCompletionException(Throwable ex) {
if (ex instanceof CompletionException) {
return unwrapCompletionException(ex.getCause());
Expand Down Expand Up @@ -227,8 +252,9 @@ public static <T> Sequencer<T> create() {
}

/**
* @return a {@link CompletableFuture} representing the newly scheduled task,
* or one completed exceptionally with {@link NullPointerException} if param is null.
* @return a {@link CompletableFuture} representing the newly scheduled task, or the current failed chain when
* exceptions are allowed to break the chain. Returns a failed future if the supplier is {@code null}, throws,
* or returns {@code null}.
*/
public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
if (newTask == null) {
Expand All @@ -238,11 +264,11 @@ public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T
if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
return sequencerFuture;
}
return sequencerFuture = newTask.get();
return sequencerFuture = supplySafely(newTask);
}
return sequencerFuture = allowExceptionBreakChain
? sequencerFuture.thenCompose(__ -> newTask.get())
: sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());
? sequencerFuture.thenCompose(__ -> supplySafely(newTask))
: sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> supplySafely(newTask));
}
}

Expand Down Expand Up @@ -285,8 +311,8 @@ public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> f

/**
* @return a {@link CompletableFuture} representing the asynchronous composition.
* The returned future is completed exceptionally with {@link NullPointerException} if one of params is null,
* or with {@link RejectedExecutionException} if the task cannot be accepted for execution.
* The returned future is completed exceptionally if one of the params is {@code null}, if the supplier throws or
* returns {@code null}, or if the executor rejects the task.
*/
public static <T> @NonNull CompletableFuture<T> composeAsync(Supplier<CompletableFuture<T>> futureSupplier,
Executor executor) {
Expand All @@ -298,13 +324,15 @@ public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> f
}
final CompletableFuture<T> future = new CompletableFuture<>();
try {
executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
future.complete(result);
}));
executor.execute(() -> {
supplySafely(futureSupplier).whenComplete((result, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
future.complete(result);
});
});
} catch (RejectedExecutionException ex) {
future.completeExceptionally(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.PrintWriter;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import org.assertj.core.util.Lists;
Expand Down Expand Up @@ -252,4 +254,88 @@ public void testSequencer() {
Assert.assertEquals(list3.get(i), (Integer) i);
}
}
}

@Test
public void testSequencerReturnsFailedFutureWhenTaskThrowsSynchronously() throws Exception {
FutureUtil.Sequencer<String> sequencer = FutureUtil.Sequencer.create();

CompletableFuture<String> future = sequencer.sequential(() -> {
throw new IllegalStateException("sync fail");
});

try {
future.get(2, TimeUnit.SECONDS);
fail("Should have failed.");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof IllegalStateException);
assertEquals(e.getCause().getMessage(), "sync fail");
}
}

@Test
public void testSupplySafelyReturnsSupplierFuture() throws Exception {
CompletableFuture<String> expected = CompletableFuture.completedFuture("ok");
CompletableFuture<String> future = FutureUtil.supplySafely(() -> expected);

assertSame(future, expected);
assertEquals(future.get(2, TimeUnit.SECONDS), "ok");
}

@Test
public void testSupplySafelyReturnsFailedFutureWhenSupplierIsNull() throws Exception {
CompletableFuture<String> future = FutureUtil.supplySafely(null);

try {
future.get(2, TimeUnit.SECONDS);
fail("Should have failed.");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NullPointerException);
}
}

@Test
public void testSupplySafelyReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception {
RuntimeException expected = new IllegalStateException("sync fail");

CompletableFuture<String> future = FutureUtil.supplySafely(() -> {
throw expected;
});

try {
future.get(2, TimeUnit.SECONDS);
fail("Should have failed.");
} catch (ExecutionException e) {
assertEquals(e.getCause(), expected);
}
}

@Test
public void testSupplySafelyReturnsFailedFutureWhenSupplierReturnsNull() throws Exception {
CompletableFuture<String> future = FutureUtil.supplySafely(() -> null);

try {
future.get(2, TimeUnit.SECONDS);
fail("Should have failed.");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NullPointerException);
}
}

@Test
public void testComposeAsyncReturnsFailedFutureWhenSupplierThrowsSynchronously() throws Exception {
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

CompletableFuture<String> future = FutureUtil.composeAsync(() -> {
throw new IllegalStateException("sync fail");
}, executor);

try {
future.get(2, TimeUnit.SECONDS);
fail("Should have failed.");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof IllegalStateException);
assertEquals(e.getCause().getMessage(), "sync fail");
}
}
}