diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java index 4a9b33a9afdb1..5104ec51a3d7c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java @@ -20,12 +20,14 @@ import java.util.Collections; import java.util.List; +import net.jcip.annotations.NotThreadSafe; import org.apache.bookkeeper.mledger.Entry; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.plugin.FilterContext; import org.apache.pulsar.common.api.proto.MessageMetadata; +@NotThreadSafe public class EntryFilterSupport { protected final List entryFilters; @@ -58,7 +60,11 @@ public EntryFilterSupport(Subscription subscription) { hasFilter = CollectionUtils.isNotEmpty(entryFilters); } - public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata, + /** + * Do not use this method outside dispatcher, + * the `filterContext` field is bound at dispatcher level and to avoid objection creation. + */ + protected EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata, Consumer consumer) { if (hasFilter) { fillContext(filterContext, msgMetadata, subscription, consumer); @@ -68,6 +74,21 @@ public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata } } + /** + * runFiltersForAnalyzeBacklog + * the difference is the creation of the filterContext. + */ + public EntryFilter.FilterResult runFiltersForAnalyzeBacklog(Entry entry, MessageMetadata msgMetadata, + Consumer consumer) { + if (hasFilter) { + FilterContext context = new FilterContext(); + fillContext(context, msgMetadata, subscription, consumer); + return getFilterResult(context, entry, entryFilters); + } else { + return EntryFilter.FilterResult.ACCEPT; + } + } + private void fillContext(FilterContext context, MessageMetadata msgMetadata, Subscription subscription, Consumer consumer) { context.reset(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d283cac77c7b6..1761796f1e8d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -566,7 +566,7 @@ public CompletableFuture analyzeBacklog(Optional numMessages = messageMetadata.getNumMessagesInBatch(); } EntryFilter.FilterResult filterResult = entryFilterSupport - .runFiltersForEntry(entry, messageMetadata, null); + .runFiltersForAnalyzeBacklog(entry, messageMetadata, null); if (filterResult == null) { filterResult = EntryFilter.FilterResult.ACCEPT;