From 2bfa0988e5b7d5aebb1cc9a3a1399e94b80c4d15 Mon Sep 17 00:00:00 2001 From: C5371248 Date: Tue, 9 Jun 2026 17:19:49 +0300 Subject: [PATCH] feat: offload large payload serialization to worker thread When a DataVerticle replies with a JsonArray or JsonObject over the clustered event bus, the Jackson serialization in encodeToWire can block the event loop for large payloads. This change offloads all JSON reply serialization to a worker thread via executeBlocking. A configurable threshold (eventBusLargePayloadThreshold) controls when a warning is logged about large payloads. It defaults to the jsonMaxStringSize value if set, aligning both inbound parsing limits and outbound payload monitoring to the same configured size. --- .../config/NeonBeeConfigConverter.java | 6 ++++ .../java/io/neonbee/config/NeonBeeConfig.java | 28 +++++++++++++++++ .../java/io/neonbee/data/DataVerticle.java | 30 +++++++++++++++++-- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java b/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java index bb185c21a..03d6515ab 100644 --- a/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java +++ b/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java @@ -100,6 +100,11 @@ static void fromJson(Iterable> json, NeonBee obj.setJsonMaxStringSize(((Number) member.getValue()).intValue()); } break; + case "eventBusLargePayloadThreshold": + if (member.getValue() instanceof Number) { + obj.setEventBusLargePayloadThreshold(((Number) member.getValue()).intValue()); + } + break; } } } @@ -151,5 +156,6 @@ static void toJson(NeonBeeConfig obj, java.util.Map json) { json.put("micrometerRegistries", array); } json.put("jsonMaxStringSize", obj.getJsonMaxStringSize()); + json.put("eventBusLargePayloadThreshold", obj.getEventBusLargePayloadThreshold()); } } diff --git a/src/main/java/io/neonbee/config/NeonBeeConfig.java b/src/main/java/io/neonbee/config/NeonBeeConfig.java index 190122446..adb9184b8 100644 --- a/src/main/java/io/neonbee/config/NeonBeeConfig.java +++ b/src/main/java/io/neonbee/config/NeonBeeConfig.java @@ -100,6 +100,8 @@ public class NeonBeeConfig { private int jsonMaxStringSize; + private int eventBusLargePayloadThreshold; + /** * Are the metrics enabled? * @@ -535,4 +537,30 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) { public int getJsonMaxStringSize() { return jsonMaxStringSize; } + + /** + * Set the threshold (in bytes) for event bus payload size above which serialization is offloaded to a worker thread + * to avoid blocking the event loop. + * + * @param eventBusLargePayloadThreshold the threshold in bytes (0 or negative disables offloading) + * @return a reference to this, so the API can be used fluently + */ + @Fluent + public NeonBeeConfig setEventBusLargePayloadThreshold(int eventBusLargePayloadThreshold) { + this.eventBusLargePayloadThreshold = eventBusLargePayloadThreshold; + return this; + } + + /** + * Get the threshold (in bytes) for event bus payload size above which serialization is offloaded to a worker + * thread. Defaults to {@link #getJsonMaxStringSize()} if set, otherwise 0 (disabled). + * + * @return the threshold in bytes + */ + public int getEventBusLargePayloadThreshold() { + if (eventBusLargePayloadThreshold > 0) { + return eventBusLargePayloadThreshold; + } + return jsonMaxStringSize; + } } diff --git a/src/main/java/io/neonbee/data/DataVerticle.java b/src/main/java/io/neonbee/data/DataVerticle.java index 4612ead81..b0c43de70 100644 --- a/src/main/java/io/neonbee/data/DataVerticle.java +++ b/src/main/java/io/neonbee/data/DataVerticle.java @@ -54,6 +54,7 @@ import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; /** @@ -271,6 +272,16 @@ private static DeliveryOptions deliveryOptions(Vertx vertx, MessageCodec c return deliveryOptions; } + @VisibleForTesting + static int payloadStringSize(Object payload) { + if (payload instanceof JsonArray jsonArray) { + return jsonArray.encode().length(); + } else if (payload instanceof JsonObject jsonObject) { + return jsonObject.encode().length(); + } + return 0; + } + /** * Creates a new data exception for any given throwable cause. * @@ -435,8 +446,23 @@ public void start(Promise promise) { routine.execute(message.body(), context).onComplete(asyncResult -> { try { if (asyncResult.succeeded()) { - message.reply(asyncResult.result(), deliveryOptions(vertx, getMessageCodec(), context)); - + Object result = asyncResult.result(); + if (result instanceof JsonArray || result instanceof JsonObject) { + vertx.executeBlocking(() -> { + int payloadSize = payloadStringSize(result); + int threshold = NeonBee.get(vertx).getConfig() + .getEventBusLargePayloadThreshold(); + if (threshold > 0 && payloadSize > threshold) { + LOGGER.correlateWith(context).warn( + "Data verticle {} replying with large payload ({} bytes)", + getQualifiedName(), payloadSize); + } + message.reply(result, deliveryOptions(vertx, getMessageCodec(), context)); + return null; + }); + } else { + message.reply(result, deliveryOptions(vertx, getMessageCodec(), context)); + } } else { Throwable cause = asyncResult.cause(); if (LOGGER.isWarnEnabled()) {