From ce3b3f9bc31807c37e777d29c110ff4f639a4005 Mon Sep 17 00:00:00 2001 From: Vivek Date: Fri, 29 May 2026 10:10:19 -0700 Subject: [PATCH] fix: stream object copies larger than cloud's CopyObject limit copy now HEADs the source, and when it exceeds 5 GiB, falls back to a streaming multipart copy --- rust/lance-io/src/object_store.rs | 152 +++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 1 deletion(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 3cdf91f7760..698378c5b9e 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -47,7 +47,7 @@ pub mod throttle; mod tracing; use crate::object_reader::SmallReader; use crate::object_writer::{LocalWriter, WriteResult}; -use crate::traits::Writer; +use crate::traits::{WriteExt, Writer}; use crate::utils::tracking_store::{IOTracker, IoStats}; use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader}; use lance_core::{Error, Result}; @@ -771,11 +771,53 @@ impl ObjectStore { Ok(()) } + /// AWS S3 and GCS reject a single-shot server-side copy whose source is + /// larger than this; such sources are streamed through a multipart write. + const MAX_SINGLE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; // 5 GiB + pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + // S3 and GCS cap single-shot server-side copies at 5 GiB and object_store + // does not fall back to a multipart copy for larger sources + // (https://github.com/apache/arrow-rs-object-store/issues/563). Azure and + // other blob stores don't have this limit, so we only pay for the fallback + // (an extra size lookup) on S3 and GCS. + let multipart_copy_fallback = matches!(self.scheme.as_str(), "s3" | "s3+ddb" | "gs"); + self.copy_impl( + from, + to, + multipart_copy_fallback, + Self::MAX_SINGLE_COPY_BYTES, + ) + .await + } + + /// Copy `from` to `to`. When `multipart_copy_fallback` is set, a source + /// larger than `max_single_copy` is streamed through a multipart write + /// instead of a single-shot server-side copy. Both are parameters so tests + /// can drive the streaming path without a multi-gigabyte fixture or an S3 + /// endpoint. + async fn copy_impl( + &self, + from: &Path, + to: &Path, + multipart_copy_fallback: bool, + max_single_copy: u64, + ) -> Result<()> { if self.is_local() { // Use std::fs::copy for local filesystem to support cross-filesystem copies return super::local::copy_file(from, to); } + if multipart_copy_fallback { + // Reuse the reader for both the size lookup (a single cached HEAD) + // and the streamed copy, avoiding a separate HEAD request. + let reader = self.open(from).await?; + if reader.size().await? as u64 > max_single_copy { + let mut writer = self.create(to).await?; + writer.copy_from_reader(reader.as_ref()).await?; + Writer::shutdown(writer.as_mut()).await?; + return Ok(()); + } + } Ok(self.inner.copy(from, to).await?) } @@ -1077,11 +1119,19 @@ fn infer_block_size(scheme: &str) -> usize { #[cfg(test)] mod tests { use super::*; + use async_trait::async_trait; + use bytes::Bytes; use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir}; use object_store::memory::InMemory; + use object_store::{ + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions, + PutOptions, PutPayload, PutResult, Result as OSResult, + }; use rstest::rstest; use std::env::set_current_dir; + use std::fmt::{Display, Formatter}; use std::fs::{create_dir_all, write}; + use std::ops::Range; use std::path::Path as StdPath; use std::sync::atomic::{AtomicBool, Ordering}; @@ -1500,6 +1550,106 @@ mod tests { assert_eq!(copied_content, b"test content"); } + /// Inner store that forwards everything to `InMemory` except single-shot + /// server-side copy (`copy_opts`), which always fails. This lets a test + /// prove that `ObjectStore::copy` fell back to a streaming multipart copy + /// for an oversized source rather than issuing a single `CopyObject`. + #[derive(Debug)] + struct CopyFailingStore { + inner: InMemory, + } + + impl Display for CopyFailingStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CopyFailingStore") + } + } + + #[async_trait] + impl OSObjectStore for CopyFailingStore { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> OSResult { + self.inner.put_opts(location, bytes, opts).await + } + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> OSResult> { + self.inner.put_multipart_opts(location, opts).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { + self.inner.get_opts(location, options).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { + self.inner.get_ranges(location, ranges).await + } + fn delete_stream( + &self, + locations: BoxStream<'static, OSResult>, + ) -> BoxStream<'static, OSResult> { + self.inner.delete_stream(locations) + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { + self.inner.list(prefix) + } + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, OSResult> { + self.inner.list_with_offset(prefix, offset) + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.inner.list_with_delimiter(prefix).await + } + async fn copy_opts(&self, _from: &Path, _to: &Path, _opts: CopyOptions) -> OSResult<()> { + Err(object_store::Error::Generic { + store: "CopyFailingStore", + source: "single-shot copy disabled in test".into(), + }) + } + } + + #[tokio::test] + async fn test_copy_streams_objects_larger_than_threshold() { + // memory:// is non-local but isn't an S3/GCS scheme, so copy() wouldn't + // enable the fallback on its own. Drive copy_impl directly with + // multipart_copy_fallback = true to exercise the streaming path. The + // inner store rejects any single-shot copy, so a successful copy can only + // have gone through the streaming branch. + let mut store = ObjectStore::memory(); + store.inner = Arc::new(CopyFailingStore { + inner: InMemory::new(), + }); + + let from = Path::from("source.bin"); + let contents = b"streaming multipart copy payload well past the tiny threshold"; + store.put(&from, contents).await.unwrap(); + + // Source size (61 bytes) exceeds the threshold -> must stream via a + // multipart write rather than a single-shot server-side copy. + let streamed = Path::from("streamed.bin"); + store.copy_impl(&from, &streamed, true, 8).await.unwrap(); + let copied = store.read_one_all(&streamed).await.unwrap(); + assert_eq!(copied.as_ref(), contents.as_slice()); + + // Source size below the threshold -> single-shot copy, which the inner + // store rejects, confirming that the streaming branch (not native copy) + // is what made the first copy succeed. + let native = Path::from("native.bin"); + assert!( + store + .copy_impl(&from, &native, true, u64::MAX) + .await + .is_err() + ); + } + #[test] #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] fn test_client_options_extracts_headers() {