Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ public class StepCollector extends EventNotifierSupport {

private final String BREADCRUMB_ID_HEADER = "breadcrumbId";
public static final String COMPONENT_INIT_TIME_HEADER = "ComponentInitTime";
public static final String FLOW_ID_HEADER = "DOVETAIL_FlowId";
public static final String FLOW_VERSION_HEADER = "DOVETAIL_FlowVersion";

public static final String RESPONSE_TIME_PROPERTY = "ResponseTime";
public static final String FLOW_ID_PROPERTY = "DOVETAIL_FlowId";
public static final String FLOW_VERSION_PROPERTY = "DOVETAIL_FlowVersion";
public static final String PROCESSING_TIME_PROPERTY = "ProcessingTime";
public static final String TIMESTAMP_PROPERTY = "Timestamp";
public static final String MESSAGE_HEADERS_SIZE_PROPERTY = "HeadersSize";
public static final String MESSAGE_BODY_SIZE_PROPERTY = "BodySize";
public static final String MESSAGE_BODY_TYPE_PROPERTY = "BodyType";
public static final String EXCHANGE_PATTERN_PROPERTY = "ExchangePattern";

private static final String BLACKLISTED_ROUTES_PARTS = "BLACKLISTED_ROUTES_PARTS";
private static String[] blacklistedRoutesParts = getBlacklistedRoutesParts();

Expand Down Expand Up @@ -105,18 +106,21 @@ public void notify(CamelEvent event) throws Exception {

if(stepId!= null && !isBlackListed(stepId)){
if (filters == null || EventUtil.isFilteredEquals(filters, stepId)) {

Exchange originalExchange = stepEvent.getExchange();

long processingTime = calculateAndUpdateComponentResponseTime(originalExchange, stepEvent);

// materialize body BEFORE async
byte[] body = stepEvent.getExchange().getMessage().getBody(byte[].class);
byte[] body = originalExchange.getMessage().getBody(byte[].class);
// create a copy of the exchange for async processing
Exchange exchange = stepEvent.getExchange().copy();
Exchange exchange = originalExchange.copy();
// replace the body in the copied exchange with the materialized byte[]
exchange.getMessage().setBody(body);

long stepTimestamp = stepEvent.getTimestamp();

// Hand off the HEAVY processing to a background thread
collectionPool.submit(() -> {
processEvent(exchange, stepId, stepTimestamp, isSuccessEvent);
processEvent(exchange, stepId, processingTime, isSuccessEvent);
});
}
}
Expand All @@ -128,13 +132,13 @@ protected void doStop() throws Exception {
super.doStop();
}

private void processEvent(Exchange exchange, String stepId, long stepTimestamp, boolean isSuccessEvent){
private void processEvent(Exchange exchange, String stepId, long processingTime, boolean isSuccessEvent){

//set fields
Message message = exchange.getMessage();
Map<String, Object> headers = message.getHeaders();
Map<String, Object> properties = exchange.getProperties();
String transactionId = message.getMessageId();
String transactionId;

//use breadcrumbId when available, otherwise set custom
transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class);
Expand All @@ -144,11 +148,11 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp,
}

// get previous flowId and flowVersion
String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class);
String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class);
String previousFlowId = exchange.getProperty(FLOW_ID_PROPERTY, String.class);
String previousFlowVersion = exchange.getProperty(FLOW_VERSION_PROPERTY, String.class);
// set flowId and flowVersion
exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId);
exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion);
exchange.setProperty(FLOW_ID_PROPERTY, flowId);
exchange.setProperty(FLOW_VERSION_PROPERTY, flowVersion);

//calculate times
String timestamp = EventUtil.getCreatedTimestamp();
Expand All @@ -158,11 +162,11 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp,

if(isSuccessEvent) {
// success
messageEvent = getSuccessMessageEvent(exchange, stepId, stepTimestamp, timestamp, transactionId, previousFlowId,
messageEvent = getSuccessMessageEvent(exchange, stepId, processingTime, timestamp, transactionId, previousFlowId,
previousFlowVersion, headers, properties, expiryDate);
} else {
// failed
messageEvent = getFailedMessageEvent(exchange, stepId, stepTimestamp, timestamp, transactionId, previousFlowId,
messageEvent = getFailedMessageEvent(stepId, timestamp, transactionId, previousFlowId,
previousFlowVersion, headers, properties, expiryDate);
}

Expand All @@ -173,7 +177,7 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp,
}

private MessageEvent getSuccessMessageEvent(
Exchange exchange, String stepId, long stepTimestamp, String timestamp, String transactionId,
Exchange exchange, String stepId, long processingTime, String timestamp, String transactionId,
String previousFlowId, String previousFlowVersion, Map<String, Object> headers, Map<String,
Object> properties, String expiryDate
) {
Expand All @@ -191,7 +195,7 @@ private MessageEvent getSuccessMessageEvent(
String bodyType = body != null ? exchange.getMessage().getBody().getClass().getSimpleName() : "";

// set custom properties
setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp);
setCustomProperties(exchange, bodyType, bodyLength, stepId, processingTime);

// body to store
String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body);
Expand All @@ -203,7 +207,7 @@ private MessageEvent getSuccessMessageEvent(
}

private MessageEvent getFailedMessageEvent(
Exchange exchange, String stepId, long stepTimestamp, String timestamp, String transactionId,
String stepId, String timestamp, String transactionId,
String previousFlowId, String previousFlowVersion, Map<String, Object> headers, Map<String,
Object> properties, String expiryDate
) {
Expand Down Expand Up @@ -265,10 +269,9 @@ private int getLimitBodyLength() {
}
}

private void setCustomProperties(Exchange exchange, String bodyType, int bodyLength, String stepId, long stepTimestamp) {
private void setCustomProperties(Exchange exchange, String bodyType, int bodyLength, String stepId, long processingTime) {
if (EventUtil.isFilteredEquals(filters, stepId)) {
// set response time property
setResponseTimeProperty(exchange, stepTimestamp);
exchange.setProperty(PROCESSING_TIME_PROPERTY, processingTime);
}

// set timestamp property
Expand All @@ -291,17 +294,6 @@ private void setCustomProperties(Exchange exchange, String bodyType, int bodyLen

}

private void setResponseTimeProperty(Exchange exchange, long stepTimestamp){
//Set default headers for the response time

Object initTime = exchange.getIn().getHeader(COMPONENT_INIT_TIME_HEADER, Long.class);
exchange.getIn().setHeader(COMPONENT_INIT_TIME_HEADER, stepTimestamp);
if (initTime != null) {
long duration = stepTimestamp - (long) initTime;
exchange.setProperty(RESPONSE_TIME_PROPERTY, Long.toString(duration));
}
}

private boolean isBlackListed(String routeId) {
return Arrays.stream(blacklistedRoutesParts).anyMatch(routeId::contains);
}
Expand All @@ -321,4 +313,16 @@ private static String[] getBlacklistedRoutesParts() {
}
}

private static long calculateAndUpdateComponentResponseTime(Exchange originalExchange, CamelEvent.StepEvent stepEvent) {
Long initTime = originalExchange.getIn().getHeader(COMPONENT_INIT_TIME_HEADER, Long.class);
long stepTime = stepEvent.getTimestamp();
// set componentInitTime for this new component
originalExchange.getIn().setHeader(COMPONENT_INIT_TIME_HEADER, stepTime);

if (initTime == null) {
return 0L;
}
return stepTime - initTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ public class MessageEvent {
private static final String UNIT_MILLISECONDS = "milliseconds";
private static final String UNIT_BYTES = "bytes";

private static final Set<String> PROPERTIES_FLOW_INFO_SET = Set.of(
StepCollector.FLOW_ID_PROPERTY,
StepCollector.FLOW_VERSION_PROPERTY
);
private static final Set<String> PROPERTIES_MILLISECONDS_UNIT_SET = Set.of(
StepCollector.RESPONSE_TIME_PROPERTY
StepCollector.PROCESSING_TIME_PROPERTY
);
private static final Set<String> PROPERTIES_BYTES_UNIT_SET = Set.of(
StepCollector.MESSAGE_BODY_SIZE_PROPERTY,
Expand All @@ -49,6 +53,7 @@ public class MessageEvent {

static {
PROPERTIES_FILTER_SET = new HashSet<>();
PROPERTIES_FILTER_SET.addAll(PROPERTIES_FLOW_INFO_SET);
PROPERTIES_FILTER_SET.addAll(PROPERTIES_MILLISECONDS_UNIT_SET);
PROPERTIES_FILTER_SET.addAll(PROPERTIES_BYTES_UNIT_SET);
PROPERTIES_FILTER_SET.addAll(PROPERTIES_NO_UNIT_SET);
Expand Down