diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 59886d337d1..885e6700c35 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -235,6 +235,7 @@ use tokio::sync::mpsc::{self, unbounded_channel}; use lance_core::error::LanceOptionExt; use lance_core::{ArrowResult, Error, Result}; +use std::fmt; use tracing::instrument; use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy}; @@ -258,6 +259,49 @@ use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler}; use crate::version::LanceFileVersion; use crate::{BufferScheduler, EncodingsIo}; +/// An error wrapper that adds field context (name and id) to decoding errors. +/// +/// When decoding fails, the underlying error often lacks information about which field +/// was being decoded. This wrapper captures that context so error messages look like: +/// +/// ```text +/// failed to decode field 'age' (id=10) +/// +/// Caused by: +/// number out of range +/// ``` +#[derive(Debug)] +pub struct DecodeFieldError { + /// The name of the field being decoded + pub field_name: String, + /// The Lance field id + pub field_id: i32, + /// The underlying error that occurred during decoding + pub source: Error, +} + +impl fmt::Display for DecodeFieldError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "failed to decode field '{}' (id={})", + self.field_name, self.field_id + ) + } +} + +impl std::error::Error for DecodeFieldError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.source) + } +} + +impl From for Error { + fn from(err: DecodeFieldError) -> Self { + Self::wrapped(Box::new(err)) + } +} + // If users are getting batches over 10MiB large then it's time to reduce the batch size const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = @@ -646,8 +690,14 @@ impl CoreFieldDecoderStrategy { file_buffers: buffers, positions_and_sizes: &offsets_column.buffer_offsets_and_sizes, }; - let items_scheduler = - self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?; + let child = &list_field.children[0]; + let items_scheduler = self + .create_legacy_field_scheduler(child, column_infos, buffers) + .map_err(|source| DecodeFieldError { + field_name: child.name.clone(), + field_id: child.id, + source, + })?; let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column .page_infos @@ -778,9 +828,14 @@ impl CoreFieldDecoderStrategy { } let mut child_schedulers = Vec::with_capacity(field.children.len()); - for field in field.children.iter() { - let field_scheduler = - self.create_structural_field_scheduler(field, column_infos)?; + for child_field in field.children.iter() { + let field_scheduler = self + .create_structural_field_scheduler(child_field, column_infos) + .map_err(|source| DecodeFieldError { + field_name: child_field.name.clone(), + field_id: child_field.id, + source, + })?; child_schedulers.push(field_scheduler); } @@ -792,8 +847,13 @@ impl CoreFieldDecoderStrategy { } DataType::List(_) | DataType::LargeList(_) => { let child = field.children.first().expect_ok()?; - let child_scheduler = - self.create_structural_field_scheduler(child, column_infos)?; + let child_scheduler = self + .create_structural_field_scheduler(child, column_infos) + .map_err(|source| DecodeFieldError { + field_name: child.name.clone(), + field_id: child.id, + source, + })?; Ok(Box::new(StructuralListScheduler::new(child_scheduler)) as Box) } @@ -801,8 +861,13 @@ impl CoreFieldDecoderStrategy { if matches!(inner.data_type(), DataType::Struct(_)) => { let child = field.children.first().expect_ok()?; - let child_scheduler = - self.create_structural_field_scheduler(child, column_infos)?; + let child_scheduler = self + .create_structural_field_scheduler(child, column_infos) + .map_err(|source| DecodeFieldError { + field_name: child.name.clone(), + field_id: child.id, + source, + })?; Ok(Box::new(StructuralFixedSizeListScheduler::new( child_scheduler, *dimension, @@ -816,8 +881,13 @@ impl CoreFieldDecoderStrategy { return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into())); } let entries_child = field.children.first().expect_ok()?; - let child_scheduler = - self.create_structural_field_scheduler(entries_child, column_infos)?; + let child_scheduler = self + .create_structural_field_scheduler(entries_child, column_infos) + .map_err(|source| DecodeFieldError { + field_name: entries_child.name.clone(), + field_id: entries_child.id, + source, + })?; Ok(Box::new(StructuralMapScheduler::new(child_scheduler)) as Box) } @@ -942,10 +1012,15 @@ impl CoreFieldDecoderStrategy { .map(|page| page.num_rows) .sum(); let mut child_schedulers = Vec::with_capacity(field.children.len()); - for field in &field.children { + for child_field in &field.children { column_infos.next_top_level(); - let field_scheduler = - self.create_legacy_field_scheduler(field, column_infos, buffers)?; + let field_scheduler = self + .create_legacy_field_scheduler(child_field, column_infos, buffers) + .map_err(|source| DecodeFieldError { + field_name: child_field.name.clone(), + field_id: child_field.id, + source, + })?; child_schedulers.push(Arc::from(field_scheduler)); } @@ -3144,4 +3219,52 @@ mod tests { ); } } + + #[test] + fn test_decode_field_error_display() { + let source = Error::invalid_input("number out of range"); + let err = DecodeFieldError { + field_name: "age".to_string(), + field_id: 10, + source, + }; + assert_eq!(err.to_string(), "failed to decode field 'age' (id=10)"); + } + + #[test] + fn test_decode_field_error_source_chain() { + let inner = Error::invalid_input("value exceeds maximum"); + let err = DecodeFieldError { + field_name: "score".to_string(), + field_id: 5, + source: inner, + }; + + // Verify the source chain is preserved + let source = std::error::Error::source(&err); + assert!(source.is_some()); + let source_msg = source.unwrap().to_string(); + assert!( + source_msg.contains("value exceeds maximum"), + "expected source to contain 'value exceeds maximum', got: {}", + source_msg + ); + } + + #[test] + fn test_decode_field_error_converts_to_lance_error() { + let inner = Error::invalid_input("bad data"); + let field_err = DecodeFieldError { + field_name: "name".to_string(), + field_id: 3, + source: inner, + }; + let lance_err: Error = field_err.into(); + let msg = lance_err.to_string(); + assert!( + msg.contains("failed to decode field 'name' (id=3)"), + "expected error to contain field context, got: {}", + msg + ); + } } diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index f631c0a7892..f028af1129d 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -3,6 +3,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, + fmt, io::Cursor, ops::Range, pin::Pin, @@ -49,6 +50,44 @@ use crate::{ writer::PAGE_BUFFER_ALIGNMENT, }; +/// An error wrapper that adds file path context to errors that occur while reading a file. +/// +/// This wraps I/O errors, decoding errors, and other failures with the path of the file +/// being read so that error messages include the file location: +/// +/// ```text +/// failed to read file 'data.lance' +/// +/// Caused by: +/// 0: failed to decode field 'age' (id=10) +/// 1: number out of range +/// ``` +#[derive(Debug)] +pub struct ReadFileError { + /// The path of the file being read + pub path: Path, + /// The underlying error + pub source: Error, +} + +impl fmt::Display for ReadFileError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "failed to read file '{}'", self.path) + } +} + +impl std::error::Error for ReadFileError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.source) + } +} + +impl From for Error { + fn from(err: ReadFileError) -> Self { + Self::wrapped(Box::new(err)) + } +} + /// Default chunk size for reading large pages (8MiB) /// Pages larger than this will be split into multiple chunks during read pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024; @@ -415,6 +454,8 @@ pub struct FileReader { decoder_plugins: Arc, cache: Arc, options: FileReaderOptions, + /// The path of the file being read, used for error context + path: Option, } #[derive(Debug)] struct Footer { @@ -443,6 +484,7 @@ impl FileReader { metadata: self.metadata.clone(), options: self.options.clone(), num_rows: self.num_rows, + path: self.path.clone(), } } @@ -478,6 +520,18 @@ impl FileReader { } } + /// Wrap an error with file path context if the path is known. + fn wrap_err(&self, source: Error) -> Error { + match &self.path { + Some(path) => ReadFileError { + path: path.clone(), + source, + } + .into(), + None => source, + } + } + pub async fn read_global_buffer(&self, index: u32) -> Result { let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?; self.scheduler @@ -888,6 +942,7 @@ impl FileReader { decoder_plugins, cache, options, + path: Some(path), }) } @@ -1156,15 +1211,18 @@ impl FileReader { Ok(()) } }; - match ¶ms { + let result = match ¶ms { ReadBatchParams::Indices(indices) => { for idx in indices { match idx { None => { - return Err(Error::invalid_input("Null value in indices array")); + return Err( + self.wrap_err(Error::invalid_input("Null value in indices array")) + ); } Some(idx) => { - verify_bound(¶ms, idx as u64, true)?; + verify_bound(¶ms, idx as u64, true) + .map_err(|e| self.wrap_err(e))?; } } } @@ -1172,7 +1230,7 @@ impl FileReader { self.take_rows(indices, batch_size, projection).await } ReadBatchParams::Range(range) => { - verify_bound(¶ms, range.end as u64, false)?; + verify_bound(¶ms, range.end as u64, false).map_err(|e| self.wrap_err(e))?; self.read_range( range.start as u64..range.end as u64, batch_size, @@ -1184,14 +1242,14 @@ impl FileReader { ReadBatchParams::Ranges(ranges) => { let mut ranges_u64 = Vec::with_capacity(ranges.len()); for range in ranges.as_ref() { - verify_bound(¶ms, range.end, false)?; + verify_bound(¶ms, range.end, false).map_err(|e| self.wrap_err(e))?; ranges_u64.push(range.start..range.end); } self.read_ranges(ranges_u64, batch_size, projection, filter) .await } ReadBatchParams::RangeFrom(range) => { - verify_bound(¶ms, range.start as u64, true)?; + verify_bound(¶ms, range.start as u64, true).map_err(|e| self.wrap_err(e))?; self.read_range( range.start as u64..self.num_rows, batch_size, @@ -1201,7 +1259,7 @@ impl FileReader { .await } ReadBatchParams::RangeTo(range) => { - verify_bound(¶ms, range.end as u64, false)?; + verify_bound(¶ms, range.end as u64, false).map_err(|e| self.wrap_err(e))?; self.read_range(0..range.end as u64, batch_size, projection, filter) .await } @@ -1209,7 +1267,8 @@ impl FileReader { self.read_range(0..self.num_rows, batch_size, projection, filter) .await } - } + }; + result.map_err(|e| self.wrap_err(e)) } /// Reads data from the file as a stream of record batches @@ -1411,15 +1470,18 @@ impl FileReader { Ok(()) } }; - match ¶ms { + let result = match ¶ms { ReadBatchParams::Indices(indices) => { for idx in indices { match idx { None => { - return Err(Error::invalid_input("Null value in indices array")); + return Err( + self.wrap_err(Error::invalid_input("Null value in indices array")) + ); } Some(idx) => { - verify_bound(¶ms, idx as u64, true)?; + verify_bound(¶ms, idx as u64, true) + .map_err(|e| self.wrap_err(e))?; } } } @@ -1427,7 +1489,7 @@ impl FileReader { self.take_rows_blocking(indices, batch_size, projection, filter) } ReadBatchParams::Range(range) => { - verify_bound(¶ms, range.end as u64, false)?; + verify_bound(¶ms, range.end as u64, false).map_err(|e| self.wrap_err(e))?; self.read_range_blocking( range.start as u64..range.end as u64, batch_size, @@ -1438,13 +1500,13 @@ impl FileReader { ReadBatchParams::Ranges(ranges) => { let mut ranges_u64 = Vec::with_capacity(ranges.len()); for range in ranges.as_ref() { - verify_bound(¶ms, range.end, false)?; + verify_bound(¶ms, range.end, false).map_err(|e| self.wrap_err(e))?; ranges_u64.push(range.start..range.end); } self.read_ranges_blocking(ranges_u64, batch_size, projection, filter) } ReadBatchParams::RangeFrom(range) => { - verify_bound(¶ms, range.start as u64, true)?; + verify_bound(¶ms, range.start as u64, true).map_err(|e| self.wrap_err(e))?; self.read_range_blocking( range.start as u64..self.num_rows, batch_size, @@ -1453,13 +1515,14 @@ impl FileReader { ) } ReadBatchParams::RangeTo(range) => { - verify_bound(¶ms, range.end as u64, false)?; + verify_bound(¶ms, range.end as u64, false).map_err(|e| self.wrap_err(e))?; self.read_range_blocking(0..range.end as u64, batch_size, projection, filter) } ReadBatchParams::RangeFull => { self.read_range_blocking(0..self.num_rows, batch_size, projection, filter) } - } + }; + result.map_err(|e| self.wrap_err(e)) } /// Reads data from the file as a stream of record batches @@ -2447,4 +2510,71 @@ mod tests { "deep_size_of ({deep_size}) should scale with column count ({num_columns})" ); } + + #[test] + fn test_read_file_error_display() { + use lance_core::Error; + use object_store::path::Path; + + let source = Error::invalid_input("corrupt data"); + let err = super::ReadFileError { + path: Path::from("data/test.lance"), + source, + }; + assert_eq!(err.to_string(), "failed to read file 'data/test.lance'"); + } + + #[test] + fn test_read_file_error_source_chain() { + use lance_core::Error; + use lance_encoding::decoder::DecodeFieldError; + use object_store::path::Path; + + let inner = Error::invalid_input("number out of range"); + let field_err = DecodeFieldError { + field_name: "age".to_string(), + field_id: 10, + source: inner, + }; + let lance_err: Error = field_err.into(); + let file_err = super::ReadFileError { + path: Path::from("data/test.lance"), + source: lance_err, + }; + + // Verify display + assert_eq!( + file_err.to_string(), + "failed to read file 'data/test.lance'" + ); + + // Verify the source chain is preserved + let source = std::error::Error::source(&file_err); + assert!(source.is_some()); + let source_msg = source.unwrap().to_string(); + assert!( + source_msg.contains("failed to decode field 'age' (id=10)"), + "expected source to contain field error, got: {}", + source_msg + ); + } + + #[test] + fn test_read_file_error_converts_to_lance_error() { + use lance_core::Error; + use object_store::path::Path; + + let inner = Error::invalid_input("bad data"); + let file_err = super::ReadFileError { + path: Path::from("tables/users.lance"), + source: inner, + }; + let lance_err: Error = file_err.into(); + let msg = lance_err.to_string(); + assert!( + msg.contains("failed to read file 'tables/users.lance'"), + "expected error to contain file path, got: {}", + msg + ); + } }