Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ muzzle {
}

apply from: "$rootDir/gradle/java.gradle"
apply plugin: 'java-test-fixtures'

repositories {
maven {
Expand All @@ -33,6 +34,8 @@ tasks.named("latestDepTest", Test) {
dependencies {
compileOnly group: 'javax.jms', name: 'jms-api', version: '1.1-rev-1'

testFixturesCompileOnly group: 'javax.jms', name: 'jms-api', version: '1.1-rev-1'

testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
testImplementation group: 'org.apache.activemq.tooling', name: 'activemq-junit', version: '5.14.5'
testImplementation group: 'org.apache.activemq', name: 'activemq-pool', version: '5.14.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -265,6 +268,18 @@ public CharSequence toResourceName(String destinationName, boolean isQueue) {
return joiner.apply(destinationName);
}

public Destination getDestination(final MessageProducer messageProducer) throws JMSException {
try {
return messageProducer.getDestination(); // >= 1.1
} catch (AbstractMethodError ignored) {
// <=1.1 getDestination is not available so we need to pay an additional instanceOf
if (messageProducer instanceof QueueSender) {
return ((QueueSender) messageProducer).getQueue();
}
return ((TopicPublisher) messageProducer).getTopic();
}
}

public String getDestinationName(Destination destination) {
String name = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public static AgentScope beforeSend(
// fall-back when producer wasn't created via standard Session.createProducer API
if (null != producerState) {
resourceName = producerState.getResourceName();
Destination destination = producer.getDestination();
Destination destination = PRODUCER_DECORATE.getDestination(producer);
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
} else {
Destination destination = producer.getDestination();
Destination destination = PRODUCER_DECORATE.getDestination(producer);
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
boolean isQueue = PRODUCER_DECORATE.isQueue(destination);
resourceName = PRODUCER_DECORATE.toResourceName(destinationName, isQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static void bindProducerState(
int ackMode;
try {
ackMode = session.getAcknowledgeMode();
} catch (Exception ignored) {
} catch (Throwable ignored) {
ackMode = Session.AUTO_ACKNOWLEDGE;
}
sessionState =
Expand Down Expand Up @@ -155,7 +155,7 @@ public static void bindConsumerState(
int ackMode;
try {
ackMode = session.getAcknowledgeMode();
} catch (Exception ignored) {
} catch (Throwable ignored) {
ackMode = Session.AUTO_ACKNOWLEDGE;
Comment on lines +158 to 159
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve JMS 1.0 session ack mode

When a JMS 1.0 provider uses QueueConnection.createQueueSession/TopicConnection.createTopicSession with transacted=true or Session.CLIENT_ACKNOWLEDGE, getAcknowledgeMode() is exactly the 1.1 method that raises AbstractMethodError, so this fallback records the SessionState as AUTO_ACKNOWLEDGE. JMSMessageConsumerInstrumentation only defers spans to acknowledge()/commit() when SessionState reports client-ack/transacted, and the Commit/Recover advices have the same guards, so these JMS 1.0 sessions now finish spans and time-in-queue batches with auto-ack semantics instead of the session's actual semantics. The fallback should preserve at least the transacted/client-ack mode from creation or getTransacted() rather than defaulting every error to AUTO.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been done on purpose - the tradeoff is to have not exact time here but still having 1.0 supported. Otherwise I would have needed to instrument also javax.jms.Connection (and all the subclasses) to capture that info when createSession/createTopicSession/createQueueSession are called. And that would have been made the instrumentation way heavier just to support few library versions. The tradeoff that we have in this PR is to me acceptable

}
sessionState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,32 @@ import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.api.Config
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.Trace
import datadog.trace.api.config.TracerConfig
import datadog.trace.api.config.TraceInstrumentationConfig
import datadog.trace.api.config.TracerConfig
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.DDSpan
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.command.ActiveMQTextMessage
import org.apache.activemq.junit.EmbeddedActiveMQBroker
import spock.lang.Shared

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import javax.jms.Connection
import javax.jms.ConnectionFactory
import javax.jms.Destination
import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Queue
import javax.jms.QueueConnection
import javax.jms.QueueSession
import javax.jms.Session
import javax.jms.TemporaryQueue
import javax.jms.TemporaryTopic
import javax.jms.Queue
import javax.jms.Topic
import javax.jms.TextMessage
import javax.jms.Topic
import javax.jms.TopicConnection
import javax.jms.TopicSession
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import jms10mock.Jms10ConnectionFactory
import org.apache.activemq.command.ActiveMQTextMessage
import org.apache.activemq.junit.EmbeddedActiveMQBroker
import spock.lang.Shared

abstract class JMS1Test extends VersionedNamingTestBase {
@Shared
Expand Down Expand Up @@ -69,9 +69,13 @@ abstract class JMS1Test extends VersionedNamingTestBase {
true
}

def createConnectionFactory() {
broker.createConnectionFactory()
}

def setupSpec() {
broker.start()
final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory()
final ConnectionFactory connectionFactory = createConnectionFactory()

connection = connectionFactory.createConnection()
connection.start()
Expand Down Expand Up @@ -1097,3 +1101,10 @@ class JMS1V1ForkedTest extends JMS1Test {
"jms.process"
}
}

class JMS10Test extends JMS1V0Test {
@Override
def createConnectionFactory() {
new Jms10ConnectionFactory(super.createConnectionFactory())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package jms10mock;

import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

/** Wraps a real {@link Connection} but simulates a JMS 1.0 provider. */
public class Jms10Connection implements QueueConnection, TopicConnection {
private final Connection delegate;

public Jms10Connection(Connection delegate) {
this.delegate = delegate;
}

// --- JMS 1.1-only unified Connection method ---

@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
throw new AbstractMethodError(
"JMS 1.0 provider does not implement createSession(boolean, int) on Connection");
}

// --- JMS 1.0 QueueConnection methods ---

@Override
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
throws JMSException {
return new Jms10Session(delegate.createSession(transacted, acknowledgeMode));
}

// --- JMS 1.0 TopicConnection methods ---

@Override
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
throws JMSException {
return new Jms10Session(delegate.createSession(transacted, acknowledgeMode));
}

// --- Common Connection methods ---

@Override
public String getClientID() throws JMSException {
return delegate.getClientID();
}

@Override
public void setClientID(String clientID) throws JMSException {
delegate.setClientID(clientID);
}

@Override
public ConnectionMetaData getMetaData() throws JMSException {
return delegate.getMetaData();
}

@Override
public ExceptionListener getExceptionListener() throws JMSException {
return delegate.getExceptionListener();
}

@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
delegate.setExceptionListener(listener);
}

@Override
public void start() throws JMSException {
delegate.start();
}

@Override
public void stop() throws JMSException {
delegate.stop();
}

@Override
public void close() throws JMSException {
delegate.close();
}

// --- ConnectionConsumer methods — not commonly used, throw for JMS 1.1 unified form ---

@Override
public ConnectionConsumer createConnectionConsumer(
Destination destination,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages)
throws JMSException {
throw new AbstractMethodError(
"JMS 1.0 provider does not implement createConnectionConsumer(Destination, ...)");
}

@Override
public ConnectionConsumer createConnectionConsumer(
Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
throws JMSException {
return delegate.createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages);
}

@Override
public ConnectionConsumer createConnectionConsumer(
Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
throws JMSException {
return delegate.createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages);
}

@Override
public ConnectionConsumer createDurableConnectionConsumer(
Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages)
throws JMSException {
return delegate.createDurableConnectionConsumer(
topic, subscriptionName, messageSelector, sessionPool, maxMessages);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package jms10mock;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

/**
* Wraps a real {@link ConnectionFactory} but simulates a JMS 1.0 provider.
*
* <p>In JMS 1.0, clients used the domain-specific {@link QueueConnectionFactory} and {@link
* TopicConnectionFactory} to obtain connections. The unified {@link ConnectionFactory} and its
* {@code createConnection()} methods are JMS 1.1 additions that this wrapper does not support.
*/
public class Jms10ConnectionFactory implements QueueConnectionFactory, TopicConnectionFactory {
private final ConnectionFactory delegate;

public Jms10ConnectionFactory(ConnectionFactory delegate) {
this.delegate = delegate;
}

// --- JMS 1.1-only unified ConnectionFactory methods ---

@Override
public Connection createConnection() throws JMSException {
return delegate.createConnection();
}

@Override
public Connection createConnection(String userName, String password) throws JMSException {
return delegate.createConnection(userName, password);
}

// --- JMS 1.0 QueueConnectionFactory methods ---
@Override
public QueueConnection createQueueConnection() throws JMSException {
return new Jms10Connection(delegate.createConnection());
}

@Override
public QueueConnection createQueueConnection(String userName, String password)
throws JMSException {
return new Jms10Connection(delegate.createConnection(userName, password));
}

// --- JMS 1.0 TopicConnectionFactory methods ---

@Override
public TopicConnection createTopicConnection() throws JMSException {
return new Jms10Connection(delegate.createConnection());
}

@Override
public TopicConnection createTopicConnection(String userName, String password)
throws JMSException {
return new Jms10Connection(delegate.createConnection(userName, password));
}
}
Loading
Loading