From 3216647da22d7fbf1a037fa4e106b0de1ef3df6a Mon Sep 17 00:00:00 2001 From: lordcheng10 Date: Fri, 21 Apr 2023 15:39:03 +0800 Subject: [PATCH 1/2] support config receve buffer --- .../pulsar/broker/ServiceConfiguration.java | 20 +++++++++++++++++++ .../pulsar/broker/service/BrokerService.java | 5 ++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ba82667690dc5..cc4b0e60c80bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -844,6 +844,26 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, + "it can be enabled to reduce the memory consumption caused by pendingAcks.") private boolean autoShrinkForConsumerPendingAcksMap = false; + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The minimum receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int minReceiveByteBuf = 1024; + + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The init receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int initReceiveByteBuf = 16 * 1024; + + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The max receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int maxReceiveByteBuf = 1 * 1024 * 1024; @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a6345f4a56a71..e54d0da5a0e27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -442,8 +442,11 @@ private void startProtocolHandler(String protocol, bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + int minReceiveByteBuf = pulsar().getConfiguration().getMinReceiveByteBuf(); + int initReceiveByteBuf = pulsar().getConfiguration().getInitReceiveByteBuf(); + int maxReceiveByteBuf = pulsar().getConfiguration().getMaxReceiveByteBuf(); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, - new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); + new AdaptiveRecvByteBufAllocator(minReceiveByteBuf, initReceiveByteBuf, maxReceiveByteBuf)); EventLoopUtil.enableTriggeredMode(bootstrap); DefaultThreadFactory defaultThreadFactory = new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" + protocol); From db74b661da60b9041b46cfd3d9ad501e88f1ec14 Mon Sep 17 00:00:00 2001 From: lordcheng10 Date: Fri, 21 Apr 2023 16:19:09 +0800 Subject: [PATCH 2/2] support receive buffer config --- .../org/apache/pulsar/broker/service/BrokerService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e54d0da5a0e27..61e5e015cd123 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -473,8 +473,12 @@ private ServerBootstrap defaultServerBootstrap() { bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + int minReceiveByteBuf = pulsar().getConfiguration().getMinReceiveByteBuf(); + int initReceiveByteBuf = pulsar().getConfiguration().getInitReceiveByteBuf(); + int maxReceiveByteBuf = pulsar().getConfiguration().getMaxReceiveByteBuf(); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, - new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); + new AdaptiveRecvByteBufAllocator(minReceiveByteBuf, + initReceiveByteBuf, maxReceiveByteBuf)); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(bootstrap); return bootstrap;