diff --git a/rust/lance/src/dataset/optimize/binary_copy.rs b/rust/lance/src/dataset/optimize/binary_copy.rs index be8ff325c7e..8fb175bb240 100644 --- a/rust/lance/src/dataset/optimize/binary_copy.rs +++ b/rust/lance/src/dataset/optimize/binary_copy.rs @@ -337,7 +337,9 @@ pub async fn rewrite_files_binary_copy( } batch_counts.push(current_page.buffer_offsets_and_sizes.len()); for (offset, size) in current_page.buffer_offsets_and_sizes.iter() { - batch_ranges.push((*offset)..(*offset + *size)); + if *size > 0 { + batch_ranges.push((*offset)..(*offset + *size)); + } } batch_bytes += page_bytes; batch_pages += 1; @@ -363,12 +365,19 @@ pub async fn rewrite_files_binary_copy( let page = &src_column_info.page_infos[page_index - batch_pages + local_idx]; let mut new_offsets = Vec::with_capacity(*buffer_count); - for _ in 0..*buffer_count { - if let Some(bytes) = bytes_iter.next() { - let writer = current_writer.as_mut().unwrap().as_mut(); - current_pos = - apply_alignment_padding(writer, current_pos, version).await?; - let start = current_pos; + for (_, size) in page.buffer_offsets_and_sizes.iter() { + let writer = current_writer.as_mut().unwrap().as_mut(); + current_pos = + apply_alignment_padding(writer, current_pos, version).await?; + let start = current_pos; + if *size == 0 { + new_offsets.push((start, 0)); + } else { + let bytes = bytes_iter.next().ok_or_else(|| { + Error::execution( + "binary copy: missing page buffer bytes while rewriting data file", + ) + })?; writer.write_all(&bytes).await?; current_pos += bytes.len() as u64; new_offsets.push((start, bytes.len() as u64)); @@ -410,16 +419,31 @@ pub async fn rewrite_files_binary_copy( let ranges: Vec> = src_column_info .buffer_offsets_and_sizes .iter() + .filter(|(_, size)| *size > 0) .map(|(offset, size)| (*offset)..(*offset + *size)) .collect(); - let bytes_vec = file_scheduler.submit_request(ranges, 0).await?; - for bytes in bytes_vec.into_iter() { + let bytes_vec = if ranges.is_empty() { + Vec::new() + } else { + file_scheduler.submit_request(ranges, 0).await? + }; + let mut bytes_iter = bytes_vec.into_iter(); + for (_, size) in src_column_info.buffer_offsets_and_sizes.iter() { let writer = current_writer.as_mut().unwrap().as_mut(); current_pos = apply_alignment_padding(writer, current_pos, version).await?; let start = current_pos; - writer.write_all(&bytes).await?; - current_pos += bytes.len() as u64; - col_buffers[col_idx].push((start, bytes.len() as u64)); + if *size == 0 { + col_buffers[col_idx].push((start, 0)); + } else { + let bytes = bytes_iter.next().ok_or_else(|| { + Error::execution( + "binary copy: missing column buffer bytes while rewriting data file", + ) + })?; + writer.write_all(&bytes).await?; + current_pos += bytes.len() as u64; + col_buffers[col_idx].push((start, bytes.len() as u64)); + } } } } // finished all columns in the current source file diff --git a/rust/lance/src/dataset/optimize/tests/binary_copy.rs b/rust/lance/src/dataset/optimize/tests/binary_copy.rs index f7041bcf38a..390ad4b8685 100644 --- a/rust/lance/src/dataset/optimize/tests/binary_copy.rs +++ b/rust/lance/src/dataset/optimize/tests/binary_copy.rs @@ -45,6 +45,57 @@ async fn do_test_binary_copy_merge_small_files(version: LanceFileVersion) { assert_eq!(before, after); } +#[tokio::test] +async fn test_binary_copy_empty_string_scalar_index() { + for version in LanceFileVersion::iter_non_legacy() { + do_test_binary_copy_empty_string_scalar_index(version).await; + } +} + +async fn do_test_binary_copy_empty_string_scalar_index(version: LanceFileVersion) { + use arrow_array::StringArray; + + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)])); + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from_iter_values( + std::iter::repeat_n("", 4_000), + ))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(data)], schema); + let mut dataset = Dataset::write( + reader, + &test_dir, + Some(WriteParams { + max_rows_per_file: 1_000, + data_storage_version: Some(version), + ..Default::default() + }), + ) + .await + .unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 100_000, + compaction_mode: Some(CompactionMode::ForceBinaryCopy), + ..Default::default() + }; + compact_files(&mut dataset, options, None).await.unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Scalar, + Some("text_idx".into()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); +} + #[tokio::test] async fn test_binary_copy_with_defer_remap() { for version in LanceFileVersion::iter_non_legacy() {