diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index 27b49faf..0c5749f4 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -39,15 +39,16 @@ public class StepCollector extends EventNotifierSupport { private final String BREADCRUMB_ID_HEADER = "breadcrumbId"; public static final String COMPONENT_INIT_TIME_HEADER = "ComponentInitTime"; - public static final String FLOW_ID_HEADER = "DOVETAIL_FlowId"; - public static final String FLOW_VERSION_HEADER = "DOVETAIL_FlowVersion"; - public static final String RESPONSE_TIME_PROPERTY = "ResponseTime"; + public static final String FLOW_ID_PROPERTY = "DOVETAIL_FlowId"; + public static final String FLOW_VERSION_PROPERTY = "DOVETAIL_FlowVersion"; + public static final String PROCESSING_TIME_PROPERTY = "ProcessingTime"; public static final String TIMESTAMP_PROPERTY = "Timestamp"; public static final String MESSAGE_HEADERS_SIZE_PROPERTY = "HeadersSize"; public static final String MESSAGE_BODY_SIZE_PROPERTY = "BodySize"; public static final String MESSAGE_BODY_TYPE_PROPERTY = "BodyType"; public static final String EXCHANGE_PATTERN_PROPERTY = "ExchangePattern"; + private static final String BLACKLISTED_ROUTES_PARTS = "BLACKLISTED_ROUTES_PARTS"; private static String[] blacklistedRoutesParts = getBlacklistedRoutesParts(); @@ -105,18 +106,21 @@ public void notify(CamelEvent event) throws Exception { if(stepId!= null && !isBlackListed(stepId)){ if (filters == null || EventUtil.isFilteredEquals(filters, stepId)) { + + Exchange originalExchange = stepEvent.getExchange(); + + long processingTime = calculateAndUpdateComponentResponseTime(originalExchange, stepEvent); + // materialize body BEFORE async - byte[] body = stepEvent.getExchange().getMessage().getBody(byte[].class); + byte[] body = originalExchange.getMessage().getBody(byte[].class); // create a copy of the exchange for async processing - Exchange exchange = stepEvent.getExchange().copy(); + Exchange exchange = originalExchange.copy(); // replace the body in the copied exchange with the materialized byte[] exchange.getMessage().setBody(body); - long stepTimestamp = stepEvent.getTimestamp(); - // Hand off the HEAVY processing to a background thread collectionPool.submit(() -> { - processEvent(exchange, stepId, stepTimestamp, isSuccessEvent); + processEvent(exchange, stepId, processingTime, isSuccessEvent); }); } } @@ -128,13 +132,13 @@ protected void doStop() throws Exception { super.doStop(); } - private void processEvent(Exchange exchange, String stepId, long stepTimestamp, boolean isSuccessEvent){ + private void processEvent(Exchange exchange, String stepId, long processingTime, boolean isSuccessEvent){ //set fields Message message = exchange.getMessage(); Map headers = message.getHeaders(); Map properties = exchange.getProperties(); - String transactionId = message.getMessageId(); + String transactionId; //use breadcrumbId when available, otherwise set custom transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class); @@ -144,11 +148,11 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp, } // get previous flowId and flowVersion - String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class); - String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class); + String previousFlowId = exchange.getProperty(FLOW_ID_PROPERTY, String.class); + String previousFlowVersion = exchange.getProperty(FLOW_VERSION_PROPERTY, String.class); // set flowId and flowVersion - exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId); - exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion); + exchange.setProperty(FLOW_ID_PROPERTY, flowId); + exchange.setProperty(FLOW_VERSION_PROPERTY, flowVersion); //calculate times String timestamp = EventUtil.getCreatedTimestamp(); @@ -158,11 +162,11 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp, if(isSuccessEvent) { // success - messageEvent = getSuccessMessageEvent(exchange, stepId, stepTimestamp, timestamp, transactionId, previousFlowId, + messageEvent = getSuccessMessageEvent(exchange, stepId, processingTime, timestamp, transactionId, previousFlowId, previousFlowVersion, headers, properties, expiryDate); } else { // failed - messageEvent = getFailedMessageEvent(exchange, stepId, stepTimestamp, timestamp, transactionId, previousFlowId, + messageEvent = getFailedMessageEvent(stepId, timestamp, transactionId, previousFlowId, previousFlowVersion, headers, properties, expiryDate); } @@ -173,7 +177,7 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp, } private MessageEvent getSuccessMessageEvent( - Exchange exchange, String stepId, long stepTimestamp, String timestamp, String transactionId, + Exchange exchange, String stepId, long processingTime, String timestamp, String transactionId, String previousFlowId, String previousFlowVersion, Map headers, Map properties, String expiryDate ) { @@ -191,7 +195,7 @@ private MessageEvent getSuccessMessageEvent( String bodyType = body != null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; // set custom properties - setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); + setCustomProperties(exchange, bodyType, bodyLength, stepId, processingTime); // body to store String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body); @@ -203,7 +207,7 @@ private MessageEvent getSuccessMessageEvent( } private MessageEvent getFailedMessageEvent( - Exchange exchange, String stepId, long stepTimestamp, String timestamp, String transactionId, + String stepId, String timestamp, String transactionId, String previousFlowId, String previousFlowVersion, Map headers, Map properties, String expiryDate ) { @@ -265,10 +269,9 @@ private int getLimitBodyLength() { } } - private void setCustomProperties(Exchange exchange, String bodyType, int bodyLength, String stepId, long stepTimestamp) { + private void setCustomProperties(Exchange exchange, String bodyType, int bodyLength, String stepId, long processingTime) { if (EventUtil.isFilteredEquals(filters, stepId)) { - // set response time property - setResponseTimeProperty(exchange, stepTimestamp); + exchange.setProperty(PROCESSING_TIME_PROPERTY, processingTime); } // set timestamp property @@ -291,17 +294,6 @@ private void setCustomProperties(Exchange exchange, String bodyType, int bodyLen } - private void setResponseTimeProperty(Exchange exchange, long stepTimestamp){ - //Set default headers for the response time - - Object initTime = exchange.getIn().getHeader(COMPONENT_INIT_TIME_HEADER, Long.class); - exchange.getIn().setHeader(COMPONENT_INIT_TIME_HEADER, stepTimestamp); - if (initTime != null) { - long duration = stepTimestamp - (long) initTime; - exchange.setProperty(RESPONSE_TIME_PROPERTY, Long.toString(duration)); - } - } - private boolean isBlackListed(String routeId) { return Arrays.stream(blacklistedRoutesParts).anyMatch(routeId::contains); } @@ -321,4 +313,16 @@ private static String[] getBlacklistedRoutesParts() { } } + private static long calculateAndUpdateComponentResponseTime(Exchange originalExchange, CamelEvent.StepEvent stepEvent) { + Long initTime = originalExchange.getIn().getHeader(COMPONENT_INIT_TIME_HEADER, Long.class); + long stepTime = stepEvent.getTimestamp(); + // set componentInitTime for this new component + originalExchange.getIn().setHeader(COMPONENT_INIT_TIME_HEADER, stepTime); + + if (initTime == null) { + return 0L; + } + return stepTime - initTime; + } + } \ No newline at end of file diff --git a/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java b/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java index 3327d15b..fcec785d 100644 --- a/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java +++ b/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java @@ -33,8 +33,12 @@ public class MessageEvent { private static final String UNIT_MILLISECONDS = "milliseconds"; private static final String UNIT_BYTES = "bytes"; + private static final Set PROPERTIES_FLOW_INFO_SET = Set.of( + StepCollector.FLOW_ID_PROPERTY, + StepCollector.FLOW_VERSION_PROPERTY + ); private static final Set PROPERTIES_MILLISECONDS_UNIT_SET = Set.of( - StepCollector.RESPONSE_TIME_PROPERTY + StepCollector.PROCESSING_TIME_PROPERTY ); private static final Set PROPERTIES_BYTES_UNIT_SET = Set.of( StepCollector.MESSAGE_BODY_SIZE_PROPERTY, @@ -49,6 +53,7 @@ public class MessageEvent { static { PROPERTIES_FILTER_SET = new HashSet<>(); + PROPERTIES_FILTER_SET.addAll(PROPERTIES_FLOW_INFO_SET); PROPERTIES_FILTER_SET.addAll(PROPERTIES_MILLISECONDS_UNIT_SET); PROPERTIES_FILTER_SET.addAll(PROPERTIES_BYTES_UNIT_SET); PROPERTIES_FILTER_SET.addAll(PROPERTIES_NO_UNIT_SET);