diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java index 4395a9c1ddfff..89507721afa59 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java @@ -66,6 +66,11 @@ protected DistributedQueryRunner createQueryRunner() .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) .put("fs.cache.max-sizes", "100MB") .put("hudi.metadata.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -77,7 +82,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -115,7 +119,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -125,16 +128,16 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 288) + .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215) .addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8) .addCopies(new FileOperation("Alluxio.readCached", LOG), 30) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 93) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69) .addCopies(new FileOperation("InputFile.length", TIMELINE), 4) .addCopies(new FileOperation("InputFile.length", LOG), 2) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); assertFileSystemAccesses(query, @@ -154,37 +157,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } public static Multiset getFileOperations(QueryRunner queryRunner) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java index ed362391b017a..61867b1cde7dd 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "true") .put("fs.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -101,7 +105,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -111,14 +114,14 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 39) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 54) + .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29) + .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40) .addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4) .addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); assertFileSystemAccesses(query, @@ -136,37 +139,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } private static Multiset getFileOperations(QueryRunner queryRunner) diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java index 9d6a6a8a52002..71518b9fc67b6 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner() .put("hudi.metadata-enabled", "true") .put("hudi.metadata.cache.enabled", "false") .put("fs.cache.enabled", "false") + // Disable async table-statistics refresh: it reads the metadata table on a + // background executor whose spans can outlive the query and leak into the next + // test's measurement (the symmetric off-by-N flake). Disabling it makes the + // file-operation counts deterministic right after the query returns. + .put("hudi.table-statistics-enabled", "false") .buildOrThrow(); return HudiQueryRunner.builder() @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner() @Test public void testSelectWithFilter() - throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; assertFileSystemAccesses( @@ -101,7 +105,6 @@ public void testSelectWithFilter() @Test public void testJoin() - throws InterruptedException { @Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " + HUDI_MULTI_FG_PT_V8_MOR + " t1 " + @@ -111,12 +114,12 @@ public void testJoin() assertFileSystemAccesses(query, ImmutableMultiset.builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 39) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 54) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) + .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", LOG), 2) .build()); @@ -136,37 +139,10 @@ public void testJoin() } private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) - throws InterruptedException { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Async table-stats computation can outlive the synchronous query and emit spans into - // the exporter after execute returns. A fixed Thread.sleep races with this — when - // stats from query N is still running while query N+1's measurement happens, spans - // leak across the boundary and counts get scrambled (the symmetric off-by-N failure - // across paired tests). Poll until the span set is stable for two consecutive reads. - Multiset actual = waitForStableSpans(queryRunner); - assertMultisetsEqual(actual, expectedCacheAccesses); - } - - /** - * Returns the file-operation span set once two consecutive reads (200ms apart) agree. - * Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging. - */ - private static Multiset waitForStableSpans(QueryRunner queryRunner) - throws InterruptedException - { - long deadlineMillis = System.currentTimeMillis() + 30_000L; - Multiset previous = null; - while (System.currentTimeMillis() < deadlineMillis) { - Thread.sleep(200L); - Multiset current = getFileOperations(queryRunner); - if (previous != null && current.equals(previous)) { - return current; - } - previous = current; - } - return previous != null ? previous : getFileOperations(queryRunner); + assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); } private static Multiset getFileOperations(QueryRunner queryRunner)