Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setJsonMaxStringSize(((Number) member.getValue()).intValue());
}
break;
case "eventBusLargePayloadThreshold":
if (member.getValue() instanceof Number) {
obj.setEventBusLargePayloadThreshold(((Number) member.getValue()).intValue());
}
break;
}
}
}
Expand Down Expand Up @@ -151,5 +156,6 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
json.put("micrometerRegistries", array);
}
json.put("jsonMaxStringSize", obj.getJsonMaxStringSize());
json.put("eventBusLargePayloadThreshold", obj.getEventBusLargePayloadThreshold());
}
}
28 changes: 28 additions & 0 deletions src/main/java/io/neonbee/config/NeonBeeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class NeonBeeConfig {

private int jsonMaxStringSize;

private int eventBusLargePayloadThreshold;

/**
* Are the metrics enabled?
*
Expand Down Expand Up @@ -535,4 +537,30 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) {
public int getJsonMaxStringSize() {
return jsonMaxStringSize;
}

/**
* Set the threshold (in bytes) for event bus payload size above which serialization is offloaded to a worker thread
* to avoid blocking the event loop.
*
* @param eventBusLargePayloadThreshold the threshold in bytes (0 or negative disables offloading)
* @return a reference to this, so the API can be used fluently
*/
@Fluent
public NeonBeeConfig setEventBusLargePayloadThreshold(int eventBusLargePayloadThreshold) {
this.eventBusLargePayloadThreshold = eventBusLargePayloadThreshold;
return this;
}

/**
* Get the threshold (in bytes) for event bus payload size above which serialization is offloaded to a worker
* thread. Defaults to {@link #getJsonMaxStringSize()} if set, otherwise 0 (disabled).
*
* @return the threshold in bytes
*/
public int getEventBusLargePayloadThreshold() {
if (eventBusLargePayloadThreshold > 0) {
return eventBusLargePayloadThreshold;
}
return jsonMaxStringSize;
}
}
30 changes: 28 additions & 2 deletions src/main/java/io/neonbee/data/DataVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

/**
Expand Down Expand Up @@ -271,6 +272,16 @@ private static DeliveryOptions deliveryOptions(Vertx vertx, MessageCodec<?, ?> c
return deliveryOptions;
}

@VisibleForTesting
static int payloadStringSize(Object payload) {
if (payload instanceof JsonArray jsonArray) {
return jsonArray.encode().length();
} else if (payload instanceof JsonObject jsonObject) {
return jsonObject.encode().length();
}
return 0;
}

/**
* Creates a new data exception for any given throwable cause.
*
Expand Down Expand Up @@ -435,8 +446,23 @@ public void start(Promise<Void> promise) {
routine.execute(message.body(), context).onComplete(asyncResult -> {
try {
if (asyncResult.succeeded()) {
message.reply(asyncResult.result(), deliveryOptions(vertx, getMessageCodec(), context));

Object result = asyncResult.result();
if (result instanceof JsonArray || result instanceof JsonObject) {
vertx.executeBlocking(() -> {
int payloadSize = payloadStringSize(result);
int threshold = NeonBee.get(vertx).getConfig()
.getEventBusLargePayloadThreshold();
if (threshold > 0 && payloadSize > threshold) {
LOGGER.correlateWith(context).warn(
"Data verticle {} replying with large payload ({} bytes)",
getQualifiedName(), payloadSize);
}
message.reply(result, deliveryOptions(vertx, getMessageCodec(), context));
return null;
});
} else {
message.reply(result, deliveryOptions(vertx, getMessageCodec(), context));
}
} else {
Throwable cause = asyncResult.cause();
if (LOGGER.isWarnEnabled()) {
Expand Down