From b04312a8729affad77ea2c030216bd3a4d02f05b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 25 Jun 2025 01:27:43 +0800 Subject: [PATCH] [fix][prox]Pulsar Proxy OOM because forget to clear topic name cache --- conf/proxy.conf | 8 ++++++++ conf/websocket.conf | 8 ++++++++ .../pulsar/broker/service/BrokerService.java | 1 + .../apache/pulsar/common/naming/TopicName.java | 14 +++++++++++++- .../pulsar/proxy/server/ProxyConfiguration.java | 15 +++++++++++++++ .../apache/pulsar/proxy/server/ProxyService.java | 10 +++++++++- .../proxy/server/ProxyConfigurationTest.java | 12 ++++++++++++ .../apache/pulsar/websocket/WebSocketService.java | 9 +++++++++ 8 files changed, 75 insertions(+), 2 deletions(-) diff --git a/conf/proxy.conf b/conf/proxy.conf index 46d84744e1297..5f20c0ae18205 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -39,6 +39,14 @@ brokerServiceURLTLS= brokerWebServiceURL= brokerWebServiceURLTLS= +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheMaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + # If function workers are setup in a separate cluster, configure the following 2 settings. This url should point to # the discovery service provider of the function workers cluster, and does not support multi urls yet. functionWorkerWebServiceURL= diff --git a/conf/websocket.conf b/conf/websocket.conf index 91f7f7d4c23bb..9e87fdf525726 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -92,6 +92,14 @@ maxHttpServerConnections=2048 # Max concurrent web requests maxConcurrentHttpRequests=1024 +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheMaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + ### --- Authentication --- ### # Enable authentication diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 359c0daf5b8ea..0e7ad4165b742 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -651,6 +651,7 @@ protected void startClearInvalidateTopicNameCacheTask() { maxSecondsToClearTopicNameCache, maxSecondsToClearTopicNameCache, TimeUnit.SECONDS); + TopicName.setEvictCacheByScheduledTask(true); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index b2f96bfe6e259..7d1e783ff786f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -37,6 +39,8 @@ public class TopicName implements ServiceUnitId { public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; + public static final int CLIENT_MAX_TOPIC_NAME_CACHE = 10_000; + private final String completeTopicName; private final TopicDomain domain; @@ -51,6 +55,10 @@ public class TopicName implements ServiceUnitId { private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + @Getter + @Setter + private static boolean evictCacheByScheduledTask = false; + public static void clearIfReachedMaxCapacity(int maxCapacity) { if (maxCapacity < 0) { // Unlimited cache. @@ -82,7 +90,11 @@ public static TopicName get(String topic) { if (tp != null) { return tp; } - return cache.computeIfAbsent(topic, k -> new TopicName(k)); + tp = cache.computeIfAbsent(topic, k -> new TopicName(k)); + if (!evictCacheByScheduledTask) { + clearIfReachedMaxCapacity(CLIENT_MAX_TOPIC_NAME_CACHE); + } + return tp; } public static TopicName getPartitionedTopicName(String topic) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index d89801d360b1c..507278c7421f1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -173,6 +173,21 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private boolean zooKeeperAllowReadOnlyOperations; + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." + ) + private int topicNameCacheMaxCapacity = 100_000; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" + + " frequently when there are too many topics are in use." + ) + private int maxSecondsToClearTopicNameCache = 3600 * 2; + @FieldContext( category = CATEGORY_BROKER_DISCOVERY, doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the" diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 11afb68398e19..f760b1f7fa033 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -236,7 +237,6 @@ public void start() throws Exception { bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); - Class serverSocketChannelClass = EventLoopUtil.getServerSocketChannelClass(workerGroup); bootstrap.channel(serverSocketChannelClass); @@ -246,6 +246,14 @@ public void start() throws Exception { && EpollServerSocketChannel.class.isAssignableFrom(serverSocketChannelClass)) { proxyZeroCopyModeEnabled = true; } + // Start the task the clean topic name object cache. + final int maxSecondsToClearTopicNameCache = proxyConfig.getMaxSecondsToClearTopicNameCache(); + workerGroup.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(proxyConfig.getTopicNameCacheMaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); + TopicName.setEvictCacheByScheduledTask(true); bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false, null)); // Bind and start to accept incoming connections. diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java index 18e7efbd7b5c6..a1f5c239d0271 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java @@ -73,6 +73,8 @@ public void testBackwardCompatibility() throws IOException { printWriter.println("zookeeperSessionTimeoutMs=60"); printWriter.println("zooKeeperCacheExpirySeconds=500"); printWriter.println("httpMaxRequestHeaderSize=1234"); + printWriter.println("topicNameCacheMaxCapacity=20000"); + printWriter.println("maxSecondsToClearTopicNameCache=1800"); } testConfigFile.deleteOnExit(); InputStream stream = new FileInputStream(testConfigFile); @@ -81,6 +83,8 @@ public void testBackwardCompatibility() throws IOException { assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 60); assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 500); assertEquals(serviceConfig.getHttpMaxRequestHeaderSize(), 1234); + assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 20000); + assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 1800); testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); if (testConfigFile.exists()) { @@ -91,6 +95,8 @@ public void testBackwardCompatibility() throws IOException { printWriter.println("metadataStoreCacheExpirySeconds=500"); printWriter.println("zooKeeperSessionTimeoutMillis=-1"); printWriter.println("zooKeeperCacheExpirySeconds=-1"); + printWriter.println("topicNameCacheMaxCapacity=200"); + printWriter.println("maxSecondsToClearTopicNameCache=900"); } testConfigFile.deleteOnExit(); stream = new FileInputStream(testConfigFile); @@ -98,6 +104,8 @@ public void testBackwardCompatibility() throws IOException { stream.close(); assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 60); assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 500); + assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 200); + assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 900); testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); if (testConfigFile.exists()) { @@ -108,6 +116,8 @@ public void testBackwardCompatibility() throws IOException { printWriter.println("metadataStoreCacheExpirySeconds=30"); printWriter.println("zookeeperSessionTimeoutMs=100"); printWriter.println("zooKeeperCacheExpirySeconds=300"); + printWriter.println("topicNameCacheMaxCapacity=100"); + printWriter.println("maxSecondsToClearTopicNameCache=100"); } testConfigFile.deleteOnExit(); stream = new FileInputStream(testConfigFile); @@ -115,6 +125,8 @@ public void testBackwardCompatibility() throws IOException { stream.close(); assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 100); assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 300); + assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 100); + assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 100); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 7bb4df7baa533..27202dc9ecd92 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; @@ -128,6 +129,14 @@ public void start() throws PulsarServerException, PulsarClientException, Malform throw new PulsarServerException(e); } } + // Start the task the clean topic name object cache. + final int maxSecondsToClearTopicNameCache = config.getMaxSecondsToClearTopicNameCache(); + executor.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(config.getTopicNameCacheMaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); + TopicName.setEvictCacheByScheduledTask(true); log.info("Pulsar WebSocket Service started"); }