Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions python/lsst/resources/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@
s3CheckFileExists,
)

try:
import boto3
except ImportError:
boto3 = None

try:
from boto3.s3.transfer import TransferConfig # type: ignore
except ImportError:
TransferConfig = None

if TYPE_CHECKING:
with contextlib.suppress(ImportError):
import boto3

from .utils import TransactionProtocol


Expand Down Expand Up @@ -333,13 +335,17 @@ def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage |
raise

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _copy_from(self, src: ResourcePath) -> None:
def _copy_from(self, src: ResourcePath, progress: ProgressPercentage | None) -> None:
copy_source = {
"Bucket": src.netloc,
"Key": src.relativeToPathRoot,
}
assert boto3 is not None, "boto3 must be available to get here"
# Note that boto3.resource.meta.copy is cleverer than the low
# level copy_object.
s3 = boto3.resource("s3")
try:
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot)
s3.meta.client.copy(copy_source, self.netloc, self.relativeToPathRoot, Callback=progress)
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
raise FileNotFoundError("No such resource to transfer: {self}") from err
except ClientError as err:
Expand Down Expand Up @@ -402,11 +408,14 @@ def transfer_from(
timer_args = (src, self)

if isinstance(src, type(self)):
# Looks like an S3 remote uri so we can use direct copy
# note that boto3.resource.meta.copy is cleverer than the low
# level copy_object
# Looks like an S3 remote uri so we can use direct copy.
progress = (
ProgressPercentage(src, msg="S3 to S3 copy:")
if log.isEnabledFor(ProgressPercentage.log_level)
else None
)
with time_this(log, msg=timer_msg, args=timer_args):
self._copy_from(src)
self._copy_from(src, progress)

else:
# Use local file and upload it
Expand Down