diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs
index 2fc76605175..026e6b0bbe9 100644
--- a/rust/lance-core/src/datatypes.rs
+++ b/rust/lance-core/src/datatypes.rs
@@ -70,6 +70,26 @@ pub static BLOB_V2_DESC_FIELD: LazyLock = LazyLock::new(|| {
 pub static BLOB_V2_DESC_LANCE_FIELD: LazyLock<Field> =
     LazyLock::new(|| Field::try_from(&*BLOB_V2_DESC_FIELD).unwrap());
 
+/// Blob v2 user-view struct fields used by internal rewrite paths.
+///
+/// Describes the write-side view (`data`, `uri`, `position`, `size`, all
+/// nullable) that blob compaction rebuilds from the descriptor view.
+pub static BLOB_V2_USER_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
+    Fields::from(vec![
+        ArrowField::new("data", DataType::LargeBinary, true),
+        ArrowField::new("uri", DataType::Utf8, true),
+        ArrowField::new("position", DataType::UInt64, true),
+        ArrowField::new("size", DataType::UInt64, true),
+    ])
+});
+
+/// Blob v2 user-view struct type used by internal rewrite paths.
+///
+/// [`DataType::Struct`] wrapping [`BLOB_V2_USER_FIELDS`]; used as the column
+/// type when blob compaction rewrites a blob v2 column.
+pub static BLOB_V2_USER_TYPE: LazyLock<DataType> =
+    LazyLock::new(|| DataType::Struct(BLOB_V2_USER_FIELDS.clone()));
+
 pub const BLOB_LOGICAL_TYPE: &str = "blob";
 
 /// LogicalType is a string presentation of arrow type.
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 3076a8aa7a8..5a10fa2d014 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -99,11 +99,18 @@ use crate::Result;
 use crate::dataset::utils::CapturedRowIds;
 use crate::index::DatasetIndexExt;
 use crate::io::commit::{commit_transaction, migrate_fragments};
+use arrow::array::AsArray;
+use arrow::datatypes::{UInt8Type, UInt32Type, UInt64Type};
+use arrow_array::Array;
+use arrow_array::RecordBatch;
+use arrow_array::StructArray;
+use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder};
+use arrow_buffer::NullBuffer;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use futures::{StreamExt, TryStreamExt};
 use lance_core::Error;
-use lance_core::datatypes::BlobHandling;
+use lance_core::datatypes::{BlobHandling, BlobKind};
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS};
 use lance_index::frag_reuse::FragReuseGroup;
@@ -859,6 +866,326 @@ impl CompactionPlan {
     }
 }
 
+/// Classification for one blob v2 row during compaction; it selects how the
+/// row is rebuilt in the write-side user view.
+enum RowClass {
+    /// NULL row, or an Inline blob with position=0 and size=0.
+    Null,
+    /// External blob referenced by URI.
+    External,
+    /// Inline/Packed/Dedicated blob stored in Lance files.
+    DataBlob,
+}
+
+/// Check if a row is a null Inline blob.
+///
+/// This matches `BlobV2StructuralEncoder`'s behavior of encoding null rows as
+/// Inline with position=0 and size=0, and `collect_blob_entries_v2`'s behavior
+/// of skipping them.
+fn is_inline_null_blob( + kind: BlobKind, + position_col: &arrow::array::UInt64Array, + size_col: &arrow::array::UInt64Array, + index: usize, +) -> bool { + if kind != BlobKind::Inline { + return false; + } + let position_is_empty = position_col.is_null(index) || position_col.value(index) == 0; + let size_is_empty = size_col.is_null(index) || size_col.value(index) == 0; + position_is_empty && size_is_empty +} + +/// Column views for the 5 fields in a blob v2 descriptor struct. +struct BlobV2Descriptor<'a> { + kind_col: &'a arrow::array::UInt8Array, + position_col: &'a arrow::array::UInt64Array, + size_col: &'a arrow::array::UInt64Array, + blob_uri_col: &'a arrow::array::StringArray, + blob_id_col: &'a arrow::array::UInt32Array, +} + +impl<'a> BlobV2Descriptor<'a> { + /// Extract the 5 descriptor arrays from a blob v2 descriptor struct array. + fn try_from_struct(struct_arr: &'a StructArray, column_name: &str) -> Result { + let kind_col = struct_arr + .column_by_name("kind") + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 descriptor for column '{}' missing `kind` field", + column_name + )) + })? + .as_primitive::(); + let position_col = struct_arr + .column_by_name("position") + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 descriptor for column '{}' missing `position` field", + column_name + )) + })? + .as_primitive::(); + let size_col = struct_arr + .column_by_name("size") + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 descriptor for column '{}' missing `size` field", + column_name + )) + })? + .as_primitive::(); + let blob_uri_col = struct_arr + .column_by_name("blob_uri") + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 descriptor for column '{}' missing `blob_uri` field", + column_name + )) + })? + .as_string::(); + let blob_id_col = struct_arr + .column_by_name("blob_id") + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 descriptor for column '{}' missing `blob_id` field", + column_name + )) + })? 
+ .as_primitive::(); + Ok(Self { + kind_col, + position_col, + size_col, + blob_uri_col, + blob_id_col, + }) + } +} + +/// Result of row classification for blob v2 compaction. +struct RowClassification { + row_classes: Vec, + blob_read_addrs: Vec, +} + +/// Classify each row of a blob v2 column as Null, External, or DataBlob. +fn classify_rows( + struct_arr: &StructArray, + descriptor: &BlobV2Descriptor<'_>, + row_addrs: &arrow::array::UInt64Array, + column_name: &str, +) -> Result { + let num_rows = struct_arr.len(); + let mut row_classes = Vec::with_capacity(num_rows); + let mut blob_read_addrs = Vec::with_capacity(num_rows); + + for i in 0..num_rows { + if struct_arr.is_null(i) || descriptor.kind_col.is_null(i) { + row_classes.push(RowClass::Null); + } else { + let kind = BlobKind::try_from(descriptor.kind_col.value(i)).map_err(|e| { + Error::internal(format!( + "Blob v2 column '{}' has invalid kind at row {}: {e}", + column_name, i + )) + })?; + if kind == BlobKind::External { + row_classes.push(RowClass::External); + } else if is_inline_null_blob(kind, descriptor.position_col, descriptor.size_col, i) { + row_classes.push(RowClass::Null); + } else { + row_classes.push(RowClass::DataBlob); + blob_read_addrs.push(row_addrs.value(i)); + } + } + } + + Ok(RowClassification { + row_classes, + blob_read_addrs, + }) +} + +/// Build a blob v2 user-view struct array from classification and descriptor. +/// +/// Reads blob data lazily using row addresses to avoid materializing all blob +/// payloads in memory at once. +async fn build_user_view_struct( + dataset: &Arc, + descriptor: &BlobV2Descriptor<'_>, + classification: &RowClassification, + column_name: &str, + num_rows: usize, + null_buffer: Option, +) -> Result { + let blob_files = if classification.blob_read_addrs.is_empty() { + Vec::new() + } else { + super::blob::take_blobs_by_addresses(dataset, &classification.blob_read_addrs, column_name) + .await? 
+ }; + + let mut data_builder = LargeBinaryBuilder::with_capacity(num_rows, 0); + let mut uri_builder = StringBuilder::with_capacity(num_rows, 0); + let mut out_position_builder = PrimitiveBuilder::::with_capacity(num_rows); + let mut out_size_builder = PrimitiveBuilder::::with_capacity(num_rows); + + let mut blob_file_idx = 0; + #[allow(clippy::needless_range_loop)] + for i in 0..num_rows { + match classification.row_classes[i] { + RowClass::Null => { + data_builder.append_null(); + uri_builder.append_null(); + out_position_builder.append_null(); + out_size_builder.append_null(); + } + RowClass::External => { + data_builder.append_null(); + let base_id = descriptor.blob_id_col.value(i); + let uri_val = descriptor.blob_uri_col.value(i); + if base_id == 0 { + uri_builder.append_value(uri_val); + } else { + let base = dataset.manifest().base_paths.get(&base_id).ok_or_else(|| { + Error::internal(format!( + "External blob in column '{}' references unknown base_id {}", + column_name, base_id + )) + })?; + let absolute_uri = format!("{}/{}", base.path.trim_end_matches('/'), uri_val); + uri_builder.append_value(&absolute_uri); + } + if descriptor.position_col.is_null(i) { + out_position_builder.append_null(); + } else { + out_position_builder.append_value(descriptor.position_col.value(i)); + } + if descriptor.size_col.is_null(i) { + out_size_builder.append_null(); + } else { + out_size_builder.append_value(descriptor.size_col.value(i)); + } + } + RowClass::DataBlob => { + let data = blob_files[blob_file_idx].read().await?; + blob_file_idx += 1; + data_builder.append_value(data.as_ref()); + uri_builder.append_null(); + out_position_builder.append_null(); + out_size_builder.append_null(); + } + } + } + + Ok(StructArray::try_new( + lance_core::datatypes::BLOB_V2_USER_FIELDS.clone(), + vec![ + Arc::new(data_builder.finish()), + Arc::new(uri_builder.finish()), + Arc::new(out_position_builder.finish()), + Arc::new(out_size_builder.finish()), + ], + null_buffer, + )?) 
+} + +async fn transform_blob_v2_batch( + dataset: &Arc, + schema: &lance_core::datatypes::Schema, + batch: RecordBatch, +) -> Result { + let row_addr_idx = batch + .schema() + .column_with_name(lance_core::ROW_ADDR) + .ok_or_else(|| { + Error::internal(format!( + "_rowaddr column missing from batch for blob v2 compaction, columns: {:?}", + batch + .schema() + .fields() + .iter() + .map(|f| f.name()) + .collect::>() + )) + })? + .0; + let row_addrs = batch.column(row_addr_idx).as_primitive::(); + + let mut new_columns: Vec> = Vec::new(); + let mut new_fields: Vec> = Vec::new(); + + let batch_schema = batch.schema(); + for (col_idx, field) in batch_schema.fields().iter().enumerate() { + if field.name() == lance_core::ROW_ADDR { + continue; + } + + let lance_field = schema.field(field.name()); + let is_blob_v2 = lance_field.is_some_and(|f| f.is_blob_v2()); + + if !is_blob_v2 { + new_columns.push(batch.column(col_idx).clone()); + new_fields.push(field.clone()); + continue; + } + + let struct_arr = batch + .column(col_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::internal(format!( + "Blob v2 column '{}' expected StructArray, got {:?}", + field.name(), + batch.column(col_idx).data_type() + )) + })?; + + let column_name = field.name(); + let descriptor = BlobV2Descriptor::try_from_struct(struct_arr, column_name)?; + let classification = classify_rows(struct_arr, &descriptor, row_addrs, column_name)?; + let num_rows = struct_arr.len(); + + let new_struct = build_user_view_struct( + dataset, + &descriptor, + &classification, + column_name, + num_rows, + struct_arr.nulls().cloned(), + ) + .await?; + + new_columns.push(Arc::new(new_struct)); + let logical_field = arrow_schema::Field::from(lance_field.ok_or_else(|| { + Error::internal(format!( + "Blob v2 column '{}' missing from dataset schema during compaction", + field.name() + )) + })?); + new_fields.push(Arc::new( + arrow_schema::Field::new( + field.name(), + 
lance_core::datatypes::BLOB_V2_USER_TYPE.clone(), + field.is_nullable(), + ) + .with_metadata(logical_field.metadata().clone()), + )); + } + + let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields + .iter() + .map(|f| f.as_ref().clone()) + .collect::>(), + batch_schema.metadata().clone(), + )); + + Ok(RecordBatch::try_new(new_schema, new_columns)?) +} + /// Build a scan reader for rewrite and optionally capture row IDs. /// /// Parameters: @@ -877,6 +1204,7 @@ impl CompactionPlan { /// to feed the rewrite path. /// - `Option>`: A receiver to obtain captured row IDs after the /// stream completes; `None` if not capturing. +/// - `bool`: Whether the dataset has blob v2 columns and the stream includes `_rowaddr`. async fn prepare_reader( dataset: &Dataset, fragments: &[Fragment], @@ -886,15 +1214,23 @@ async fn prepare_reader( ) -> Result<( SendableRecordBatchStream, Option>, + bool, )> { let mut scanner = dataset.scan(); - let has_blob_columns = dataset + let has_legacy_blob_columns = dataset .schema() .fields_pre_order() - .any(|field| field.is_blob()); - if has_blob_columns { + .any(|field| field.is_blob() && !field.is_blob_v2()); + if has_legacy_blob_columns { scanner.blob_handling(BlobHandling::AllBinary); } + let has_blob_v2_columns = dataset + .schema() + .fields_pre_order() + .any(|field| field.is_blob_v2()); + if has_blob_v2_columns { + scanner.with_row_address(); + } if let Some(bs) = batch_size { scanner.batch_size(bs); } @@ -908,11 +1244,12 @@ async fn prepare_reader( let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); let (data_no_row_ids, rx) = make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?; - Ok((data_no_row_ids, Some(rx))) + Ok((data_no_row_ids, Some(rx), has_blob_v2_columns)) } else { Ok(( SendableRecordBatchStream::from(scanner.try_into_stream().await?), None, + has_blob_v2_columns, )) } } @@ -1174,7 +1511,7 @@ async fn rewrite_files( let mut reader: Option = None; if 
!can_binary_copy { - let (prepared_reader, rx_initial) = prepare_reader( + let (prepared_reader, rx_initial, has_blob_v2_columns) = prepare_reader( dataset.as_ref(), &fragments, options.batch_size, @@ -1195,16 +1532,69 @@ async fn rewrite_files( num_rows, ); }); - reader = Some(Box::pin(RecordBatchStreamAdapter::new( - schema, - reader_with_progress, - ))); + + if has_blob_v2_columns { + let dataset_arc = Arc::new(dataset.as_ref().clone()); + let dataset_schema = dataset.schema().clone(); + let transformed = reader_with_progress.then(move |batch_result| { + let dataset = dataset_arc.clone(); + let schema = dataset_schema.clone(); + async move { + let batch = batch_result?; + transform_blob_v2_batch(&dataset, &schema, batch) + .await + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e))) + } + }); + let transformed_schema = { + let mut fields: Vec> = Vec::new(); + for field in schema.fields().iter() { + if field.name() == lance_core::ROW_ADDR { + continue; + } + let lance_field = dataset.schema().field(field.name()); + if let Some(lance_field) = lance_field.filter(|f| f.is_blob_v2()) { + let logical_field = arrow_schema::Field::from(lance_field); + fields.push(Arc::new( + arrow_schema::Field::new( + field.name(), + lance_core::datatypes::BLOB_V2_USER_TYPE.clone(), + field.is_nullable(), + ) + .with_metadata(logical_field.metadata().clone()), + )); + } else { + fields.push(field.clone()); + } + } + Arc::new(arrow_schema::Schema::new_with_metadata( + fields + .iter() + .map(|f| f.as_ref().clone()) + .collect::>(), + schema.metadata().clone(), + )) + }; + reader = Some(Box::pin(RecordBatchStreamAdapter::new( + transformed_schema, + transformed, + ))); + } else { + reader = Some(Box::pin(RecordBatchStreamAdapter::new( + schema, + reader_with_progress, + ))); + } } let mut params = WriteParams { max_rows_per_file: options.target_rows_per_fragment, max_rows_per_group: options.max_rows_per_group, mode: WriteMode::Append, + // External blobs may reference 
URIs outside the dataset's base_paths + // (e.g. absolute file:// URIs with base_id == 0). Without this flag + // the writer would reject such blobs. + allow_external_blob_outside_bases: true, ..Default::default() }; if let Some(max_bytes_per_file) = options.max_bytes_per_file { @@ -1676,6 +2066,7 @@ mod tests { use async_trait::async_trait; use lance_arrow::BLOB_META_KEY; use lance_core::Error; + use lance_core::ROW_ID; use lance_core::utils::address::RowAddress; use lance_core::utils::tempfile::TempStrDir; use lance_datagen::Dimension; @@ -4841,7 +5232,6 @@ mod tests { ); } - /// Returns the number of files in `/data/`. fn count_data_files_in(base_dir: &str) -> usize { let data_dir = std::path::Path::new(base_dir).join("data"); if !data_dir.exists() { @@ -4932,4 +5322,1031 @@ mod tests { "Compaction data files should be cleaned up when commit fails" ); } + + async fn read_blob_bytes_by_index( + dataset: &Arc, + column: &str, + ) -> Vec<(i32, Option>)> { + let mut scanner = dataset.scan(); + scanner.with_row_id(); + let batch = scanner + .project(&["id", column]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let ids = batch + .column_by_name("id") + .unwrap() + .as_primitive::(); + let row_ids = batch + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::(); + + let mut result = Vec::with_capacity(batch.num_rows()); + for i in 0..batch.num_rows() { + let row_id = row_ids.value(i); + let id = ids.value(i); + let blobs = dataset.take_blobs(&[row_id], column).await.unwrap(); + if blobs.is_empty() { + result.push((id, None)); + } else { + let data = blobs[0].read().await.unwrap(); + result.push((id, Some(data.to_vec()))); + } + } + result + } + + #[tokio::test] + async fn test_compact_blob_v2_preserves_external_references() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + use lance_table::format::BasePath; + + let test_dir = TempDir::default(); + let external_dir = TempDir::default(); + let external_path = 
external_dir.std_path().join("external.bin"); + std::fs::write(&external_path, b"external-data").unwrap(); + let external_uri = format!("file://{}", external_path.display()); + let base_uri = format!("file://{}", external_dir.std_path().display()); + + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder.push_uri(external_uri.clone()).unwrap(); + blob_builder.push_bytes(b"inline-data").unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 1, + initial_bases: Some(vec![BasePath { + id: 1, + name: Some("external".to_string()), + path: base_uri, + is_dataset_root: false, + }]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + for frag in dataset.get_fragments() { + let rows = frag.physical_rows().await.unwrap(); + assert!(rows > 0, "fragment {} should have rows", frag.id()); + } + + let options = CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }; + let plan = plan_compaction(&dataset, &options).await.unwrap(); + assert!( + !plan.tasks().is_empty(), + "compaction plan should have tasks, got {} tasks", + plan.tasks().len() + ); + + compact_files(&mut dataset, options, None).await.unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let scan_result = dataset + .scan() + .project(&["id", "blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(scan_result.num_rows(), 2); + + let ids 
= scan_result + .column_by_name("id") + .unwrap() + .as_primitive::(); + let mut id_values: Vec = ids.iter().map(|v| v.unwrap()).collect(); + id_values.sort(); + assert_eq!(id_values, vec![0, 1]); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"external-data".to_vec())), + (1, Some(b"inline-data".to_vec())) + ] + ); + } + + #[tokio::test] + async fn test_compact_blob_v2_packed_and_dedicated() { + use crate::BlobArrayBuilder; + use lance_arrow::BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let inline_data = b"small-inline-blob".as_slice(); + let packed_data: Vec = (0..64 * 1024 + 1024).map(|i| (i % 256) as u8).collect(); + let dedicated_data: Vec = (0..4 * 1024 * 1024 + 512) + .map(|i| ((i + 97) % 256) as u8) + .collect(); + + let mut blob_builder = BlobArrayBuilder::new(3); + blob_builder.push_bytes(inline_data).unwrap(); + blob_builder.push_bytes(&packed_data).unwrap(); + blob_builder.push_bytes(&dedicated_data).unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2])); + let mut blob_field = crate::blob_field("blob", true); + { + let metadata = blob_field.metadata().clone(); + let mut new_metadata = metadata; + new_metadata.insert( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), + (4 * 1024 * 1024).to_string(), + ); + blob_field = blob_field.with_metadata(new_metadata); + } + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + blob_field, + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + 
data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 3); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let scan_result = dataset + .scan() + .project(&["id", "blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(scan_result.num_rows(), 3); + + let ids = scan_result + .column_by_name("id") + .unwrap() + .as_primitive::(); + let id_values: Vec = ids.iter().map(|v| v.unwrap()).collect(); + assert_eq!(id_values, vec![0, 1, 2]); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(inline_data.to_vec())), + (1, Some(packed_data)), + (2, Some(dedicated_data)) + ] + ); + } + + #[tokio::test] + async fn test_compact_blob_v2_with_null_rows() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let mut blob_builder = BlobArrayBuilder::new(4); + blob_builder.push_bytes(b"inline-0").unwrap(); + blob_builder.push_null().unwrap(); + blob_builder.push_bytes(b"inline-2").unwrap(); + blob_builder.push_null().unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = + Arc::new(Int32Array::from(vec![Some(0), Some(1), Some(2), Some(3)])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + 
data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let scan_result = dataset + .scan() + .project(&["id", "blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(scan_result.num_rows(), 4); + + let ids = scan_result + .column_by_name("id") + .unwrap() + .as_primitive::(); + let id_values: Vec = ids.iter().map(|v| v.unwrap()).collect(); + assert_eq!(id_values, vec![0, 1, 2, 3]); + + let blob_col = scan_result.column_by_name("blob").unwrap(); + assert!( + matches!(blob_col.data_type(), DataType::Struct(_)), + "blob column should be a struct after compaction" + ); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"inline-0".to_vec())), + (1, None), + (2, Some(b"inline-2".to_vec())), + (3, None) + ] + ); + } + + /* Deleted rows must stay deleted: writes four small byte blobs as two 2-row fragments, deletes ids 1 and 2, then compacts with materialize_deletions_threshold 0.0. Expects exactly ids 0 and 3 to remain, each with a non-null `kind` descriptor field and its original bytes. */ #[tokio::test] + async fn test_compact_blob_v2_deleted_rows_not_resurrected() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let mut blob_builder = BlobArrayBuilder::new(4); + blob_builder.push_bytes(b"blob-0").unwrap(); + blob_builder.push_bytes(b"blob-1").unwrap(); + blob_builder.push_bytes(b"blob-2").unwrap(); + blob_builder.push_bytes(b"blob-3").unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, 
blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + dataset.delete("id = 1").await.unwrap(); + dataset.delete("id = 2").await.unwrap(); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + materialize_deletions_threshold: 0.0, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + let scan_result = dataset + .scan() + .project(&["id", "blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(scan_result.num_rows(), 2); + + let ids = scan_result + .column_by_name("id") + .unwrap() + .as_primitive::(); + let mut id_values: Vec = ids.iter().map(|v| v.unwrap()).collect(); + id_values.sort(); + assert_eq!(id_values, vec![0, 3]); + + let blob_col = scan_result.column_by_name("blob").unwrap(); + let struct_arr = blob_col.as_any().downcast_ref::().unwrap(); + let kind_col = struct_arr + .column_by_name("kind") + .unwrap() + .as_primitive::(); + + for i in 0..kind_col.len() { + assert!( + !kind_col.is_null(i), + "row {} should have a non-null kind after compaction of deleted rows", + i + ); + } + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![(0, Some(b"blob-0".to_vec())), (3, Some(b"blob-3".to_vec()))] + ); + } + + /* URI-backed and byte-backed blobs in the same column: rows 0 and 3 point at one external file (its directory is registered as base id 1), row 1 carries a payload of 64 KiB plus 512 bytes, row 2 a small payload, and the dedicated-size threshold metadata is set to 4 MiB. After two fragments are compacted into one, every row must read back its original bytes. NOTE(review): `packed_data` is presumably sized to land in the packed storage kind; the kind itself is not asserted here. */ #[tokio::test] + async fn test_compact_blob_v2_external_and_data_blob_mixed() { + use crate::BlobArrayBuilder; + use lance_arrow::BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY; + use lance_core::utils::tempfile::TempDir; + use lance_table::format::BasePath; + + let test_dir = TempDir::default(); + let external_dir = TempDir::default(); + 
let external_path = external_dir.std_path().join("external.bin"); + std::fs::write(&external_path, b"external-payload").unwrap(); + let external_uri = format!("file://{}", external_path.display()); + let base_uri = format!("file://{}", external_dir.std_path().display()); + + let packed_data: Vec = (0..64 * 1024 + 512).map(|i| (i % 256) as u8).collect(); + + let mut blob_builder = BlobArrayBuilder::new(4); + blob_builder.push_uri(external_uri.clone()).unwrap(); + blob_builder.push_bytes(&packed_data).unwrap(); + blob_builder.push_bytes(b"inline-small").unwrap(); + blob_builder.push_uri(external_uri.clone()).unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3])); + let mut blob_field = crate::blob_field("blob", true); + { + let mut new_metadata = blob_field.metadata().clone(); + new_metadata.insert( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), + (4 * 1024 * 1024).to_string(), + ); + blob_field = blob_field.with_metadata(new_metadata); + } + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + blob_field, + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + initial_bases: Some(vec![BasePath { + id: 1, + name: Some("external".to_string()), + path: base_uri, + is_dataset_root: false, + }]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = 
read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"external-payload".to_vec())), + (1, Some(packed_data)), + (2, Some(b"inline-small".to_vec())), + (3, Some(b"external-payload".to_vec())) + ] + ); + } + + /* Two blob columns in one schema (`image` and `thumbnail`, the latter with a null at id 1), written one row per file. Compacts the three fragments into one and checks each column's bytes independently. */ #[tokio::test] + async fn test_compact_blob_v2_multiple_blob_columns() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let mut image_builder = BlobArrayBuilder::new(3); + image_builder.push_bytes(b"image-0").unwrap(); + image_builder.push_bytes(b"image-1").unwrap(); + image_builder.push_bytes(b"image-2").unwrap(); + let image_array: ArrayRef = image_builder.finish().unwrap(); + + let mut thumb_builder = BlobArrayBuilder::new(3); + thumb_builder.push_bytes(b"thumb-0").unwrap(); + thumb_builder.push_null().unwrap(); + thumb_builder.push_bytes(b"thumb-2").unwrap(); + let thumb_array: ArrayRef = thumb_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("image", true), + crate::blob_field("thumbnail", true), + ])); + + let batch = + RecordBatch::try_new(schema.clone(), vec![id_array, image_array, thumb_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 3); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut image_values = 
read_blob_bytes_by_index(&Arc::new(dataset.clone()), "image").await; + image_values.sort_by_key(|(id, _)| *id); + assert_eq!( + image_values, + vec![ + (0, Some(b"image-0".to_vec())), + (1, Some(b"image-1".to_vec())), + (2, Some(b"image-2".to_vec())) + ] + ); + + let mut thumb_values = + read_blob_bytes_by_index(&Arc::new(dataset.clone()), "thumbnail").await; + thumb_values.sort_by_key(|(id, _)| *id); + assert_eq!( + thumb_values, + vec![ + (0, Some(b"thumb-0".to_vec())), + (1, None), + (2, Some(b"thumb-2".to_vec())) + ] + ); + } + + /* Alternating URI-backed and null blobs (ids 0 and 2 external, ids 1 and 3 null) with the external directory registered as base id 1. After compacting two fragments into one, external rows must return the file's bytes and null rows must return None. */ #[tokio::test] + async fn test_compact_blob_v2_external_and_null_mixed() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + use lance_table::format::BasePath; + + let test_dir = TempDir::default(); + let external_dir = TempDir::default(); + let external_path = external_dir.std_path().join("mixed-external.bin"); + std::fs::write(&external_path, b"external-mixed-data").unwrap(); + let external_uri = format!("file://{}", external_path.display()); + let base_uri = format!("file://{}", external_dir.std_path().display()); + + let mut blob_builder = BlobArrayBuilder::new(4); + blob_builder.push_uri(external_uri.clone()).unwrap(); + blob_builder.push_null().unwrap(); + blob_builder.push_uri(external_uri.clone()).unwrap(); + blob_builder.push_null().unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + initial_bases: Some(vec![BasePath { + id: 1, + name: 
Some("external".to_string()), + path: base_uri, + is_dataset_root: false, + }]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"external-mixed-data".to_vec())), + (1, None), + (2, Some(b"external-mixed-data".to_vec())), + (3, None) + ] + ); + } + + /* Homogeneous input batches: one 2-row batch of all-null blobs (ids 0 and 1) followed by one 2-row batch of all-URI blobs (ids 2 and 3), written with max_rows_per_file 2 so the two resulting fragments are presumably one all-null and one all-external (only the fragment count is asserted). Compaction must merge them while keeping nulls as None and external rows readable. */ #[tokio::test] + async fn test_compact_blob_v2_all_null_and_all_external_fragments() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + use lance_table::format::BasePath; + + let test_dir = TempDir::default(); + let external_dir = TempDir::default(); + let external_path = external_dir.std_path().join("all-ext.bin"); + std::fs::write(&external_path, b"all-external-data").unwrap(); + let external_uri = format!("file://{}", external_path.display()); + let base_uri = format!("file://{}", external_dir.std_path().display()); + + let mut null_builder = BlobArrayBuilder::new(2); + null_builder.push_null().unwrap(); + null_builder.push_null().unwrap(); + let null_array: ArrayRef = null_builder.finish().unwrap(); + + let mut ext_builder = BlobArrayBuilder::new(2); + ext_builder.push_uri(external_uri.clone()).unwrap(); + ext_builder.push_uri(external_uri.clone()).unwrap(); + let ext_array: ArrayRef = ext_builder.finish().unwrap(); + + let id_null_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1])); + let null_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + let null_batch = + RecordBatch::try_new(null_schema.clone(), vec![id_null_array, null_array]).unwrap(); + + let id_ext_array: 
ArrayRef = Arc::new(Int32Array::from(vec![2, 3])); + let ext_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + let ext_batch = + RecordBatch::try_new(ext_schema.clone(), vec![id_ext_array, ext_array]).unwrap(); + + let mut dataset = Dataset::write( + RecordBatchIterator::new( + vec![null_batch, ext_batch].into_iter().map(Ok), + null_schema.clone(), + ), + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + initial_bases: Some(vec![BasePath { + id: 1, + name: Some("external".to_string()), + path: base_uri, + is_dataset_root: false, + }]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, None), + (1, None), + (2, Some(b"all-external-data".to_vec())), + (3, Some(b"all-external-data".to_vec())) + ] + ); + } + + /* External blobs spread over two registered base paths: ids 0 and 3 reference a file under base id 1 (`base_a`), id 1 references a file under base id 2 (`base_b`), and id 2 is a plain byte blob. After compaction each row must still resolve to the bytes of its own file. */ #[tokio::test] + async fn test_compact_blob_v2_external_with_multiple_base_ids() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + use lance_table::format::BasePath; + + let test_dir = TempDir::default(); + let base_a_dir = TempDir::default(); + let base_b_dir = TempDir::default(); + + let path_a = base_a_dir.std_path().join("data-a.bin"); + std::fs::write(&path_a, b"from-base-a").unwrap(); + let uri_a = format!("file://{}", path_a.display()); + let base_uri_a = format!("file://{}", base_a_dir.std_path().display()); + + let path_b = base_b_dir.std_path().join("data-b.bin"); + std::fs::write(&path_b, b"from-base-b").unwrap(); + let uri_b = 
format!("file://{}", path_b.display()); + let base_uri_b = format!("file://{}", base_b_dir.std_path().display()); + + let mut blob_builder = BlobArrayBuilder::new(4); + blob_builder.push_uri(uri_a.clone()).unwrap(); + blob_builder.push_uri(uri_b).unwrap(); + blob_builder.push_bytes(b"inline-data").unwrap(); + blob_builder.push_uri(uri_a).unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("base_a".to_string()), + path: base_uri_a, + is_dataset_root: false, + }, + BasePath { + id: 2, + name: Some("base_b".to_string()), + path: base_uri_b, + is_dataset_root: false, + }, + ]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"from-base-a".to_vec())), + (1, Some(b"from-base-b".to_vec())), + (2, Some(b"inline-data".to_vec())), + (3, Some(b"from-base-a".to_vec())) + ] + ); + } + + /* Large payload round-trip: blobs of 512 KiB and 256 KiB (distinct byte patterns) plus one small blob, written one row per file. Compacts the three fragments into one and requires a byte-for-byte match on read-back. */ #[tokio::test] + async fn test_compact_blob_v2_large_blobs() { + use crate::BlobArrayBuilder; + use 
lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let large_blob_a: Vec = (0..512 * 1024).map(|i| (i % 256) as u8).collect(); + let large_blob_b: Vec = (0..256 * 1024).map(|i| ((i + 42) % 256) as u8).collect(); + + let mut blob_builder = BlobArrayBuilder::new(3); + blob_builder.push_bytes(&large_blob_a).unwrap(); + blob_builder.push_bytes(&large_blob_b).unwrap(); + blob_builder.push_bytes(b"small-blob").unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 3); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(large_blob_a)), + (1, Some(large_blob_b)), + (2, Some(b"small-blob".to_vec())) + ] + ); + } + + /* Two identical 32 KiB payloads written one row per file, with the dedicated-size threshold metadata lowered to 16 KiB. After compaction both rows must read back the same bytes. NOTE(review): despite the test name, only the read-back bytes are asserted; the blob kind stored before or after compaction is never inspected. */ #[tokio::test] + async fn test_compact_blob_v2_blob_kind_reclassification() { + use crate::BlobArrayBuilder; + use lance_arrow::BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let medium_data: Vec = (0..32 * 1024).map(|i| (i % 256) as u8).collect(); + + let mut 
blob_builder = BlobArrayBuilder::new(2); + blob_builder.push_bytes(&medium_data).unwrap(); + blob_builder.push_bytes(&medium_data).unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1])); + let mut blob_field = crate::blob_field("blob", true); + { + let mut new_metadata = blob_field.metadata().clone(); + new_metadata.insert( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), + (16 * 1024).to_string(), + ); + blob_field = blob_field.with_metadata(new_metadata); + } + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + blob_field, + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 2); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(medium_data.clone())), + (1, Some(medium_data.clone())) + ] + ); + } + + /* Compaction with a small read batch: six rows (id 3 is null) are written two per file into three fragments, then compacted with batch_size Some(2). All six rows, including the null, must round-trip unchanged. */ #[tokio::test] + async fn test_compact_blob_v2_multi_batch() { + use crate::BlobArrayBuilder; + use lance_core::utils::tempfile::TempDir; + + let test_dir = TempDir::default(); + + let mut blob_builder = BlobArrayBuilder::new(6); + blob_builder.push_bytes(b"batch-0-row-0").unwrap(); + blob_builder.push_bytes(b"batch-0-row-1").unwrap(); + blob_builder.push_bytes(b"batch-1-row-0").unwrap(); + 
blob_builder.push_null().unwrap(); + blob_builder.push_bytes(b"batch-1-row-2").unwrap(); + blob_builder.push_bytes(b"batch-1-row-3").unwrap(); + let blob_array: ArrayRef = blob_builder.finish().unwrap(); + + let id_array: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + crate::blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + max_rows_per_file: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 3); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1024 * 1024, + batch_size: Some(2), + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 1); + + let mut blob_values = read_blob_bytes_by_index(&Arc::new(dataset.clone()), "blob").await; + blob_values.sort_by_key(|(id, _)| *id); + assert_eq!( + blob_values, + vec![ + (0, Some(b"batch-0-row-0".to_vec())), + (1, Some(b"batch-0-row-1".to_vec())), + (2, Some(b"batch-1-row-0".to_vec())), + (3, None), + (4, Some(b"batch-1-row-2".to_vec())), + (5, Some(b"batch-1-row-3".to_vec())) + ] + ); + } }