diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ba82667690dc5..cc4b0e60c80bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -844,6 +844,26 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, + "it can be enabled to reduce the memory consumption caused by pendingAcks.") private boolean autoShrinkForConsumerPendingAcksMap = false; + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The minimum receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int minReceiveByteBuf = 1024; + + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The init receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int initReceiveByteBuf = 16 * 1024; + + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "The max receive buffer size for AdaptiveRecvByteBufAllocator" + ) + private int maxReceiveByteBuf = 1 * 1024 * 1024; @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a6345f4a56a71..61e5e015cd123 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -442,8 +442,11 @@ private void startProtocolHandler(String protocol, bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + int minReceiveByteBuf = pulsar().getConfiguration().getMinReceiveByteBuf(); + int initReceiveByteBuf = pulsar().getConfiguration().getInitReceiveByteBuf(); + int maxReceiveByteBuf = pulsar().getConfiguration().getMaxReceiveByteBuf(); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, - new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); + new AdaptiveRecvByteBufAllocator(minReceiveByteBuf, initReceiveByteBuf, maxReceiveByteBuf)); EventLoopUtil.enableTriggeredMode(bootstrap); DefaultThreadFactory defaultThreadFactory = new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" + protocol); @@ -470,8 +473,12 @@ private ServerBootstrap defaultServerBootstrap() { bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + int minReceiveByteBuf = pulsar().getConfiguration().getMinReceiveByteBuf(); + int initReceiveByteBuf = pulsar().getConfiguration().getInitReceiveByteBuf(); + int maxReceiveByteBuf = pulsar().getConfiguration().getMaxReceiveByteBuf(); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, - new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); + new AdaptiveRecvByteBufAllocator(minReceiveByteBuf, + initReceiveByteBuf, maxReceiveByteBuf)); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(bootstrap); return bootstrap;