diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..3412829fa7 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -32,7 +32,6 @@ import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.types.Tuple2; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -495,16 +494,10 @@ private LakeSnapshot getOrFetchLakeSnapshot(long snapshotId, Map bucketsWithoutL0 = new HashSet<>(); Set bucketsWithL0 = new HashSet<>(); - // Scan the snapshot to get all splits including level0 - Map scanOptions = new HashMap<>(); - scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())); - // hacky: set batch scan mode to compact to make sure we can get l0 level files - scanOptions.put( - CoreOptions.BATCH_SCAN_MODE.key(), CoreOptions.BatchScanMode.COMPACT.getValue()); - + // Scan the snapshot to get all data files including L0 level files Map>> manifestsByBucket = FileStoreScan.Plan.groupByPartFiles( - fileStoreTable.copy(scanOptions).store().newScan().plan().files()); + fileStoreTable.store().newScan().withSnapshot(snapshot).plan().files()); for (Map.Entry>> manifestsByBucketEntry : manifestsByBucket.entrySet()) {