Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@
**/.flattened-pom.xml

# Release files
release.properties
release.properties

# Local toolchain / IDE files
.java-version
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager.CloudStreamEventHandler;
import com.solace.spring.cloud.stream.binder.util.StaticMessageHeaderMapAccessor;
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.StaleSessionException;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
Expand Down Expand Up @@ -64,6 +67,10 @@ public final class JCSMPOutboundMessageHandler implements MessageHandler, Lifecy
private boolean isRunning = false;
private ErrorMessageStrategy errorMessageStrategy;

// DATAGO-134580: recreate JCSMP producer on unsolicited termination from Solace broker.
private volatile boolean recreateProducer = false;
private final Object lifecycleLock = new Object();

private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class);

public JCSMPOutboundMessageHandler(ProducerDestination destination,
Expand Down Expand Up @@ -141,6 +148,8 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
dynamicDestinations = Collections.singletonList(getDynamicDestination(message.getHeaders(), correlationKey));
}

recreateProducerIfNeeded(correlationKey);

try {
for (int i = 0; i < smfMessages.size(); i++) {
XMLMessage smfMessage = smfMessages.get(i);
Expand All @@ -163,6 +172,18 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
producerEventHandler.responseReceivedEx(correlationKey);
}
} catch (JCSMPException e) {
if (e instanceof StaleSessionException
|| e instanceof JCSMPTransportException
|| e instanceof ClosedFacilityException
|| producer.isClosed()) {
if (!recreateProducer) {
LOGGER.debug("Detected stale JCSMP producer for binding {} (cause: {}); will " +
"recreate on next message <message handler ID: {}>",
properties.getBindingName(), e.getClass().getSimpleName(), id);
}
recreateProducer = true;
}

if (transactedSession != null) {
try {
if (!(e instanceof RollbackException)) {
Expand Down Expand Up @@ -227,62 +248,106 @@ private Destination getDynamicDestination(Map<String, Object> headers, ErrorChan
@Override
public void start() {
LOGGER.info("Creating producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (isRunning()) {
LOGGER.warn("Nothing to do, message handler {} is already running", id);
return;
synchronized (lifecycleLock) {
if (isRunning()) {
LOGGER.warn("Nothing to do, message handler {} is already running", id);
return;
}
recreateProducer = false;

try {
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
properties.getExtension().getHeaderNameMapping(), id));
LOGGER.warn(exception.getMessage());
throw exception;
}
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
throw new RuntimeException(msg, e);
}

createProducerInternal();
isRunning = true;
}
}

try {
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
properties.getExtension().getHeaderNameMapping(), id));
LOGGER.warn(exception.getMessage());
throw exception;
private void createProducerInternal() {
synchronized (lifecycleLock) {
try {
producerManager.get(id);
if (properties.getExtension().isTransacted()) {
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
transactedSession = jcsmpSession.createTransactedSession();
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
} else {
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
closeResources();
throw new RuntimeException(msg, e);
}
}
}

producerManager.get(id);
if (properties.getExtension().isTransacted()) {
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
transactedSession = jcsmpSession.createTransactedSession();
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
} else {
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
private void recreateProducerIfNeeded(ErrorChannelSendingCorrelationKey correlationKey) throws MessagingException {
if (!recreateProducer && !producer.isClosed()) {
return;
}
synchronized (lifecycleLock) {
if (!recreateProducer && !producer.isClosed()) {
return;
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
LOGGER.debug("Recreating JCSMP producer for binding {} after stale-flow detection <message handler ID: {}>",
properties.getBindingName(), id);
closeResources();
throw new RuntimeException(msg, e);
try {
createProducerInternal();
recreateProducer = false;
} catch (RuntimeException createError) {
recreateProducer = true;
// unwrap createProducerInternal()'s wrapper exception if necessary
Exception toReport = (createError.getCause() instanceof Exception unwrapped)
? unwrapped : createError;
throw handleMessagingException(correlationKey,
"Failed to recreate JCSMP producer after stale-flow detection", toReport);
}
}

isRunning = true;
}

@Override
public void stop() {
if (!isRunning()) return;
closeResources();
isRunning = false;
synchronized (lifecycleLock) {
if (!isRunning()) return;
closeResources();
isRunning = false;
}
}

private void closeResources() {
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (producer != null) {
LOGGER.info("Closing producer <message handler ID: {}>", id);
producer.close();
}
if (transactedSession != null) {
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
transactedSession.close();
synchronized (lifecycleLock) {
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
recreateProducer = false;
if (producer != null) {
LOGGER.info("Closing producer <message handler ID: {}>", id);
producer.close();
}
if (transactedSession != null) {
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
transactedSession.close();
}
producerManager.release(id);
}
producerManager.release(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.solace.spring.cloud.stream.binder.util;

import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.StaleSessionException;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -34,6 +37,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
XMLMessageProducer producer;
try {
producer = producerManager.get(producerKey);
if (producer.isClosed()) {
LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating",
errorQueueName);
producer = producerManager.forceRecreate(producer);
}
} catch (Exception e) {
MessagingException wrappedException = new MessagingException(
String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(),
Expand All @@ -42,7 +50,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
throw wrappedException;
}

producer.send(xmlMessage, queue);
try {
producer.send(xmlMessage, queue);
} catch (JCSMPException e) {
if (e instanceof StaleSessionException
|| e instanceof JCSMPTransportException
|| e instanceof ClosedFacilityException
|| producer.isClosed()) {
LOGGER.debug("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " +
"recreating for next attempt",
errorQueueName, e.getClass().getSimpleName());
try {
producerManager.forceRecreate(producer);
} catch (Exception recreateError) {
LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError);
e.addSuppressed(recreateError);
}
}
throw e;
}
}

public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ public T get(String key) throws Exception {
return sharedResource;
}

/**
* Compare-and-swap the shared resource. If the manager still holds {@code expected},
* close it and {@link #create()} a fresh one; otherwise return the currently-installed
* resource without re-creating.
*
* @param expected the reference the caller observed and considers no longer usable
* @return the resource currently installed in the manager
* @throws Exception whatever {@link #create()} may throw
*/
public T forceRecreate(T expected) throws Exception {
synchronized (lock) {
if (sharedResource != expected) {
return sharedResource;
}
if (sharedResource != null) {
try {
close();
} catch (Exception e) {
LOGGER.debug("Failed to close current {} during forceRecreate", type, e);
}
}
sharedResource = create();
return sharedResource;
}
}

/**
* De-register {@code key} from the shared resource.
* <p>If this is the last {@code key} associated to the shared resource, {@link #close()} the resource.
Expand Down
Loading