Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 151 additions & 1 deletion rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub mod throttle;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::{LocalWriter, WriteResult};
use crate::traits::Writer;
use crate::traits::{WriteExt, Writer};
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};
Expand Down Expand Up @@ -771,11 +771,53 @@ impl ObjectStore {
Ok(())
}

/// AWS S3 and GCS reject a single-shot server-side copy whose source is
/// larger than this; such sources are streamed through a multipart write.
const MAX_SINGLE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; // 5 GiB

pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
// S3 and GCS cap single-shot server-side copies at 5 GiB and object_store
// does not fall back to a multipart copy for larger sources
// (https://github.com/apache/arrow-rs-object-store/issues/563). Azure and
// other blob stores don't have this limit, so we only pay for the fallback
// (an extra size lookup) on S3 and GCS.
let multipart_copy_fallback = matches!(self.scheme.as_str(), "s3" | "s3+ddb" | "gs");
self.copy_impl(
from,
to,
multipart_copy_fallback,
Self::MAX_SINGLE_COPY_BYTES,
)
.await
}

/// Copy `from` to `to`. When `multipart_copy_fallback` is set, a source
/// larger than `max_single_copy` is streamed through a multipart write
/// instead of a single-shot server-side copy. Both are parameters so tests
/// can drive the streaming path without a multi-gigabyte fixture or an S3
/// endpoint.
async fn copy_impl(
&self,
from: &Path,
to: &Path,
multipart_copy_fallback: bool,
max_single_copy: u64,
) -> Result<()> {
if self.is_local() {
// Use std::fs::copy for local filesystem to support cross-filesystem copies
return super::local::copy_file(from, to);
}
if multipart_copy_fallback {
// Reuse the reader for both the size lookup (a single cached HEAD)
// and the streamed copy, avoiding a separate HEAD request.
let reader = self.open(from).await?;
Comment thread
vivek-bharathan marked this conversation as resolved.
if reader.size().await? as u64 > max_single_copy {
let mut writer = self.create(to).await?;
writer.copy_from_reader(reader.as_ref()).await?;
Writer::shutdown(writer.as_mut()).await?;
return Ok(());
}
}
Ok(self.inner.copy(from, to).await?)
}

Expand Down Expand Up @@ -1077,11 +1119,19 @@ fn infer_block_size(scheme: &str) -> usize {
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use bytes::Bytes;
use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
use object_store::memory::InMemory;
use object_store::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
PutOptions, PutPayload, PutResult, Result as OSResult,
};
use rstest::rstest;
use std::env::set_current_dir;
use std::fmt::{Display, Formatter};
use std::fs::{create_dir_all, write};
use std::ops::Range;
use std::path::Path as StdPath;
use std::sync::atomic::{AtomicBool, Ordering};

Expand Down Expand Up @@ -1500,6 +1550,106 @@ mod tests {
assert_eq!(copied_content, b"test content");
}

/// Inner store that forwards everything to `InMemory` except single-shot
/// server-side copy (`copy_opts`), which always fails. This lets a test
/// prove that `ObjectStore::copy` fell back to a streaming multipart copy
/// for an oversized source rather than issuing a single `CopyObject`.
#[derive(Debug)]
struct CopyFailingStore {
inner: InMemory,
}

impl Display for CopyFailingStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "CopyFailingStore")
}
}

#[async_trait]
impl OSObjectStore for CopyFailingStore {
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> OSResult<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> OSResult<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
self.inner.get_opts(location, options).await
}
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}
fn delete_stream(
&self,
locations: BoxStream<'static, OSResult<Path>>,
) -> BoxStream<'static, OSResult<Path>> {
self.inner.delete_stream(locations)
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.inner.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, OSResult<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy_opts(&self, _from: &Path, _to: &Path, _opts: CopyOptions) -> OSResult<()> {
Err(object_store::Error::Generic {
store: "CopyFailingStore",
source: "single-shot copy disabled in test".into(),
})
}
}

#[tokio::test]
async fn test_copy_streams_objects_larger_than_threshold() {
// memory:// is non-local but isn't an S3/GCS scheme, so copy() wouldn't
// enable the fallback on its own. Drive copy_impl directly with
// multipart_copy_fallback = true to exercise the streaming path. The
// inner store rejects any single-shot copy, so a successful copy can only
// have gone through the streaming branch.
let mut store = ObjectStore::memory();
store.inner = Arc::new(CopyFailingStore {
inner: InMemory::new(),
});

let from = Path::from("source.bin");
let contents = b"streaming multipart copy payload well past the tiny threshold";
store.put(&from, contents).await.unwrap();

// Source size (61 bytes) exceeds the threshold -> must stream via a
// multipart write rather than a single-shot server-side copy.
let streamed = Path::from("streamed.bin");
store.copy_impl(&from, &streamed, true, 8).await.unwrap();
let copied = store.read_one_all(&streamed).await.unwrap();
assert_eq!(copied.as_ref(), contents.as_slice());

// Source size below the threshold -> single-shot copy, which the inner
// store rejects, confirming that the streaming branch (not native copy)
// is what made the first copy succeed.
let native = Path::from("native.bin");
assert!(
store
.copy_impl(&from, &native, true, u64::MAX)
.await
.is_err()
);
}

#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_extracts_headers() {
Expand Down
Loading