Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions rust/lance/src/dataset/optimize/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ pub async fn rewrite_files_binary_copy(
}
batch_counts.push(current_page.buffer_offsets_and_sizes.len());
for (offset, size) in current_page.buffer_offsets_and_sizes.iter() {
batch_ranges.push((*offset)..(*offset + *size));
if *size > 0 {
batch_ranges.push((*offset)..(*offset + *size));
}
}
batch_bytes += page_bytes;
batch_pages += 1;
Expand All @@ -363,12 +365,19 @@ pub async fn rewrite_files_binary_copy(
let page =
&src_column_info.page_infos[page_index - batch_pages + local_idx];
let mut new_offsets = Vec::with_capacity(*buffer_count);
for _ in 0..*buffer_count {
if let Some(bytes) = bytes_iter.next() {
let writer = current_writer.as_mut().unwrap().as_mut();
current_pos =
apply_alignment_padding(writer, current_pos, version).await?;
let start = current_pos;
for (_, size) in page.buffer_offsets_and_sizes.iter() {
let writer = current_writer.as_mut().unwrap().as_mut();
current_pos =
apply_alignment_padding(writer, current_pos, version).await?;
let start = current_pos;
if *size == 0 {
new_offsets.push((start, 0));
} else {
let bytes = bytes_iter.next().ok_or_else(|| {
Error::execution(
"binary copy: missing page buffer bytes while rewriting data file",
)
})?;
writer.write_all(&bytes).await?;
current_pos += bytes.len() as u64;
new_offsets.push((start, bytes.len() as u64));
Expand Down Expand Up @@ -410,16 +419,31 @@ pub async fn rewrite_files_binary_copy(
let ranges: Vec<Range<u64>> = src_column_info
.buffer_offsets_and_sizes
.iter()
.filter(|(_, size)| *size > 0)
.map(|(offset, size)| (*offset)..(*offset + *size))
.collect();
let bytes_vec = file_scheduler.submit_request(ranges, 0).await?;
for bytes in bytes_vec.into_iter() {
let bytes_vec = if ranges.is_empty() {
Vec::new()
} else {
file_scheduler.submit_request(ranges, 0).await?
};
let mut bytes_iter = bytes_vec.into_iter();
for (_, size) in src_column_info.buffer_offsets_and_sizes.iter() {
let writer = current_writer.as_mut().unwrap().as_mut();
current_pos = apply_alignment_padding(writer, current_pos, version).await?;
let start = current_pos;
writer.write_all(&bytes).await?;
current_pos += bytes.len() as u64;
col_buffers[col_idx].push((start, bytes.len() as u64));
if *size == 0 {
col_buffers[col_idx].push((start, 0));
} else {
let bytes = bytes_iter.next().ok_or_else(|| {
Error::execution(
"binary copy: missing column buffer bytes while rewriting data file",
)
})?;
writer.write_all(&bytes).await?;
current_pos += bytes.len() as u64;
col_buffers[col_idx].push((start, bytes.len() as u64));
}
}
}
} // finished all columns in the current source file
Expand Down
51 changes: 51 additions & 0 deletions rust/lance/src/dataset/optimize/tests/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,57 @@ async fn do_test_binary_copy_merge_small_files(version: LanceFileVersion) {
assert_eq!(before, after);
}

/// Runs the empty-string binary-copy scenario once for every file version
/// yielded by `LanceFileVersion::iter_non_legacy()`.
#[tokio::test]
async fn test_binary_copy_empty_string_scalar_index() {
    // Versions are exercised one after another; a panic in any of them fails the test.
    for file_version in LanceFileVersion::iter_non_legacy() {
        do_test_binary_copy_empty_string_scalar_index(file_version).await;
    }
}

/// Writes 4,000 empty strings into a non-nullable `Utf8` column named `text`,
/// compacts the dataset with `CompactionMode::ForceBinaryCopy`, and then builds
/// a scalar index on that column.
///
/// The test has no explicit assertions: it passes when the write, the
/// compaction, and the index build all return `Ok` (each result is unwrapped).
///
/// NOTE(review): presumably an all-empty string column is what produces the
/// zero-sized buffers that the binary-copy path has to handle — confirm
/// against the writer/encoder.
async fn do_test_binary_copy_empty_string_scalar_index(version: LanceFileVersion) {
    use arrow_array::StringArray;

    let test_dir = TempStrDir::default();

    // Single-column schema; the same `Arc` is shared by the batch and the reader.
    let text_field = Field::new("text", DataType::Utf8, false);
    let schema = Arc::new(Schema::new(vec![text_field]));

    // One batch holding nothing but empty strings.
    let empty_values = std::iter::repeat_n("", 4_000);
    let text_column = StringArray::from_iter_values(empty_values);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(text_column)]).unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);

    // Cap files at 1,000 rows so the 4,000-row batch is written with a small
    // per-file limit, giving the compaction below something to merge.
    let write_params = WriteParams {
        max_rows_per_file: 1_000,
        data_storage_version: Some(version),
        ..Default::default()
    };
    let mut dataset = Dataset::write(batches, &test_dir, Some(write_params))
        .await
        .unwrap();

    // Request the binary-copy compaction mode with a target well above the row count.
    let compaction = CompactionOptions {
        target_rows_per_fragment: 100_000,
        compaction_mode: Some(CompactionMode::ForceBinaryCopy),
        ..Default::default()
    };
    compact_files(&mut dataset, compaction, None).await.unwrap();

    // Building the index goes through the compacted data; it must succeed.
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(
            &["text"],
            IndexType::Scalar,
            Some("text_idx".into()),
            &index_params,
            false,
        )
        .await
        .unwrap();
}

#[tokio::test]
async fn test_binary_copy_with_defer_remap() {
for version in LanceFileVersion::iter_non_legacy() {
Expand Down
Loading