diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index eb418627d8aaa..a7a9cb2cb1d7d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -633,7 +633,7 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat if (clusteringTimer != null) { long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); TimelineUtils.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant -> - metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, replaceCommitMetadata, HoodieActiveTimeline.CLUSTERING_ACTION) + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, replaceCommitMetadata, clusteringInstant.getAction()) ); } if (config.isExpirationOfClusteringEnabled()) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index b785d90252880..0edb46a1c3554 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -49,6 +49,7 @@ import static org.apache.hudi.metrics.HoodieMetrics.FAILURE_COUNTER; import static org.apache.hudi.metrics.HoodieMetrics.SOURCE_READ_AND_INDEX_ACTION; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -322,6 +323,41 @@ public MockHoodieActiveTimeline(HoodieInstant... instants) { } } + @Test + public void testClusteringCommitMetricsUsesVersionAwareAction() { + Random rand = new Random(); + long randomValue = 1 + rand.nextInt(); + + HoodieCommitMetadata metadata = mock(HoodieCommitMetadata.class); + when(metadata.fetchTotalPartitionsWritten()).thenReturn(randomValue + 1); + when(metadata.fetchTotalFilesInsert()).thenReturn(randomValue + 2); + when(metadata.fetchTotalFilesUpdated()).thenReturn(randomValue + 3); + when(metadata.fetchTotalRecordsWritten()).thenReturn(randomValue + 4); + when(metadata.fetchTotalUpdateRecordsWritten()).thenReturn(randomValue + 5); + when(metadata.fetchTotalInsertRecordsWritten()).thenReturn(randomValue + 6); + when(metadata.fetchTotalBytesWritten()).thenReturn(randomValue + 7); + when(metadata.getTotalScanTime()).thenReturn(randomValue + 8); + when(metadata.getTotalCreateTime()).thenReturn(randomValue + 9); + when(metadata.getTotalUpsertTime()).thenReturn(randomValue + 10); + when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11); + when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12); + when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13); + when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14); + when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty())); + + // 1.x tables: clustering instants carry CLUSTERING_ACTION — metrics must land under "clustering.*" + hoodieMetrics.updateCommitMetrics(randomValue + 17, 100L, metadata, HoodieTimeline.CLUSTERING_ACTION); + String clusteringMetric = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.TOTAL_PARTITIONS_WRITTEN_STR); + assertEquals(metadata.fetchTotalPartitionsWritten(), (long) metrics.getRegistry().getGauges().get(clusteringMetric).getValue()); + // No metric should have leaked into the replacecommit namespace yet + String replaceCommitMetric = hoodieMetrics.getMetricsName(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieMetrics.TOTAL_PARTITIONS_WRITTEN_STR); + assertNull(metrics.getRegistry().getGauges().get(replaceCommitMetric)); + + // 0.x tables: clustering instants carry REPLACE_COMMIT_ACTION — metrics must land under "replacecommit.*" + hoodieMetrics.updateCommitMetrics(randomValue + 17, 100L, metadata, HoodieTimeline.REPLACE_COMMIT_ACTION); + assertEquals(metadata.fetchTotalPartitionsWritten(), (long) metrics.getRegistry().getGauges().get(replaceCommitMetric).getValue()); + } + @Test public void testRollbackFailureMetric() { // Test that rollback failure metric is emitted correctly diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java index 72ae3a967ab45..1a85593a2a569 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java @@ -153,7 +153,7 @@ protected void completeClustering( long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); try { metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(clusteringCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.CLUSTERING_ACTION); + durationInMs, metadata, clusteringInstant.getAction()); } catch (ParseException e) { throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + config.getBasePath() + " at time " + clusteringCommitTime, e);