From 0f6b01cb9a305d9d27c9a5bae846ea2b293ae0b7 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Sun, 14 Jun 2026 17:43:13 +0700 Subject: [PATCH 1/3] test(trino): de-flake TestHudi*FileOperations by asserting only synchronous reads PR #18995 disabled the async table-statistics refresh to de-flake the three TestHudi*FileOperations classes, but they kept flaking on master with the same symmetric off-by-N mismatch in the metadata-table span counts. The stats refresh was only one of several asynchronous sources of filesystem-access spans. These tests assert the exact multiset of low-level filesystem spans a query emits. Trino resets the span exporter at the start of each executeWithPlan, so any span a background thread emits after the synchronous query returns lands in the next test's measurement window and scrambles the counts. The Hudi read path emits such spans from shared background pools that read the metadata table (split loading, partition listing, index support) and, for the Alluxio variant, from asynchronous cache population whose hit/miss outcome depends on whether an earlier cache write had completed. Rather than continue trying to time these background tasks, this change stops asserting the quantities they produce and keeps only the operations that happen synchronously on the foreground planning/scan path. getFileOperations now excludes METADATA_TABLE operations in all three classes, and additionally excludes all Alluxio.* operations in the Alluxio class; the corresponding expected entries are removed. hudi.table-statistics-enabled=false is kept because the stats executor also reads the index definition and table-property files on a background thread, which are part of the surviving asserted set. No production code is changed. Verified under JDK 23 / trino-root 472: the build succeeds with zero checkstyle violations and the three classes pass 20 consecutive runs. --- .../TestHudiAlluxioCacheFileOperations.java | 42 ++++++------------- .../TestHudiMemoryCacheFileOperations.java | 26 +++++------- .../hudi/TestHudiNoCacheFileOperations.java | 26 +++++------- 3 files changed, 33 insertions(+), 61 deletions(-) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java index 89507721afa5..4c31f08aabad 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java @@ -36,7 +36,6 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR; -import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.LOG; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE; @@ -66,10 +65,11 @@ protected DistributedQueryRunner createQueryRunner() .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) .put("fs.cache.max-sizes", "100MB") .put("hudi.metadata.cache.enabled", "false") - // Disable async table-statistics refresh: it reads the metadata table on a - // background executor whose spans can outlive the query and leak into the next - // test's measurement (the symmetric off-by-N flake). Disabling it makes the - // file-operation counts deterministic right after the query returns. + // Disable the async table-statistics refresh: on the first query it reads the index + // definitions and table-property files (and the metadata table) on a background + // executor. Those non-metadata-table reads land in the asserted set and their timing + // is non-deterministic, so we turn the refresh off and assert only the synchronous + // planning-path reads. .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); @@ -87,12 +87,6 @@ public void testSelectWithFilter() assertFileSystemAccesses( query, ImmutableMultiset.builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 2) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27) - .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 4) - .addCopies(new FileOperation("Alluxio.readCached", LOG), 15) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10) .addCopies(new FileOperation("InputFile.length", TIMELINE), 2) .addCopies(new FileOperation("InputFile.length", LOG), 1) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) @@ -103,12 +97,6 @@ public void testSelectWithFilter() assertFileSystemAccesses( query, ImmutableMultiset.builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 2) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27) - .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 4) - .addCopies(new FileOperation("Alluxio.readCached", LOG), 15) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10) .addCopies(new FileOperation("InputFile.length", TIMELINE), 2) .addCopies(new FileOperation("InputFile.length", LOG), 1) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) @@ -127,12 +115,6 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215) - .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8) - .addCopies(new FileOperation("Alluxio.readCached", LOG), 30) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69) .addCopies(new FileOperation("InputFile.length", TIMELINE), 4) .addCopies(new FileOperation("InputFile.length", LOG), 2) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) @@ -142,12 +124,6 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215) - .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8) - .addCopies(new FileOperation("Alluxio.readCached", LOG), 30) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69) .addCopies(new FileOperation("InputFile.length", TIMELINE), 4) .addCopies(new FileOperation("InputFile.length", LOG), 2) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) @@ -169,6 +145,14 @@ public static Multiset getFileOperations(QueryRunner queryRunner) .stream() .filter(span -> !span.getName().startsWith("InputFile.exists")) .map(FileOperation::create) + // Metadata-table reads are issued from Hudi background pools (split loading, partition + // listing, table-statistics refresh) whose spans can outlive the synchronous query and + // land in the next query's measurement window, so their per-query counts are not + // deterministic. Alluxio cache hits/misses (Alluxio.*) depend on whether an earlier + // asynchronous cache write had already completed, so their counts are not deterministic + // either. Both are excluded; only synchronous foreground reads are asserted. + .filter(operation -> operation.fileType() != METADATA_TABLE) + .filter(operation -> !operation.operationType().startsWith("Alluxio.")) .collect(toCollection(HashMultiset::create)); } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java index 61867b1cde7d..8e1b2fe81911 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java @@ -56,10 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "true") .put("fs.cache.enabled", "false") - // Disable async table-statistics refresh: it reads the metadata table on a - // background executor whose spans can outlive the query and leak into the next - // test's measurement (the symmetric off-by-N flake). Disabling it makes the - // file-operation counts deterministic right after the query returns. + // Disable the async table-statistics refresh: on the first query it reads the index + // definitions and table-property files (and the metadata table) on a background + // executor. Those non-metadata-table reads land in the asserted set and their timing + // is non-deterministic, so we turn the refresh off and assert only the synchronous + // planning-path reads. .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); @@ -78,11 +79,8 @@ public void testSelectWithFilter() query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 2) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 1) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) @@ -92,11 +90,8 @@ public void testSelectWithFilter() query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 2) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 1) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) @@ -114,11 +109,8 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) @@ -127,11 +119,8 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) @@ -153,6 +142,11 @@ private static Multiset getFileOperations(QueryRunner queryRunner .filter(span -> !span.getName().startsWith("InputFile.exists")) .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span))) .map(FileOperation::create) + // Metadata-table reads are issued from Hudi background pools (split loading, partition + // listing, table-statistics refresh) whose spans can outlive the synchronous query and + // land in the next query's measurement window. Their per-query counts are therefore + // non-deterministic, so they are excluded; only synchronous foreground reads are asserted. + .filter(operation -> operation.fileType() != METADATA_TABLE) .collect(toCollection(HashMultiset::create)); } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java index 71518b9fc67b..55071241fff7 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java @@ -56,10 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "false") .put("fs.cache.enabled", "false") - // Disable async table-statistics refresh: it reads the metadata table on a - // background executor whose spans can outlive the query and leak into the next - // test's measurement (the symmetric off-by-N flake). Disabling it makes the - // file-operation counts deterministic right after the query returns. + // Disable the async table-statistics refresh: on the first query it reads the index + // definitions and table-property files (and the metadata table) on a background + // executor. Those non-metadata-table reads land in the asserted set and their timing + // is non-deterministic, so we turn the refresh off and assert only the synchronous + // planning-path reads. .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); @@ -78,9 +79,6 @@ public void testSelectWithFilter() query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", LOG), 1) @@ -92,10 +90,7 @@ public void testSelectWithFilter() query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", LOG), 1) .add(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) @@ -114,10 +109,7 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4) @@ -127,10 +119,7 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4) @@ -153,6 +142,11 @@ private static Multiset getFileOperations(Quer .filter(span -> !span.getName().startsWith("InputFile.exists")) .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span))) .map(FileOperationUtils.FileOperation::create) + // Metadata-table reads are issued from Hudi background pools (split loading, partition + // listing, table-statistics refresh) whose spans can outlive the synchronous query and + // land in the next query's measurement window. Their per-query counts are therefore + // non-deterministic, so they are excluded; only synchronous foreground reads are asserted. + .filter(operation -> operation.fileType() != METADATA_TABLE) .collect(toCollection(HashMultiset::create)); } } From 189f4e0b668c6e77a654e5529d69673b1d4fbfbe Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 15 Jun 2026 11:56:56 +0700 Subject: [PATCH 2/3] test(trino): cover Alluxio cache hits with a count-independent assertion The previous change stopped asserting the exact Alluxio.* span counts in TestHudiAlluxioCacheFileOperations because the cache write is asynchronous, so the per-query hit/miss counts flake. That left the Alluxio class with no assertion that the filesystem cache is actually engaged. This adds testReadsServedFromAlluxioCache, which warms the cache and then asserts that re-running the query produces at least one Alluxio.readCached span. The assertion is a lower bound rather than an exact count, so it is immune to the symmetric off-by-N leakage, and it is wrapped in assertEventually so it retries until the asynchronous cache write has landed and fails loudly at a 30s deadline if the cache never serves a read. Test-only change. Verified under JDK 23 / trino-root 472: the build succeeds with zero checkstyle violations and TestHudiAlluxioCacheFileOperations passes 8 consecutive runs. --- .../TestHudiAlluxioCacheFileOperations.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java index 4c31f08aabad..911c5d4c3e7d 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; +import io.airlift.units.Duration; import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer; import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation; import io.trino.testing.AbstractTestQueryFramework; @@ -43,7 +44,9 @@ import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TIMELINE; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; +import static io.trino.testing.assertions.Assert.assertEventually; import static java.util.stream.Collectors.toCollection; +import static org.assertj.core.api.Assertions.assertThat; @ResourceLock("HUDI_CACHE_SYSTEM") @Execution(ExecutionMode.SAME_THREAD) @@ -132,6 +135,40 @@ public void testJoin() .build()); } + @Test + public void testReadsServedFromAlluxioCache() + { + // The tests above intentionally do not assert exact Alluxio cache hit/miss counts: the cache + // write is asynchronous, so the per-query counts flake (a write from one query can still be in + // flight when the next query runs). This test instead gives count-independent coverage that the + // Alluxio cache is actually engaged: once the cache is warmed, at least one read is served from + // it (an "Alluxio.readCached" span). assertEventually re-runs the query until the asynchronous + // cache write has landed and a genuine hit is observed, and fails loudly at the deadline if the + // cache never serves a read. + @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR; + DistributedQueryRunner queryRunner = getDistributedQueryRunner(); + + // Warm the cache; the page write into Alluxio happens on a background thread. + queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); + + assertEventually( + Duration.valueOf("30s"), + Duration.valueOf("500ms"), + () -> { + queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); + assertThat(countCachedReads(queryRunner)) + .as("Alluxio.readCached spans (cache hits)") + .isGreaterThanOrEqualTo(1); + }); + } + + private static long countCachedReads(QueryRunner queryRunner) + { + return getCacheOperationSpans(queryRunner).stream() + .filter(span -> span.getName().equals("Alluxio.readCached")) + .count(); + } + private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); From 377a80fa49d0bfc2af38e3933d43cff2dc0c8bf6 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 15 Jun 2026 14:26:57 +0700 Subject: [PATCH 3/3] Re-trigger Azure CI (unrelated infra heap OOM flake; root-cause fix in apache/hudi#19008)