Extract ZIP archives while downloading#279
Conversation
Reviewer's GuideImplement streaming ZIP extraction in backend.get_archive(), introducing backend streaming/file-size interfaces and cleanup semantics, while removing num_workers support for archives and wiring stream-unzip as an optional dependency. Sequence diagram for streaming ZIP extraction in get_archivesequenceDiagram
participant Caller
participant Backend as Backend
participant StreamUnzip as stream_unzip
participant FS as FileSystem
Caller->>Backend: get_archive(src_path, dst_root, validate, verbose)
Backend->>Backend: check_path(src_path)
Backend->>Backend: detect .zip and STREAM_UNZIP_AVAILABLE
alt ZIP with streaming
Backend->>Backend: _get_archive_streaming(src_path, dst_root, validate, verbose)
Backend->>Backend: _size(src_path) [if verbose]
Backend->>Backend: create progress_bar
Backend->>Backend: stream_with_hash()
loop download chunks
Backend->>Backend: _get_file_stream(src_path)
Backend-->>Backend: chunk bytes
Backend->>Backend: update md5_hash
Backend->>Backend: pbar.update(len(chunk))
end
Backend->>StreamUnzip: stream_unzip(stream_with_hash())
loop for each entry in archive
StreamUnzip-->>Backend: file_name, file_size, unzipped_chunks
Backend->>Backend: skip directories
Backend->>FS: mkdir(parent(dst_path))
loop write unzipped_chunks
Backend->>FS: write chunk to dst_path
end
Backend->>Backend: record extracted_files
end
alt validate checksum
Backend->>Backend: checksum(src_path)
Backend->>Backend: compare expected vs actual
opt mismatch
Backend->>Backend: cleanup_on_failure()
Backend-->>Caller: raise InterruptedError
end
end
Backend-->>Caller: return extracted_files
else other archives or ZIP without streaming
Backend->>Backend: create TemporaryDirectory(tmp_root)
Backend->>Backend: get_file(src_path, local_archive, validate, verbose)
Backend->>Backend: audeer.extract_archive(local_archive, dst_root, validate, verbose)
Backend-->>Caller: return extracted_files
end
Class diagram for backend streaming interfaces and implementationsclassDiagram
class Backend {
+get_archive(src_path str, dst_root str, tmp_root str, validate bool, verbose bool) list~str~
-_get_archive_streaming(src_path str, dst_root str, validate bool, verbose bool) list~str~
-_get_file(src_path str, dst_path str, verbose bool)
-_get_file_stream(src_path str) Iterator~bytes~
-_size(path str) int
+get_file(src_path str, dst_path str, tmp_root str, num_workers int, validate bool, verbose bool)
}
class ArtifactoryBackend {
-_get_file(src_path str, dst_path str, verbose bool)
-_get_file_stream(src_path str) Iterator~bytes~
-_size(path str) int
}
class FilesystemBackend {
-_get_file(src_path str, dst_path str, verbose bool)
-_get_file_stream(src_path str) Iterator~bytes~
-_size(path str) int
}
class MinioBackend {
-_download_file(src_path str, dst_path str, verbose bool)
-_get_file_stream(src_path str) Iterator~bytes~
-_size(path str) int
}
Backend <|-- ArtifactoryBackend
Backend <|-- FilesystemBackend
Backend <|-- MinioBackend
class stream_unzip {
+stream_unzip(source Iterator~bytes~) Iterator
}
Backend ..> stream_unzip : uses for ZIP streaming
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
85a8ecd to
e8c2749
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- In
_get_archive_streaming,cleanup_on_failure()is only invoked forzipfile.BadZipFile,TruncatedDataError, andUnfinishedIterationError; consider broadening the exception handling (or using atry/except Exceptionaround the whole streaming loop) so that partial extractions are also cleaned up on network or unexpected errors. - The
_get_file_streamimplementations in the different backends all hard-code the samechunk_size = 64 * 1024; you might want to centralize this value (e.g., as a constant on the base class) to avoid magic numbers and keep behavior consistent if you ever want to tune it. - The three backend-specific
test_sizetests are nearly identical; consider refactoring them into a shared parametrized test helper to reduce duplication and make it easier to add future backends.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `_get_archive_streaming`, `cleanup_on_failure()` is only invoked for `zipfile.BadZipFile`, `TruncatedDataError`, and `UnfinishedIterationError`; consider broadening the exception handling (or using a `try`/`except Exception` around the whole streaming loop) so that partial extractions are also cleaned up on network or unexpected errors.
- The `_get_file_stream` implementations in the different backends all hard-code the same `chunk_size = 64 * 1024`; you might want to centralize this value (e.g., as a constant on the base class) to avoid magic numbers and keep behavior consistent if you ever want to tune it.
- The three backend-specific `test_size` tests are nearly identical; consider refactoring them into a shared parametrized test helper to reduce duplication and make it easier to add future backends.
## Individual Comments
### Comment 1
<location> `audbackend/core/backend/artifactory.py:282` </location>
<code_context>
+ src_path = self.path(src_path)
+ chunk_size = 64 * 1024 # 64 KB
+
+ with src_path.open() as fp:
+ while data := fp.read(chunk_size):
+ yield data
</code_context>
<issue_to_address>
**issue (bug_risk):** Open the Artifactory file in binary mode to ensure bytes are yielded for hashing and streaming.
`src_path.open()` defaults to text mode, so `_get_file_stream` yields `str`. `_get_archive_streaming` passes these chunks to `hashlib.md5.update`, which requires `bytes`, leading to a type error or unintended encoding. Opening with `src_path.open("rb")` ensures raw bytes are yielded, matching the expectations of the hashing logic and other backends.
</issue_to_address>
### Comment 2
<location> `audbackend/core/backend/minio.py:408` </location>
<code_context>
src_path = self.path(src_path)
_download(src_path, dst_path, verbose=verbose)
+ def _get_file_stream(
+ self,
+ src_path: str,
</code_context>
<issue_to_address>
**issue (bug_risk):** Implement `_size` for the MinIO backend or avoid calling it to prevent `NotImplementedError` when using streaming with `verbose=True`.
In `BaseBackend.get_archive`, the streaming path calls `self._size(src_path)` when `verbose=True`, but this backend only adds `_get_file_stream` and still inherits the base `_size` that raises `NotImplementedError`. As a result, streaming from MinIO with `verbose=True` will crash. Please either implement `_size` for MinIO (as in the other backends) or avoid calling `_size` when the backend doesn’t support it and use an indeterminate progress bar instead.
</issue_to_address>
### Comment 3
<location> `tests/test_backend_filesystem.py:195-204` </location>
<code_context>
assert interface.exists(dst_file, version)
+
+
+@pytest.mark.parametrize(
+ "interface",
+ [(audbackend.backend.Artifactory, audbackend.interface.Versioned)],
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for the case where `dst_root` is an existing file to ensure `NotADirectoryError` is raised and no partial extraction occurs.
Currently, streaming tests only cover existing directories and cleanup behavior. Please also cover this error path by creating a regular file at `dst_root`, calling `get_archive(..., dst_root=that_file)` with a valid ZIP, asserting `NotADirectoryError`, and verifying that the file is unchanged and no additional files are created nearby.
</issue_to_address>
### Comment 4
<location> `audbackend/core/backend/base.py:528` </location>
<code_context>
src_path = utils.check_path(src_path)
+ # Validate tmp_root if specified
+ # (use TemporaryDirectory to get consistent error format)
+ if tmp_root is not None:
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring `get_archive()` and `_get_archive_streaming()` into smaller helper methods so the main code paths read as simple, linear dispatch and extraction logic.
You can keep all behavior and noticeably reduce complexity by:
---
### 1. Make `get_archive()` a thin dispatcher
Right now `get_archive()` mixes “decide strategy” and “do work”. You can pull the tempfile-based path into a helper so the main method just selects a strategy:
```python
def get_archive(
self,
src_path: str,
dst_root: str,
*,
tmp_root: str = None,
validate: bool = False,
verbose: bool = False,
) -> list[str]:
if not self.opened:
raise RuntimeError(backend_not_opened_error)
src_path = utils.check_path(src_path)
# Validate tmp_root if specified (use TemporaryDirectory to get consistent error format)
if tmp_root is not None:
with tempfile.TemporaryDirectory(dir=tmp_root):
pass
if src_path.lower().endswith(".zip") and STREAM_UNZIP_AVAILABLE:
return self._get_archive_streaming(
src_path,
dst_root,
validate=validate,
verbose=verbose,
)
return self._get_archive_via_tempfile(
src_path,
dst_root,
tmp_root=tmp_root,
validate=validate,
verbose=verbose,
)
def _get_archive_via_tempfile(
self,
src_path: str,
dst_root: str,
*,
tmp_root: str | None,
validate: bool,
verbose: bool,
) -> list[str]:
with tempfile.TemporaryDirectory(dir=tmp_root) as tmp:
tmp_dir = audeer.path(tmp, os.path.basename(dst_root))
local_archive = os.path.join(
tmp_dir,
os.path.basename(src_path),
)
self.get_file(
src_path,
local_archive,
validate=validate,
verbose=verbose,
)
return audeer.extract_archive(
local_archive,
dst_root,
verbose=verbose,
)
```
This moves all non-streaming logic out of `get_archive()`, making the top-level behavior easier to scan.
---
### 2. Extract checksum + progress handling from `_get_archive_streaming()`
The `stream_with_hash()` inner function mixes three concerns (read, hash, progress). You can move that into a reusable helper, which shrinks `_get_archive_streaming()` and makes the checksum logic easier to test in isolation:
```python
def _stream_with_md5_and_progress(
self,
src_path: str,
*,
validate: bool,
verbose: bool,
) -> tuple[Iterator[bytes], hashlib._Hash | None]:
md5_hash = hashlib.md5() if validate else None
src_size = self._size(src_path) if verbose else None
desc = audeer.format_display_message(
f"Download {os.path.basename(src_path)}",
pbar=verbose,
)
pbar = audeer.progress_bar(total=src_size, desc=desc, disable=not verbose)
def iterator() -> Iterator[bytes]:
with pbar:
for chunk in self._get_file_stream(src_path):
if md5_hash is not None:
md5_hash.update(chunk)
pbar.update(len(chunk))
yield chunk
return iterator(), md5_hash
```
Then `_get_archive_streaming()` becomes more linear:
```python
def _get_archive_streaming(
self,
src_path: str,
dst_root: str,
*,
validate: bool = False,
verbose: bool = False,
) -> list[str]:
if os.path.exists(dst_root) and not os.path.isdir(dst_root):
raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), dst_root)
dst_root_existed = os.path.exists(dst_root)
audeer.mkdir(dst_root)
extracted_files: list[str] = []
stream, md5_hash = self._stream_with_md5_and_progress(
src_path,
validate=validate,
verbose=verbose,
)
try:
for file_name, file_size, unzipped_chunks in stream_unzip(stream):
# ... existing extraction logic ...
# (decode name, mkdir, write file, append to extracted_files)
...
if validate:
expected_checksum = self.checksum(src_path)
actual_checksum = md5_hash.hexdigest()
if actual_checksum != expected_checksum:
self._cleanup_extracted(dst_root, dst_root_existed, extracted_files)
raise InterruptedError(
f"Execution is interrupted because {src_path} has checksum "
f"'{actual_checksum}' when the expected checksum is "
f"'{expected_checksum}'. The extracted files have been removed."
)
except (zipfile.BadZipFile, TruncatedDataError, UnfinishedIterationError) as ex:
self._cleanup_extracted(dst_root, dst_root_existed, extracted_files)
raise RuntimeError(f"Broken archive: {src_path}") from ex
return extracted_files
```
---
### 3. Replace the closure `cleanup_on_failure()` with a small helper
The current inner function captures `dst_root_existed` and `extracted_files`. Making it a method clarifies responsibilities and avoids the closure:
```python
def _cleanup_extracted(
self,
dst_root: str,
dst_root_existed: bool,
extracted_files: Sequence[str],
) -> None:
if not dst_root_existed and os.path.exists(dst_root):
shutil.rmtree(dst_root)
else:
for file_name in extracted_files:
full_path = audeer.path(dst_root, file_name)
if os.path.exists(full_path):
os.remove(full_path)
```
Then just call:
```python
self._cleanup_extracted(dst_root, dst_root_existed, extracted_files)
```
where you currently call `cleanup_on_failure()`.
---
These small extractions keep all semantics (including checksum behavior, progress reporting, and cleanup) but significantly flatten `_get_archive_streaming()` and make `get_archive()` easier to reason about.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
I think we once discussed the idea of using ZIP without compression in that case. |
Good idea, that would eliminate the need for tracking the filename and we can just download ZIP files. The only downside is that for the old files it will still be faster using streaming ZIP extraction, but for the new ones it will be faster using |
Ok, let's say we somehow know if a file was zipped with or without compression. In case no compression was used, then we don't want to use streaming ZIP extraction and it can make sense to use multiple workers instead. So maybe it makes sense we keep the argument? |
There are two options:
At the moment, I'm more in favor of the second approach. But there is another reason to maybe stay with |
|
I see a third option: if |
|
The problem is that you will be always slower with When loading the 4.2GB model
when loading without streaming and using
Which means in most cases I would not recommend to use I would vote for one of the following solutions:
|
Ok, maybe I misunderstood, but I thought for TAR.GZ or uncompressed ZIP files using multiple workers is faster. If this is true, then |
Yes, but the package can always call |
Yes, but that's more complicated than just doing: get_archive(..., num_workers=1 if streamable else num_workers)`As long as there is a use-case for using |
I see the point, but there is one big caveat. As a normal user, I would expect that whenever |
|
But there is also the caveat that users continue using |
|
I re-added I also started to benchmark downloading an uncompressed ZIP file with multiple workers, but encountered an error with it. So maybe, we still have an issue in |
|
I repeated my benchmarks of downloading ~4.2GB file as compressed vs. uncompressed ZIP (with the fix introduced in #285). Execution time in seconds (average over 10 runs). Only for
Results are as expected:
|
This reverts commit eafab4c.
51dc22a to
1c5b418
Compare
|
Ok, so our recommendations would be:
I guess we also need to add an argument to |
Yes. I extended the docstring here to reflect this:
I created audeering/audeer#188 |
Co-authored-by: Johannes Wagner <jwagner@audeering.com>

Summary
get_archive()by extracting a ZIP file while downloading it.RemoveWhennum_workersfromget_archive()(introduced in Use workers for file download #271) for required sequential processing of the file chunks.num_workers > 1we do not use streaming extraction, but first download the file with multiple workers to a temp file and extract it afterwards using a single worker.Extraction of archived cannot be speed up by using multiple workers (compare audeering/audeer#186), hence we speed it up indirectly here. Extracting ZIP files in a streaming fashion works for all backends.
Details
TAR.GZ archives are not affected by the changes and are still first downloaded and then extracted.
The biggest downside is that we need additional external dependencies for the implementation, as we use stream-unzip, which depends on the two self-contained packages pycryptodome and stream-inflate.
stream-unzipdoes not yet support Python 3.14, there we fall back to the old behavior.Removingnum_workersfromget_archive()is not a breaking change as we yanked version 2.3.0 ofaudbackend.The pull request also adds two new private methods that every backend has to implement:
audbackend.backend.Base._get_file_stream()audbackend.backend.Base._size()(for progress bar during streaming ZIP extraction)Benchmarks
Benchmark results averaged over 10 runs using a single CPU thread, for using
Minio.get_archive()on the file/alm/audeering-omni/stage1_2/torch/7289b57d.zip(4.2 GB).Benchmark with
audmodelfor7289b57d-1.0.0(compare audeering/audmodel#38)And using
audbto loademodbversion2.0.0(execution time in seconds)Discussion
audbackendthat use ZIP files (audb,audmodel)audmodelcould be further improved by not storing the model files as ZIP files, as they are already quite compressed. Then we could download the big model file withget_file()using several workers and the remaining model metadata as a ZIP file. But this would of cause require to update howaudmodel.publish()stores the files in the first place