Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import io.airlift.units.Duration;
import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
import io.trino.testing.AbstractTestQueryFramework;
Expand All @@ -36,15 +37,16 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.LOG;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TIMELINE;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.util.stream.Collectors.toCollection;
import static org.assertj.core.api.Assertions.assertThat;

@ResourceLock("HUDI_CACHE_SYSTEM")
@Execution(ExecutionMode.SAME_THREAD)
Expand All @@ -66,10 +68,11 @@ protected DistributedQueryRunner createQueryRunner()
.put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString())
.put("fs.cache.max-sizes", "100MB")
.put("hudi.metadata.cache.enabled", "false")
// Disable async table-statistics refresh: it reads the metadata table on a
// background executor whose spans can outlive the query and leak into the next
// test's measurement (the symmetric off-by-N flake). Disabling it makes the
// file-operation counts deterministic right after the query returns.
// Disable the async table-statistics refresh: on the first query it reads the index
// definitions and table-property files (and the metadata table) on a background
// executor. Those non-metadata-table reads land in the asserted set and their timing
// is non-deterministic, so we turn the refresh off and assert only the synchronous
// planning-path reads.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();

Expand All @@ -87,12 +90,6 @@ public void testSelectWithFilter()
assertFileSystemAccesses(
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("Alluxio.readCached", DATA), 2)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the test coverage on the caching functionality. Could we run the tests sequentially or independently to avoid race conditions that cause the flakiness?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 189f4e0: added testReadsServedFromAlluxioCache, which warms the cache and then asserts at least one Alluxio.readCached span. It is a lower bound rather than an exact count, so it is immune to the off-by-N leak, and it is wrapped in assertEventually to tolerate the asynchronous cache write. It passed 8 consecutive runs locally.

On running them sequentially or independently: these three classes already run isolated and single-threaded. Each is annotated @Execution(SAME_THREAD) and they share @ResourceLock("HUDI_CACHE_SYSTEM"), so they never run concurrently with one another and never interleave their own methods. The flake is not a race between tests. The spans leak because Trino resets the span exporter at the start of each executeWithPlan, while Hudi's background pools (split loading, partition listing, index support) and Alluxio's asynchronous cache population keep emitting spans after the synchronous query has returned, so those spans land in the next query's measurement window even on a single thread. That is why the earlier span-stability poll (#18766) and disabling the stats refresh (#18995) did not stop it, and why the remaining exact Alluxio.* counts cannot be made deterministic by ordering alone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yihua could you please take a second look when you get a chance?
The test fails too frequently, so I would like to fix it as soon as possible.

.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27)
.addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 4)
.addCopies(new FileOperation("Alluxio.readCached", LOG), 15)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10)
.addCopies(new FileOperation("InputFile.length", TIMELINE), 2)
.addCopies(new FileOperation("InputFile.length", LOG), 1)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
Expand All @@ -103,12 +100,6 @@ public void testSelectWithFilter()
assertFileSystemAccesses(
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("Alluxio.readCached", DATA), 2)
.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27)
.addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 4)
.addCopies(new FileOperation("Alluxio.readCached", LOG), 15)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10)
.addCopies(new FileOperation("InputFile.length", TIMELINE), 2)
.addCopies(new FileOperation("InputFile.length", LOG), 1)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
Expand All @@ -127,12 +118,6 @@ public void testJoin()

assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("Alluxio.readCached", DATA), 6)
.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215)
.addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8)
.addCopies(new FileOperation("Alluxio.readCached", LOG), 30)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69)
.addCopies(new FileOperation("InputFile.length", TIMELINE), 4)
.addCopies(new FileOperation("InputFile.length", LOG), 2)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
Expand All @@ -142,12 +127,6 @@ public void testJoin()

assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("Alluxio.readCached", DATA), 6)
.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215)
.addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8)
.addCopies(new FileOperation("Alluxio.readCached", LOG), 30)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69)
.addCopies(new FileOperation("InputFile.length", TIMELINE), 4)
.addCopies(new FileOperation("InputFile.length", LOG), 2)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
Expand All @@ -156,6 +135,40 @@ public void testJoin()
.build());
}

@Test
public void testReadsServedFromAlluxioCache()
{
    // Exact Alluxio hit/miss counts are deliberately not asserted by the other tests in this
    // class: pages are written into the cache asynchronously, so a write started by one query
    // may still be in flight when the next query runs and the per-query counts flake.
    // This test provides count-independent coverage instead. After warming the cache it only
    // requires a lower bound: at least one "Alluxio.readCached" span, i.e. at least one read
    // served from the cache.
    @Language("SQL") String selectAll = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR;
    DistributedQueryRunner runner = getDistributedQueryRunner();

    // Warm-up run; Alluxio populates its pages on a background thread, so the data may not be
    // cached yet when this call returns.
    runner.executeWithPlan(runner.getDefaultSession(), selectAll);

    Duration deadline = Duration.valueOf("30s");
    Duration retryInterval = Duration.valueOf("500ms");
    // Re-run the query until the asynchronous cache write has landed and a genuine hit shows
    // up; if the cache never serves a read, the assertion fails once the deadline is reached.
    assertEventually(deadline, retryInterval, () -> {
        runner.executeWithPlan(runner.getDefaultSession(), selectAll);
        long cacheHits = countCachedReads(runner);
        assertThat(cacheHits)
                .as("Alluxio.readCached spans (cache hits)")
                .isGreaterThanOrEqualTo(1);
    });
}

/**
 * Counts the cache-operation spans reported for {@code queryRunner} whose name is exactly
 * {@code "Alluxio.readCached"}; the calling test treats each such span as a cache hit.
 */
private static long countCachedReads(QueryRunner queryRunner)
{
    // Project every span to its name first, then keep only the exact cache-hit span name.
    return getCacheOperationSpans(queryRunner).stream()
            .map(span -> span.getName())
            .filter(spanName -> spanName.equals("Alluxio.readCached"))
            .count();
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedCacheAccesses)
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
Expand All @@ -169,6 +182,14 @@ public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
.stream()
.filter(span -> !span.getName().startsWith("InputFile.exists"))
.map(FileOperation::create)
// Metadata-table reads are issued from Hudi background pools (split loading, partition
// listing, table-statistics refresh) whose spans can outlive the synchronous query and
// land in the next query's measurement window, so their per-query counts are not
// deterministic. Alluxio cache hits/misses (Alluxio.*) depend on whether an earlier
// asynchronous cache write had already completed, so their counts are not deterministic
// either. Both are excluded; only synchronous foreground reads are asserted.
.filter(operation -> operation.fileType() != METADATA_TABLE)
.filter(operation -> !operation.operationType().startsWith("Alluxio."))
.collect(toCollection(HashMultiset::create));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ protected DistributedQueryRunner createQueryRunner()
.put("hudi.metadata-enabled", "true")
.put("hudi.metadata.cache.enabled", "true")
.put("fs.cache.enabled", "false")
// Disable async table-statistics refresh: it reads the metadata table on a
// background executor whose spans can outlive the query and leak into the next
// test's measurement (the symmetric off-by-N flake). Disabling it makes the
// file-operation counts deterministic right after the query returns.
// Disable the async table-statistics refresh: on the first query it reads the index
// definitions and table-property files (and the metadata table) on a background
// executor. Those non-metadata-table reads land in the asserted set and their timing
// is non-deterministic, so we turn the refresh off and assert only the synchronous
// planning-path reads.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();

Expand All @@ -78,11 +79,8 @@ public void testSelectWithFilter()
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
.addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 2)
.addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 1)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
.add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES))
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2)
Expand All @@ -92,11 +90,8 @@ public void testSelectWithFilter()
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
.addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 2)
.addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 1)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
.add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES))
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2)
Expand All @@ -114,11 +109,8 @@ public void testJoin()
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
.addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4)
.addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
Expand All @@ -127,11 +119,8 @@ public void testJoin()
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
.addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4)
.addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
Expand All @@ -153,6 +142,11 @@ private static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner
.filter(span -> !span.getName().startsWith("InputFile.exists"))
.filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span)))
.map(FileOperation::create)
// Metadata-table reads are issued from Hudi background pools (split loading, partition
// listing, table-statistics refresh) whose spans can outlive the synchronous query and
// land in the next query's measurement window. Their per-query counts are therefore
// non-deterministic, so they are excluded; only synchronous foreground reads are asserted.
.filter(operation -> operation.fileType() != METADATA_TABLE)
.collect(toCollection(HashMultiset::create));
}
}
Loading
Loading