From 452a99bb4f917b96202bc85d010e1b3903a42ec7 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Sat, 13 Jun 2026 23:03:42 +0700 Subject: [PATCH] test(trino): de-flake TestHudi*FileOperations by disabling async table statistics The three TestHudi*FileOperations tests (Memory, NoCache, Alluxio) assert the exact multiset of filesystem-access spans a query emits against the Hudi metadata table, and were intermittently failing in CI with a symmetric off-by-N mismatch between the paired testJoin and testSelectWithFilter measurements. Root cause: HudiMetadata.getTableStatistics submits an asynchronous table-statistics refresh on a shared background executor for every query during planning. That task reads the metadata-table column-stats partition and emits METADATA_TABLE spans that can outlive the synchronous query and arrive in the next test's measurement window, scrambling the counts. The earlier span-stability poll (#18766) narrowed the race but did not close it. Fix: disable the async refresh in these tests via hudi.table-statistics-enabled=false. With the only asynchronous metadata reader gone, the counts are deterministic the moment the query returns, so the waitForStableSpans poll is removed and the expected multisets are recalibrated to the synchronous query I/O. Only testJoin's first-query counts change (they now match the already-warm second query); testSelectWithFilter is unchanged. Verified under JDK 23 / trino-root 472: the three classes pass 20/20 consecutive runs with zero checkstyle violations. --- .../TestHudiAlluxioCacheFileOperations.java | 48 +++++-------------- .../TestHudiMemoryCacheFileOperations.java | 48 +++++-------------- .../hudi/TestHudiNoCacheFileOperations.java | 48 +++++-------------- 3 files changed, 36 insertions(+), 108 deletions(-) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java index 4395a9c1ddfff..89507721afa59 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java @@ -66,6 +66,11 @@ protected DistributedQueryRunner createQueryRunner() .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) .put("fs.cache.max-sizes", "100MB") .put("hudi.metadata.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -77,7 +82,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -115,7 +119,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -125,16 +128,16 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 288) + .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215) .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8) .addCopies(new FileOperation("Alluxio.readCached", LOG), 30) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 93) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69) .addCopies(new FileOperation("InputFile.length", TIMELINE), 4) .addCopies(new FileOperation("InputFile.length", LOG), 2) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); assertFileSystemAccesses(query, @@ -154,37 +157,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } public static Multiset getFileOperations(QueryRunner queryRunner) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java index ed362391b017a..61867b1cde7dd 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "true") .put("fs.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -101,7 +105,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -111,14 +114,14 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 39) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 54) + .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29) + .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); assertFileSystemAccesses(query, @@ -136,37 +139,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } private static Multiset getFileOperations(QueryRunner queryRunner) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java index 9d6a6a8a52002..71518b9fc67b6 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "false") .put("fs.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -101,7 +105,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -111,12 +114,12 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 39) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 54) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", LOG), 2) .build()); @@ -136,37 +139,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } private static Multiset getFileOperations(QueryRunner queryRunner)