diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index 835fb5b4ecf35..e872d7e873834 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -421,11 +421,20 @@ public void testMultiTopicConsumerTracing() throws Exception { // Verify spans for both topics List spans = spanExporter.getFinishedSpanItems(); - long consumerSpans = spans.stream() + List consumerSpans = spans.stream() .filter(s -> s.getKind() == SpanKind.CONSUMER) - .count(); - - assertEquals(consumerSpans, 2); + .collect(java.util.stream.Collectors.toList()); + Set consumerSpanTopics = consumerSpans.stream() + .map(s -> s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name"))) + .collect(java.util.stream.Collectors.toSet()); + Set consumerSpanNames = consumerSpans.stream() + .map(SpanData::getName) + .collect(java.util.stream.Collectors.toSet()); + + assertEquals(consumerSpans.size(), 2); + assertEquals(consumerSpanTopics, Set.of(topic1, topic2)); + assertEquals(consumerSpanNames, Set.of("process " + topic1, "process " + topic2)); producer1.close(); producer2.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java index 79d13cd22493b..83e87805eabc2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java @@ -63,7 +63,11 @@ public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor< private final Tracer tracer; private final TextMapPropagator propagator; - private String topic; + /** + * Topic returned by the consumer. Used as a fallback when a message has no topic name, + * and as the topic key for acknowledgment callbacks that only carry a MessageId. + */ + private String consumerTopic; private String subscription; /** @@ -92,7 +96,15 @@ private String getTopicKey(MessageId messageId) { if (messageId instanceof TopicMessageId) { return ((TopicMessageId) messageId).getOwnerTopic(); } - return topic != null ? topic : ""; + return consumerTopic != null ? consumerTopic : ""; + } + + private String getMessageTopic(Message message) { + String messageTopic = message.getTopicName(); + if (messageTopic != null && !messageTopic.isEmpty()) { + return messageTopic; + } + return consumerTopic != null ? consumerTopic : ""; } @Override @@ -111,15 +123,16 @@ public Message beforeConsume(Consumer consumer, Message message) { } try { - if (topic == null) { - topic = consumer.getTopic(); + if (consumerTopic == null) { + consumerTopic = consumer.getTopic(); } if (subscription == null) { subscription = consumer.getSubscription(); } + String messageTopic = getMessageTopic(message); // Create a consumer span for this message - Span span = TracingContext.createConsumerSpan(tracer, topic, subscription, message, propagator); + Span span = TracingContext.createConsumerSpan(tracer, messageTopic, subscription, message, propagator); if (TracingContext.isValid(span)) { MessageId messageId = message.getMessageId(); @@ -137,7 +150,7 @@ public Message beforeConsume(Consumer consumer, Message message) { } log.debug().attr("messageId", messageId) - .attr("topic", topic) + .attr("topic", messageTopic) .log("Created consumer span"); } } catch (Exception e) { @@ -334,4 +347,4 @@ public void onAckTimeoutSend(Consumer consumer, Set messageIds) { } } } -} \ No newline at end of file +}