Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ protected DistributedQueryRunner createQueryRunner()
.put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString())
.put("fs.cache.max-sizes", "100MB")
.put("hudi.metadata.cache.enabled", "false")
// Disable async table-statistics refresh: it reads the metadata table on a
// background executor whose spans can outlive the query and leak into the next
// test's measurement (the symmetric off-by-N flake). Disabling it makes the
// file-operation counts deterministic right after the query returns.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();

return HudiQueryRunner.builder()
Expand All @@ -77,7 +82,6 @@ protected DistributedQueryRunner createQueryRunner()

@Test
public void testSelectWithFilter()
throws InterruptedException
{
@Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
assertFileSystemAccesses(
Expand Down Expand Up @@ -115,7 +119,6 @@ public void testSelectWithFilter()

@Test
public void testJoin()
throws InterruptedException
{
@Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " +
HUDI_MULTI_FG_PT_V8_MOR + " t1 " +
Expand All @@ -125,16 +128,16 @@ public void testJoin()
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("Alluxio.readCached", DATA), 6)
.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 288)
.addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215)
.addCopies(new FileOperation("Alluxio.readCached", TIMELINE), 8)
.addCopies(new FileOperation("Alluxio.readCached", LOG), 30)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 93)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69)
.addCopies(new FileOperation("InputFile.length", TIMELINE), 4)
.addCopies(new FileOperation("InputFile.length", LOG), 2)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
.build());

assertFileSystemAccesses(query,
Expand All @@ -154,37 +157,10 @@ public void testJoin()
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedCacheAccesses)
throws InterruptedException
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
// Async table-stats computation can outlive the synchronous query and emit spans into
// the exporter after execute returns. A fixed Thread.sleep races with this — when
// stats from query N is still running while query N+1's measurement happens, spans
// leak across the boundary and counts get scrambled (the symmetric off-by-N failure
// across paired tests). Poll until the span set is stable for two consecutive reads.
Multiset<FileOperation> actual = waitForStableSpans(queryRunner);
assertMultisetsEqual(actual, expectedCacheAccesses);
}

/**
* Returns the file-operation span set once two consecutive reads (200ms apart) agree.
* Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging.
*/
private static Multiset<FileOperation> waitForStableSpans(QueryRunner queryRunner)
throws InterruptedException
{
long deadlineMillis = System.currentTimeMillis() + 30_000L;
Multiset<FileOperation> previous = null;
while (System.currentTimeMillis() < deadlineMillis) {
Thread.sleep(200L);
Multiset<FileOperation> current = getFileOperations(queryRunner);
if (previous != null && current.equals(previous)) {
return current;
}
previous = current;
}
return previous != null ? previous : getFileOperations(queryRunner);
assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses);
}

public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner()
.put("hudi.metadata-enabled", "true")
.put("hudi.metadata.cache.enabled", "true")
.put("fs.cache.enabled", "false")
// Disable async table-statistics refresh: it reads the metadata table on a
// background executor whose spans can outlive the query and leak into the next
// test's measurement (the symmetric off-by-N flake). Disabling it makes the
// file-operation counts deterministic right after the query returns.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();

return HudiQueryRunner.builder()
Expand All @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner()

@Test
public void testSelectWithFilter()
throws InterruptedException
{
@Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
assertFileSystemAccesses(
Expand Down Expand Up @@ -101,7 +105,6 @@ public void testSelectWithFilter()

@Test
public void testJoin()
throws InterruptedException
{
@Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " +
HUDI_MULTI_FG_PT_V8_MOR + " t1 " +
Expand All @@ -111,14 +114,14 @@ public void testJoin()
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 39)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 54)
.addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
.addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
.addCopies(new FileOperation("FileSystemCache.cacheStream", TIMELINE), 4)
.addCopies(new FileOperation("FileSystemCache.cacheStream", LOG), 2)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5)
.addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
.build());

assertFileSystemAccesses(query,
Expand All @@ -136,37 +139,10 @@ public void testJoin()
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedCacheAccesses)
throws InterruptedException
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
// Async table-stats computation can outlive the synchronous query and emit spans into
// the exporter after execute returns. A fixed Thread.sleep races with this — when
// stats from query N is still running while query N+1's measurement happens, spans
// leak across the boundary and counts get scrambled (the symmetric off-by-N failure
// across paired tests). Poll until the span set is stable for two consecutive reads.
Multiset<FileOperation> actual = waitForStableSpans(queryRunner);
assertMultisetsEqual(actual, expectedCacheAccesses);
}

/**
* Returns the file-operation span set once two consecutive reads (200ms apart) agree.
* Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging.
*/
private static Multiset<FileOperation> waitForStableSpans(QueryRunner queryRunner)
throws InterruptedException
{
long deadlineMillis = System.currentTimeMillis() + 30_000L;
Multiset<FileOperation> previous = null;
while (System.currentTimeMillis() < deadlineMillis) {
Thread.sleep(200L);
Multiset<FileOperation> current = getFileOperations(queryRunner);
if (previous != null && current.equals(previous)) {
return current;
}
previous = current;
}
return previous != null ? previous : getFileOperations(queryRunner);
assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses);
}

private static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ protected DistributedQueryRunner createQueryRunner()
.put("hudi.metadata-enabled", "true")
.put("hudi.metadata.cache.enabled", "false")
.put("fs.cache.enabled", "false")
// Disable async table-statistics refresh: it reads the metadata table on a
// background executor whose spans can outlive the query and leak into the next
// test's measurement (the symmetric off-by-N flake). Disabling it makes the
// file-operation counts deterministic right after the query returns.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();

return HudiQueryRunner.builder()
Expand All @@ -67,7 +72,6 @@ protected DistributedQueryRunner createQueryRunner()

@Test
public void testSelectWithFilter()
throws InterruptedException
{
@Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
assertFileSystemAccesses(
Expand Down Expand Up @@ -101,7 +105,6 @@ public void testSelectWithFilter()

@Test
public void testJoin()
throws InterruptedException
{
@Language("SQL") String query = "SELECT t1.id, t1.name, t1.price, t1.ts FROM " +
HUDI_MULTI_FG_PT_V8_MOR + " t1 " +
Expand All @@ -111,12 +114,12 @@ public void testJoin()
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
.addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6)
.addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 39)
.addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 39)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 5)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 54)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5)
.addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
.addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4)
.addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", LOG), 2)
.build());
Expand All @@ -136,37 +139,10 @@ public void testJoin()
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperationUtils.FileOperation> expectedCacheAccesses)
throws InterruptedException
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
// Async table-stats computation can outlive the synchronous query and emit spans into
// the exporter after execute returns. A fixed Thread.sleep races with this — when
// stats from query N is still running while query N+1's measurement happens, spans
// leak across the boundary and counts get scrambled (the symmetric off-by-N failure
// across paired tests). Poll until the span set is stable for two consecutive reads.
Multiset<FileOperationUtils.FileOperation> actual = waitForStableSpans(queryRunner);
assertMultisetsEqual(actual, expectedCacheAccesses);
}

/**
* Returns the file-operation span set once two consecutive reads (200ms apart) agree.
* Bounded by a 30-second ceiling so a runaway test fails loudly instead of hanging.
*/
private static Multiset<FileOperationUtils.FileOperation> waitForStableSpans(QueryRunner queryRunner)
throws InterruptedException
{
long deadlineMillis = System.currentTimeMillis() + 30_000L;
Multiset<FileOperationUtils.FileOperation> previous = null;
while (System.currentTimeMillis() < deadlineMillis) {
Thread.sleep(200L);
Multiset<FileOperationUtils.FileOperation> current = getFileOperations(queryRunner);
if (previous != null && current.equals(previous)) {
return current;
}
previous = current;
}
return previous != null ? previous : getFileOperations(queryRunner);
assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses);
}

private static Multiset<FileOperationUtils.FileOperation> getFileOperations(QueryRunner queryRunner)
Expand Down
Loading