From a4789d7f22bfc768fe9265a460987ddf37382fd8 Mon Sep 17 00:00:00 2001 From: Max Lambrecht Date: Sat, 9 May 2026 15:52:30 -0500 Subject: [PATCH] fix(workloadapi): notify watchers when retries stop Signed-off-by: Max Lambrecht --- .../spiffe/workloadapi/StreamObservers.java | 37 +++++++++++----- .../workloadapi/retry/RetryHandler.java | 17 ++++++-- .../workloadapi/retry/RetryHandlerTest.java | 42 +++++++++++++++---- 3 files changed, 72 insertions(+), 24 deletions(-) diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java index 5156aa5c..f135f45b 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/StreamObservers.java @@ -12,6 +12,8 @@ import io.spiffe.workloadapi.grpc.Workload; import io.spiffe.workloadapi.retry.RetryHandler; +import java.util.EnumSet; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -20,8 +22,15 @@ final class StreamObservers { private static final Logger log = Logger.getLogger(StreamObservers.class.getName()); - private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT"; private static final String STREAM_IS_COMPLETED = "Workload API stream is completed"; + // Retry only transient stream failures. Caller/client cancellation and terminal API errors fail the watch closed. 
+ private static final Set<Status.Code> NON_RETRYABLE_CODES = EnumSet.of( + Status.Code.INVALID_ARGUMENT, + Status.Code.CANCELLED, + Status.Code.PERMISSION_DENIED, + Status.Code.UNAUTHENTICATED, + Status.Code.UNIMPLEMENTED, + Status.Code.FAILED_PRECONDITION); private StreamObservers() { } @@ -63,13 +72,15 @@ private void handleWatchX509ContextError(final Throwable t) { private void handleX509ContextRetry(Throwable t) { if (retryHandler.shouldRetry()) { log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 context watcher"); - retryHandler.scheduleRetry(() -> + boolean retryScheduled = retryHandler.scheduleRetry(() -> cancellableContext.run( () -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(), this))); - } else { - watcher.onError(new X509ContextException("Cancelling X.509 Context watch", t)); + if (retryScheduled) { + return; + } } + watcher.onError(new X509ContextException("Cancelling X.509 Context watch", t)); } @Override @@ -117,13 +128,15 @@ private void handleWatchX509BundlesError(final Throwable t) { private void handleX509BundlesRetry(Throwable t) { if (retryHandler.shouldRetry()) { log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 bundles watcher"); - retryHandler.scheduleRetry(() -> + boolean retryScheduled = retryHandler.scheduleRetry(() -> cancellableContext.run( () -> workloadApiAsyncStub.fetchX509Bundles(newX509BundlesRequest(), this))); - } else { - watcher.onError(new X509BundleException("Cancelling X.509 bundles watch", t)); + if (retryScheduled) { + return; + } } + watcher.onError(new X509BundleException("Cancelling X.509 bundles watch", t)); } @Override @@ -171,12 +184,14 @@ private void handleWatchJwtBundleError(final Throwable t) { private void handleJwtBundleRetry(Throwable t) { if (retryHandler.shouldRetry()) { log.log(Level.FINE, "Retrying connecting to Workload API to register JWT Bundles watcher"); - retryHandler.scheduleRetry(() -> 
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(), this))); - } else { - watcher.onError(new JwtBundleException("Cancelling JWT Bundles watch", t)); + if (retryScheduled) { + return; + } } + watcher.onError(new JwtBundleException("Cancelling JWT Bundles watch", t)); } @Override @@ -188,7 +203,7 @@ public void onCompleted() { } private static boolean isErrorNotRetryable(Throwable t) { - return INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name()); + return NON_RETRYABLE_CODES.contains(Status.fromThrowable(t).getCode()); } private static Workload.X509SVIDRequest newX509SvidRequest() { diff --git a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java index 29e3dff9..6a80eec2 100644 --- a/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java +++ b/java-spiffe-core/src/main/java/io/spiffe/workloadapi/retry/RetryHandler.java @@ -1,6 +1,7 @@ package io.spiffe.workloadapi.retry; import java.time.Duration; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -26,18 +27,26 @@ public RetryHandler(final ExponentialBackoffPolicy exponentialBackoffPolicy, fin * Updates the next delay and retries count. 
* * @param runnable the task to be scheduled for execution + * @return true if the retry was scheduled, false otherwise */ - public void scheduleRetry(final Runnable runnable) { + public boolean scheduleRetry(final Runnable runnable) { if (executor.isShutdown()) { - return; + return false; } if (exponentialBackoffPolicy.reachedMaxRetries(retryCount)) { - return; + return false; } - executor.schedule(runnable, nextDelay.getSeconds(), TimeUnit.SECONDS); + + try { + executor.schedule(runnable, nextDelay.getSeconds(), TimeUnit.SECONDS); + } catch (RejectedExecutionException e) { + return false; + } + nextDelay = exponentialBackoffPolicy.nextDelay(nextDelay); retryCount++; + return true; } /** diff --git a/java-spiffe-core/src/test/java/io/spiffe/workloadapi/retry/RetryHandlerTest.java b/java-spiffe-core/src/test/java/io/spiffe/workloadapi/retry/RetryHandlerTest.java index 8d7a300a..6d806abe 100644 --- a/java-spiffe-core/src/test/java/io/spiffe/workloadapi/retry/RetryHandlerTest.java +++ b/java-spiffe-core/src/test/java/io/spiffe/workloadapi/retry/RetryHandlerTest.java @@ -1,5 +1,6 @@ package io.spiffe.workloadapi.retry; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -11,6 +12,8 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; class RetryHandlerTest { @@ -18,9 +21,16 @@ class RetryHandlerTest { @Mock ScheduledExecutorService scheduledExecutorService; + private AutoCloseable mocks; + @BeforeEach void setup() { - MockitoAnnotations.initMocks(this); + mocks = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void tearDown() throws Exception { + mocks.close(); } @Test @@ -30,23 +40,23 @@ void testScheduleRetry_defaultPolicy() { RetryHandler retryHandler = new 
RetryHandler(exponentialBackoffPolicy, scheduledExecutorService); - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); verify(scheduledExecutorService).schedule(runnable, 1, TimeUnit.SECONDS); assertEquals(1, retryHandler.getRetryCount()); // second retry - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); assertEquals(2, retryHandler.getRetryCount()); verify(scheduledExecutorService).schedule(runnable, 2, TimeUnit.SECONDS); // third retry - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); assertEquals(3, retryHandler.getRetryCount()); verify(scheduledExecutorService).schedule(runnable, 4, TimeUnit.SECONDS); // fourth retry - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); assertEquals(4, retryHandler.getRetryCount()); verify(scheduledExecutorService).schedule(runnable, 8, TimeUnit.SECONDS); } @@ -58,27 +68,41 @@ void testScheduleRetry_maxRetries() { RetryHandler retryHandler = new RetryHandler(exponentialBackoffPolicy, scheduledExecutorService); - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); verify(scheduledExecutorService).schedule(runnable, 1, TimeUnit.SECONDS); assertEquals(1, retryHandler.getRetryCount()); // second retry - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); assertEquals(2, retryHandler.getRetryCount()); verify(scheduledExecutorService).schedule(runnable, 2, TimeUnit.SECONDS); // third retry - retryHandler.scheduleRetry(runnable); + assertTrue(retryHandler.scheduleRetry(runnable)); assertEquals(3, retryHandler.getRetryCount()); verify(scheduledExecutorService).schedule(runnable, 4, TimeUnit.SECONDS); Mockito.reset(scheduledExecutorService); // fourth retry exceeds max retries - retryHandler.scheduleRetry(runnable); + assertFalse(retryHandler.scheduleRetry(runnable)); + 
verify(scheduledExecutorService).isShutdown(); + verifyNoMoreInteractions(scheduledExecutorService); + } + + @Test + void testScheduleRetry_executorShutdown() { + Runnable runnable = () -> { }; + ExponentialBackoffPolicy exponentialBackoffPolicy = ExponentialBackoffPolicy.DEFAULT; + when(scheduledExecutorService.isShutdown()).thenReturn(true); + + RetryHandler retryHandler = new RetryHandler(exponentialBackoffPolicy, scheduledExecutorService); + + assertFalse(retryHandler.scheduleRetry(runnable)); verify(scheduledExecutorService).isShutdown(); verifyNoMoreInteractions(scheduledExecutorService); + assertEquals(0, retryHandler.getRetryCount()); } @Test