From c89fcb4c0e9786e47f7c3226fe727ccbeb74b5af Mon Sep 17 00:00:00 2001 From: sunil-solace Date: Thu, 14 May 2026 17:19:30 -0400 Subject: [PATCH 1/3] DATAGO-134580: Recover error-queue producer from unsolicited CloseFlow The reactive + proactive recreate-on-stale logic added in PR #141 (commits 931f09c..134e7ef) protects each binding's per-binding XMLMessageProducer in JCSMPOutboundMessageHandler. The error-queue republish path in ErrorQueueInfrastructure has the same exposure but on a different producer: it borrows the session-default producer from JCSMPSessionProducerManager via producerManager.get(producerKey) and historically had no recovery logic when the broker tore that producer down via unsolicited CloseFlow. Failure mode without this fix: when the broker fans out CloseFlow (message-spool maintenance, DR failover, "503: Service Unavailable"), the shared session-default producer is marked closed by JCSMP. Every subsequent error-queue republish in ErrorQueueInfrastructure.send() throws StaleSessionException / JCSMPTransportException / ClosedFacilityException; ErrorQueueRepublishCorrelationKey.handleError() catches at the message-retry level and re-attempts up to maxDeliveryAttempts - all attempts re-using the same dead producer reference, all doomed to fail. After max attempts the message is re-queued onto the original consumer queue, the consumer redelivers it, fails again, hits the error-queue path again, fails again. Net effect: failed-consumer messages disappear from the system after a DR failover or spool maintenance event. The fix mirrors the outbound-handler approach: - Proactive: at the top of send(), after producerManager.get(...), check producer.isClosed(). If true, call the new producerManager.forceRecreate() to rebuild the shared producer before send is attempted. - Reactive: wrap producer.send(...) in a try-catch. On StaleSessionException, JCSMPTransportException, ClosedFacilityException, or post-failure producer.isClosed(), call forceRecreate() so the next ErrorQueueRepublishCorrelationKey retry-loop iteration picks up a fresh producer. The original exception still propagates so the retry caller can do its errorQueueDeliveryAttempt++ bookkeeping. The shared producer is reference-counted across the entire session (JCSMPOutboundMessageHandler also registers itself for ref-count purposes even though it uses its own per-binding producer for sends). release() + get() does NOT work as a recovery primitive in production because it only closes the resource when registeredIds.size() <= 1 - in any deployment with at least one outbound binding, the ref-count stays > 1 and release() leaves the dead resource in place. The new forceRecreate() in SharedResourceManager sidesteps the ref-count: it unconditionally closes the current resource and create()s a new one under the existing lock, leaving registrations intact so every already-registered caller picks up the fresh resource on their next get(). Added as a generic method on SharedResourceManager since the recovery contract is independent of the JCSMP specifics. Tests (ErrorQueueInfrastructureTest, new): - test_errorQueueProducerRecreatedProactivelyOnIsClosed: closed producer detected before send -> forceRecreate -> fresh producer services the publish; stale producer never sent through (Mockito.never()). - test_errorQueueProducerRecreatedReactivelyOnStaleSendException: @CartesianTest over Stale / JCSMPTransport / ClosedFacility - verifies all three exception types trigger forceRecreate AND propagate to the retry caller (so handleError can drive its loop). - test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException: negative control - a non-stale JCSMPException (e.g. malformed message) propagates normally and does NOT churn the shared producer, guarding against an over-broad reactive arm. 417 binder-core unit tests green (was 411 + 6 new from this commit). This branch is layered on DATAGO-134580 (PR #141) so the new SharedResourceManager.forceRecreate() and the ErrorQueueInfrastructure changes can be reviewed alongside the related outbound-handler work. --- .../binder/util/ErrorQueueInfrastructure.java | 30 +++- .../binder/util/SharedResourceManager.java | 25 ++++ .../util/ErrorQueueInfrastructureTest.java | 139 ++++++++++++++++++ 3 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java index 0a23a1bca..28915caf0 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java @@ -1,9 +1,12 @@ package com.solace.spring.cloud.stream.binder.util; import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPTransportException; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.StaleSessionException; import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; import org.slf4j.Logger; @@ -27,6 +30,8 @@ public ErrorQueueInfrastructure(JCSMPSessionProducerManager producerManager, Str this.consumerProperties = consumerProperties; } + // DATAGO-134580: recreate shared JCSMP producer on unsolicited termination from Solace broker. + public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelationKey key) throws JCSMPException { XMLMessage xmlMessage = xmlMessageMapper.mapError(messageContainer.getMessage(), consumerProperties); xmlMessage.setCorrelationKey(key); @@ -34,6 +39,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati XMLMessageProducer producer; try { producer = producerManager.get(producerKey); + if (producer.isClosed()) { + LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating", + errorQueueName); + producer = producerManager.forceRecreate(); + } } catch (Exception e) { MessagingException wrappedException = new MessagingException( String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(), @@ -42,7 +52,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati throw wrappedException; } - producer.send(xmlMessage, queue); + try { + producer.send(xmlMessage, queue); + } catch (JCSMPException e) { + if (e instanceof StaleSessionException + || e instanceof JCSMPTransportException + || e instanceof ClosedFacilityException + || producer.isClosed()) { + LOGGER.warn("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " + + "recreating for next attempt", + errorQueueName, e.getClass().getSimpleName()); + try { + producerManager.forceRecreate(); + } catch (Exception recreateError) { + LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError); + e.addSuppressed(recreateError); + } + } + throw e; + } } public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer, diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java index a6279c081..c0e382393 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java @@ -43,6 +43,31 @@ public T get(String key) throws Exception { return sharedResource; } + /** + * Force-replace the shared resource. Closes the existing instance (if any) and + * {@link #create()}s a new one, regardless of how many callers are currently + * registered. Existing registrations are preserved, so subsequent {@link #get(String)} + * calls from any registered caller return the new resource. Intended for recovery + * paths where a caller has detected the shared resource is no longer usable (e.g. + * the underlying broker tore down the flow via unsolicited CloseFlow). + * + * @return the freshly-created shared resource + * @throws Exception whatever exception may be thrown by {@link #create()} + */ + public T forceRecreate() throws Exception { + synchronized (lock) { + if (sharedResource != null) { + try { + close(); + } catch (Exception e) { + LOGGER.debug("Failed to close stale {} during forceRecreate", type, e); + } + } + sharedResource = create(); + return sharedResource; + } + } + /** * De-register {@code key} from the shared resource. *

If this is the last {@code key} associated to the shared resource, {@link #close()} the resource. diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java new file mode 100644 index 000000000..1764d06cb --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java @@ -0,0 +1,139 @@ +package com.solace.spring.cloud.stream.binder.util; + +import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.ClosedFacilityException; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPTransportException; +import com.solacesystems.jcsmp.StaleSessionException; +import com.solacesystems.jcsmp.XMLMessage; +import com.solacesystems.jcsmp.XMLMessageProducer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; + +/** + * Unit tests for the DATAGO-134580 stale-flow recovery added to {@link ErrorQueueInfrastructure}. + * The error-queue republish path borrows the session-default producer from + * {@link JCSMPSessionProducerManager} and historically had no recovery logic when the broker + * fanned out an unsolicited CloseFlow on that producer (reactive recreation in + * {@code JCSMPOutboundMessageHandler} only protects per-binding producers, not the shared + * session-default one). + */ +@ExtendWith(MockitoExtension.class) +class ErrorQueueInfrastructureTest { + private static final String PRODUCER_KEY = "test-producer-key"; + private static final String ERROR_QUEUE_NAME = "test-error-queue"; + + @Mock JCSMPSessionProducerManager producerManager; + @Mock MessageContainer messageContainer; + @Mock ErrorQueueRepublishCorrelationKey correlationKey; + + BytesXMLMessage inputMessage; + SolaceConsumerProperties consumerProperties; + ErrorQueueInfrastructure errorQueueInfrastructure; + + @BeforeEach + void setup() { + inputMessage = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class); + consumerProperties = new SolaceConsumerProperties(); + errorQueueInfrastructure = new ErrorQueueInfrastructure( + producerManager, PRODUCER_KEY, ERROR_QUEUE_NAME, consumerProperties); + Mockito.when(messageContainer.getMessage()).thenReturn(inputMessage); + } + + /** + * DATAGO-134580: proactive {@code producer.isClosed()} pre-check on the error-queue + * republish path. If the broker has already torn down the shared session-default + * producer before this {@code send(...)} runs, the very first error-queue publish + * should still succeed - the manager is asked to recreate the producer before send is + * attempted, and the fresh producer services the publish. + */ + @Test + void test_errorQueueProducerRecreatedProactivelyOnIsClosed( + @Mock XMLMessageProducer staleProducer, + @Mock XMLMessageProducer freshProducer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(true); + Mockito.when(producerManager.forceRecreate()).thenReturn(freshProducer); + + assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("Proactive recreate must allow the publish to succeed on the fresh producer") + .doesNotThrowAnyException(); + + Mockito.verify(producerManager).forceRecreate(); + Mockito.verify(freshProducer).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); + } + + /** + * DATAGO-134580: reactive recreation when {@code send(...)} itself throws a + * stale-flow exception. The race window between our proactive {@code isClosed()} + * check and the actual send means the broker can tear the producer down mid-flight; + * in that case the exception must propagate so {@code ErrorQueueRepublishCorrelationKey} + * can retry, and the manager must be force-recreated so the next retry attempt picks up + * a fresh producer rather than re-using the dead one. + * + *

Parameterized over the three concrete JCSMP exception types we treat as + * stale-flow signals - the recovery contract must apply to all of them. + */ + @CartesianTest(name = "[{index}] exception={0}") + void test_errorQueueProducerRecreatedReactivelyOnStaleSendException( + @Values(strings = {"stale", "transport", "closed-facility"}) String exceptionType, + @Mock XMLMessageProducer staleProducer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(false); + + JCSMPException sendError = switch (exceptionType) { + case "stale" -> new StaleSessionException( + "Tried to perform operation on a closed XML message producer", + new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable).")); + case "transport" -> new JCSMPTransportException( + "Received unsolicited CloseFlow for producer (503:Service Unavailable)."); + case "closed-facility" -> new ClosedFacilityException("Producer is closed"); + default -> throw new IllegalArgumentException("unknown exception type: " + exceptionType); + }; + Mockito.doThrow(sendError).when(staleProducer).send(any(XMLMessage.class), any(Destination.class)); + + assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("Stale-flow send failure must propagate so the retry caller can re-attempt") + .isInstanceOf(sendError.getClass()); + + // The manager must have been asked to forceRecreate so the next retry by + // ErrorQueueRepublishCorrelationKey gets a fresh producer instead of the dead one. + Mockito.verify(producerManager).forceRecreate(); + } + + /** + * DATAGO-134580: a non-stale {@code JCSMPException} from {@code send(...)} must + * propagate normally and must not trigger a producer recreate. Guards + * against an over-broad reactive arm that would churn the shared producer on + * every transient publish error (e.g. a malformed message). + */ + @Test + void test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException( + @Mock XMLMessageProducer producer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(producer); + Mockito.when(producer.isClosed()).thenReturn(false); + + JCSMPException unrelated = new JCSMPException("Some unrelated publishing error"); + Mockito.doThrow(unrelated).when(producer).send(any(XMLMessage.class), any(Destination.class)); + + assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .isInstanceOf(JCSMPException.class) + .hasMessage("Some unrelated publishing error"); + + Mockito.verify(producerManager, Mockito.never()).forceRecreate(); + } +} \ No newline at end of file From 673217884e66ed94f66af6a7d929a550ab177d16 Mon Sep 17 00:00:00 2001 From: sunil-solace Date: Thu, 14 May 2026 17:44:34 -0400 Subject: [PATCH 2/3] DATAGO-134580: Apply CAS semantics to forceRecreate + generic docs Three PR #142 review items: C1 (Copilot) + C3 (mayur-solace) - race in forceRecreate(). The original unconditional implementation could have two callers both observe the same stale shared resource, both enter forceRecreate(), and have the second caller close a healthy replacement that the first caller just installed. Fix: compare-and-swap. forceRecreate now takes an `expected` argument - the reference the caller observed. Under the lock, the manager recreates only if `sharedResource == expected`; otherwise it returns whatever a concurrent caller already installed without closing or re-creating anything. The caller-visible contract is now: pass what you observed, use what's returned. C2 (mayur-solace) - Javadoc on SharedResourceManager.forceRecreate referenced the broker / CloseFlow concern specifically. Since SharedResourceManager is generic and could host non-broker resources in the future, the docs are rewritten to describe the CAS contract generically without naming the JCSMP/broker context. ErrorQueueInfrastructure.send() updated at both call sites to pass the observed producer reference and use the value returned by forceRecreate (which may be the fresh one we requested, or the already-installed replacement another caller put in place). New unit test testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate simulates the exact race C1 flagged: stale producer observed, manager's CAS returns an already-installed replacement, send must use the replacement rather than the locally-observed stale one. Existing tests updated to pass the observed reference and verify CAS arguments. Also aligned the test method names to drop the test_ underscore form, matching the no-underscore convention used elsewhere in the binder-core test suite (e.g. SolaceErrorMessageHandlerTest). 418 binder-core unit tests green (was 417 + 1 new CAS-race test). --- .../binder/util/ErrorQueueInfrastructure.java | 4 +- .../binder/util/SharedResourceManager.java | 34 +++++++++---- .../util/ErrorQueueInfrastructureTest.java | 51 +++++++++++++++---- 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java index 28915caf0..5b1311046 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java @@ -42,7 +42,7 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati if (producer.isClosed()) { LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating", errorQueueName); - producer = producerManager.forceRecreate(); + producer = producerManager.forceRecreate(producer); } } catch (Exception e) { MessagingException wrappedException = new MessagingException( @@ -63,7 +63,7 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati "recreating for next attempt", errorQueueName, e.getClass().getSimpleName()); try { - producerManager.forceRecreate(); + producerManager.forceRecreate(producer); } catch (Exception recreateError) { LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError); e.addSuppressed(recreateError); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java index c0e382393..b7908b2a7 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java @@ -44,23 +44,39 @@ public T get(String key) throws Exception { } /** - * Force-replace the shared resource. Closes the existing instance (if any) and - * {@link #create()}s a new one, regardless of how many callers are currently - * registered. Existing registrations are preserved, so subsequent {@link #get(String)} - * calls from any registered caller return the new resource. Intended for recovery - * paths where a caller has detected the shared resource is no longer usable (e.g. - * the underlying broker tore down the flow via unsolicited CloseFlow). + * Conditionally replace the shared resource using compare-and-swap semantics. * - * @return the freshly-created shared resource + *

If the manager still holds the {@code expected} reference, the existing + * resource is closed and a fresh one is {@link #create()}d. If the manager + * already holds a different reference - because a concurrent caller has already + * replaced it - this is a no-op and the currently-installed resource is + * returned. This prevents two callers that observed the same stale resource + * from both recreating: the second caller sees that the resource has already + * changed and uses the replacement rather than closing a potentially in-use + * resource that the first caller installed. + * + *

Existing registrations are preserved, so subsequent {@link #get(String)} + * calls from any registered caller return the (possibly newly-installed) + * resource. + * + * @param expected the resource reference the caller observed and considers no + * longer usable; pass the value previously returned by + * {@link #get(String)} or by an earlier call to this method + * @return the resource currently installed in the manager - either the + * freshly-created one (if the swap happened) or whatever a concurrent + * caller installed (if it did not) * @throws Exception whatever exception may be thrown by {@link #create()} */ - public T forceRecreate() throws Exception { + public T forceRecreate(T expected) throws Exception { synchronized (lock) { + if (sharedResource != expected) { + return sharedResource; + } if (sharedResource != null) { try { close(); } catch (Exception e) { - LOGGER.debug("Failed to close stale {} during forceRecreate", type, e); + LOGGER.debug("Failed to close current {} during forceRecreate", type, e); } } sharedResource = create(); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java index 1764d06cb..299246722 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java @@ -61,18 +61,20 @@ void setup() { * attempted, and the fresh producer services the publish. */ @Test - void test_errorQueueProducerRecreatedProactivelyOnIsClosed( + void testErrorQueueProducerRecreatedProactivelyOnIsClosed( @Mock XMLMessageProducer staleProducer, @Mock XMLMessageProducer freshProducer) throws Exception { Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); Mockito.when(staleProducer.isClosed()).thenReturn(true); - Mockito.when(producerManager.forceRecreate()).thenReturn(freshProducer); + Mockito.when(producerManager.forceRecreate(staleProducer)).thenReturn(freshProducer); assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) .as("Proactive recreate must allow the publish to succeed on the fresh producer") .doesNotThrowAnyException(); - Mockito.verify(producerManager).forceRecreate(); + // CAS contract: caller passes the observed (stale) reference so the manager only + // recreates if it still holds that exact instance. + Mockito.verify(producerManager).forceRecreate(staleProducer); Mockito.verify(freshProducer).send(any(XMLMessage.class), any(Destination.class)); Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); } @@ -89,7 +91,7 @@ void test_errorQueueProducerRecreatedProactivelyOnIsClosed( * stale-flow signals - the recovery contract must apply to all of them. */ @CartesianTest(name = "[{index}] exception={0}") - void test_errorQueueProducerRecreatedReactivelyOnStaleSendException( + void testErrorQueueProducerRecreatedReactivelyOnStaleSendException( @Values(strings = {"stale", "transport", "closed-facility"}) String exceptionType, @Mock XMLMessageProducer staleProducer) throws Exception { Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); @@ -110,9 +112,10 @@ void test_errorQueueProducerRecreatedReactivelyOnStaleSendException( .as("Stale-flow send failure must propagate so the retry caller can re-attempt") .isInstanceOf(sendError.getClass()); - // The manager must have been asked to forceRecreate so the next retry by - // ErrorQueueRepublishCorrelationKey gets a fresh producer instead of the dead one. - Mockito.verify(producerManager).forceRecreate(); + // The manager must have been asked to forceRecreate (with the observed stale + // producer for CAS semantics) so the next retry by ErrorQueueRepublishCorrelationKey + // gets a fresh producer instead of the dead one. + Mockito.verify(producerManager).forceRecreate(staleProducer); } /** @@ -122,7 +125,7 @@ void test_errorQueueProducerRecreatedReactivelyOnStaleSendException( * every transient publish error (e.g. a malformed message). */ @Test - void test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException( + void testErrorQueueProducerNotRecreatedOnUnrelatedJCSMPException( @Mock XMLMessageProducer producer) throws Exception { Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(producer); Mockito.when(producer.isClosed()).thenReturn(false); @@ -134,6 +137,36 @@ void test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException( .isInstanceOf(JCSMPException.class) .hasMessage("Some unrelated publishing error"); - Mockito.verify(producerManager, Mockito.never()).forceRecreate(); + Mockito.verify(producerManager, Mockito.never()).forceRecreate(any()); + } + + /** + * DATAGO-134580: CAS contract verification. When two callers both observe the + * same stale producer and both call {@code forceRecreate(stale)}, the manager + * recreates exactly once - the second call returns the already-recreated + * resource without closing it. {@code ErrorQueueInfrastructure.send(...)} must + * use the value returned by {@code forceRecreate} (rather than its own observed + * reference) so it ends up using whatever the manager currently holds, not a + * resource that another caller has since closed and replaced. + */ + @Test + void testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate( + @Mock XMLMessageProducer staleProducer, + @Mock XMLMessageProducer alreadyRecreatedByAnotherCaller) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(true); + // Simulate the CAS no-op outcome: another caller already replaced the stale + // producer, so the manager's CAS does not recreate again - it returns the + // already-installed replacement instead. + Mockito.when(producerManager.forceRecreate(staleProducer)) + .thenReturn(alreadyRecreatedByAnotherCaller); + + assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("send must use the manager-returned reference (the already-installed replacement) " + + "and not the locally-observed stale reference") + .doesNotThrowAnyException(); + + Mockito.verify(alreadyRecreatedByAnotherCaller).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); } } \ No newline at end of file From cedd2f202b44ec79b1535445aaef87765bf71f94 Mon Sep 17 00:00:00 2001 From: sunil-solace Date: Thu, 14 May 2026 18:08:03 -0400 Subject: [PATCH 3/3] DATAGO-134580: Trim forceRecreate Javadoc to essentials Per PR #142 follow-up: the previous Javadoc (24 lines, two paragraphs of explanation) was verbose for an IDE hover. Reduced to a single sentence describing the CAS contract plus the standard param/return/throws. --- .../binder/util/SharedResourceManager.java | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java index b7908b2a7..38fdd509b 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java @@ -44,28 +44,13 @@ public T get(String key) throws Exception { } /** - * Conditionally replace the shared resource using compare-and-swap semantics. + * Compare-and-swap the shared resource. If the manager still holds {@code expected}, + * close it and {@link #create()} a fresh one; otherwise return the currently-installed + * resource without re-creating. * - *

If the manager still holds the {@code expected} reference, the existing - * resource is closed and a fresh one is {@link #create()}d. If the manager - * already holds a different reference - because a concurrent caller has already - * replaced it - this is a no-op and the currently-installed resource is - * returned. This prevents two callers that observed the same stale resource - * from both recreating: the second caller sees that the resource has already - * changed and uses the replacement rather than closing a potentially in-use - * resource that the first caller installed. - * - *

Existing registrations are preserved, so subsequent {@link #get(String)} - * calls from any registered caller return the (possibly newly-installed) - * resource. - * - * @param expected the resource reference the caller observed and considers no - * longer usable; pass the value previously returned by - * {@link #get(String)} or by an earlier call to this method - * @return the resource currently installed in the manager - either the - * freshly-created one (if the swap happened) or whatever a concurrent - * caller installed (if it did not) - * @throws Exception whatever exception may be thrown by {@link #create()} + * @param expected the reference the caller observed and considers no longer usable + * @return the resource currently installed in the manager + * @throws Exception whatever {@link #create()} may throw */ public T forceRecreate(T expected) throws Exception { synchronized (lock) {