diff --git a/datacache/cache.py b/datacache/cache.py index 93688d3..824bb2f 100644 --- a/datacache/cache.py +++ b/datacache/cache.py @@ -75,11 +75,14 @@ def fetch( decompress=False, force=False, timeout=None, - use_wget_if_available=True): + use_wget_if_available=None): """ Return the local path to the downloaded copy of a given URL. Don't download the file again if it's already present, unless `force` is True. + + `use_wget_if_available` is deprecated and ignored (datacache always uses + its streaming Python downloader now); passing it emits a warning. """ key = (url, decompress) if not force and key in self._local_paths: diff --git a/datacache/download.py b/datacache/download.py index 3a0b02a..4b59a97 100644 --- a/datacache/download.py +++ b/datacache/download.py @@ -13,9 +13,8 @@ import gzip import logging import os -import errno -import subprocess -from shutil import move +import warnings +from shutil import copyfileobj, move from tempfile import NamedTemporaryFile import zipfile import urllib @@ -97,7 +96,6 @@ def _download_to_temp_file( timeout=None, base_name="download", ext="tmp", - use_wget_if_available=False, chunk_size=DEFAULT_CHUNK_SIZE, progress_callback=None): @@ -110,47 +108,41 @@ def _download_to_temp_file( delete=False) as tmp: tmp_path = tmp.name - def download_using_python(): - with open(tmp_path, mode="w+b") as tmp_file: - _stream_to_file( - download_url, - tmp_file, - timeout=timeout, - chunk_size=chunk_size, - progress_callback=progress_callback) - - if not use_wget_if_available: - download_using_python() - else: - try: - # first try using wget to download since this works on Travis - # even when FTP otherwise fails - wget_command_list = [ - "wget", - download_url, - "-O", tmp_path, - "--no-verbose", - ] - if download_url.startswith("ftp"): - wget_command_list.extend(["--passive-ftp"]) - if timeout: - wget_command_list.extend(["-T", "%s" % timeout]) - logger.info("Running: %s" % (" ".join(wget_command_list))) - subprocess.call(wget_command_list) - except OSError as e: - if e.errno == errno.ENOENT: - # wget not found - download_using_python() - else: - raise + with open(tmp_path, mode="w+b") as tmp_file: + _stream_to_file( + download_url, + tmp_file, + timeout=timeout, + chunk_size=chunk_size, + progress_callback=progress_callback) return tmp_path +def _decompress_to_file(src_stream, full_path): + """Stream an already-open decompressed source into ``full_path`` atomically. + + Writes to a sibling temp file and moves it into place, so a corrupt or + interrupted decompress (e.g. a gzip CRC failure, which is only detected at + end-of-stream) never leaves a partial file at ``full_path`` that a later + ``fetch_file`` would mistake for a complete cache hit. + """ + out_dir = os.path.dirname(full_path) or "." + with NamedTemporaryFile(dir=out_dir, delete=False) as tmp: + tmp_out = tmp.name + try: + with open(tmp_out, "wb") as dst: + copyfileobj(src_stream, dst) + move(tmp_out, full_path) + finally: + # On success the move consumed tmp_out; on failure drop the partial. + if os.path.exists(tmp_out): + os.remove(tmp_out) + + def _download_and_decompress_if_necessary( full_path, download_url, timeout=None, - use_wget_if_available=False, chunk_size=DEFAULT_CHUNK_SIZE, progress_callback=None): """ @@ -164,36 +156,35 @@ def _download_and_decompress_if_necessary( timeout=timeout, base_name=base_name, ext=ext, - use_wget_if_available=use_wget_if_available, chunk_size=chunk_size, progress_callback=progress_callback) if download_url.endswith("zip") and not filename.endswith("zip"): logger.info("Decompressing zip into %s...", filename) with zipfile.ZipFile(tmp_path) as z: - names = z.namelist() - if len(names) == 0: + infos = z.infolist() + if not infos: raise ValueError("Empty zip archive") - if filename in names: - chosen_filename = filename - else: - # If zip archive contains multiple files, choose the biggest. - biggest_size = 0 - chosen_filename = names[0] - for info in z.infolist(): - if info.file_size > biggest_size: - chosen_filename = info.filename - biggest_size = info.file_size - extract_path = z.extract(chosen_filename) - move(extract_path, full_path) + # Prefer the member matching the local filename; if there's no such + # member (e.g. a multi-file archive), fall back to the biggest one. + chosen = next( + (info for info in infos if info.filename == filename), + max(infos, key=lambda info: info.file_size)) + # Stream the chosen member's *contents* into full_path. We + # deliberately avoid ZipFile.extract(), which writes into the + # current working directory and recreates the member's stored path + # -- a cwd-pollution + path-traversal footgun (a member named e.g. + # "../evil" would escape the intended directory). + with z.open(chosen) as src: + _decompress_to_file(src, full_path) os.remove(tmp_path) elif download_url.endswith("gz") and not filename.endswith("gz"): logger.info("Decompressing gzip into %s...", filename) + # Stream the gunzip to disk rather than read the whole (decompressed) + # file into memory -- the whole point of the streaming download (#49). with gzip.GzipFile(tmp_path) as src: - contents = src.read() + _decompress_to_file(src, full_path) os.remove(tmp_path) - with open(full_path, 'wb') as dst: - dst.write(contents) elif download_url.endswith(("html", "htm")) and full_path.endswith(".csv"): logger.info("Extracting HTML table into CSV %s...", filename) df = pd.read_html(tmp_path, header=0)[0] @@ -223,7 +214,7 @@ def fetch_file( subdir=None, force=False, timeout=None, - use_wget_if_available=False, + use_wget_if_available=None, chunk_size=DEFAULT_CHUNK_SIZE, progress_callback=None): """ @@ -256,23 +247,29 @@ def fetch_file( Timeout for download in seconds, default is None which uses global timeout. - use_wget_if_available: bool, optional - If the `wget` command is available, use that for download instead - of Python libraries (default True) + use_wget_if_available : bool, optional + Deprecated and ignored. datacache now always uses its streaming Python + downloader (which handles http(s) and ftp); the legacy `wget` path was + removed. Passing this argument emits a DeprecationWarning. chunk_size : int, optional - Number of bytes to stream from the server to disk at a time when - using the Python downloader. Defaults to 1 MB. + Number of bytes to stream from the server to disk at a time. + Defaults to 1 MB. progress_callback : callable, optional If provided, called as ``progress_callback(bytes_downloaded, total_bytes)`` after each chunk is written, where `total_bytes` is the server-reported size or None if unknown. Lets callers render a progress - bar (e.g. tqdm) without datacache taking on that dependency. Only - applies to the Python downloader, not the optional `wget` path. + bar (e.g. tqdm) without datacache taking on that dependency. Returns the full path of the local file. """ + if use_wget_if_available is not None: + warnings.warn( + "use_wget_if_available is deprecated and ignored; datacache now always " + "uses its streaming Python downloader (handles http(s) and ftp).", + DeprecationWarning, + stacklevel=2) filename = build_local_filename(download_url, filename, decompress) full_path = build_path(filename, subdir) if not os.path.exists(full_path) or force: @@ -281,7 +278,6 @@ def fetch_file( full_path=full_path, download_url=download_url, timeout=timeout, - use_wget_if_available=use_wget_if_available, chunk_size=chunk_size, progress_callback=progress_callback) else: diff --git a/datacache/version.py b/datacache/version.py index d946f51..770f03c 100644 --- a/datacache/version.py +++ b/datacache/version.py @@ -1,3 +1,3 @@ -__version__ = '1.6.0' +__version__ = '1.7.0' diff --git a/tests/test_download.py b/tests/test_download.py index 17935f8..3156033 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -12,33 +12,129 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datacache import fetch_file - -FASTA_FILENAME = 'Homo_sapiens.GRCh37.75.dna_rm.chromosome.MT.fa' -URL = \ - 'ftp://ftp.ensembl.org/pub/release-75/fasta/homo_sapiens/dna/Homo_sapiens.GRCh37.75.dna_rm.chromosome.MT.fa.gz' - - -def test_fetch_decompress(): - for use_wget_if_available in [True, False]: - for timeout in [None, 10**6]: - path1 = fetch_file( - URL, - decompress=True, - subdir="datacache", - use_wget_if_available=use_wget_if_available, - timeout=timeout) - assert path1.endswith(FASTA_FILENAME) - with open(path1, 'r') as f1: - s1 = f1.read() - assert "TCAATTTCGTGCCAG" in s1 - -def test_fetch_subdirs(): - path = fetch_file(URL, decompress=True, subdir="datacache") - assert path.endswith(FASTA_FILENAME) - - # if we change the subdir then data should end up in - # something like /Users/me/Library/Caches/epitopes_test/ - other_path = fetch_file(URL, decompress=True, subdir="datacache_test") - assert other_path.endswith(FASTA_FILENAME) - assert other_path != path, other_path +""" +Tests for ``fetch_file`` download + decompression. + +These serve a local ``.gz`` over a ``file://`` URL and redirect the cache to an +isolated tmp dir, so they don't depend on a live FTP server or on cross-run +cached state -- the two sources of flakiness in the old live-Ensembl test (#42). +""" + +import gzip +import os +import zipfile + +import pytest + +from datacache import common, fetch_file + + +@pytest.fixture +def isolated_cache(tmp_path, monkeypatch): + """Point datacache's cache dir at an isolated tmp dir (so the test neither + depends on nor pollutes the user's real cache). Fixtures live in ``tmp_path``; + the cache lives in ``tmp_path/cache``.""" + cache = tmp_path / "cache" + monkeypatch.setattr(common, "get_data_dir", lambda subdir=None, envkey=None: str(cache)) + return tmp_path + + +def _write_gz(path, text): + with gzip.open(str(path), "wb") as f: + f.write(text.encode()) + return "file://" + str(path) + + +def test_fetch_decompress(isolated_cache): + url = _write_gz(isolated_cache / "seq.fa.gz", "ACGTACGT TCAATTTCGTGCCAG\n") + path = fetch_file(url, filename="seq.fa.gz", decompress=True) + assert path.endswith("seq.fa") + with open(path) as f: + assert "TCAATTTCGTGCCAG" in f.read() + + +def test_fetch_decompress_caches_then_force(isolated_cache): + # Deterministic cache behaviour. The old test looped over + # use_wget_if_available/timeout but the file was downloaded once and reused, + # so the result depended on iteration order (#42). Here we assert it + # explicitly: without force the cached copy is reused even if the source + # changes; with force it's re-fetched. + src = isolated_cache / "data.fa.gz" + url = _write_gz(src, "FIRST\n") + p1 = fetch_file(url, filename="data.fa.gz", decompress=True) + with open(p1) as f: + assert f.read() == "FIRST\n" + + _write_gz(src, "SECOND\n") # change source; cached copy should win + p2 = fetch_file(url, filename="data.fa.gz", decompress=True) + assert p2 == p1 + with open(p2) as f: + assert f.read() == "FIRST\n" + + p3 = fetch_file(url, filename="data.fa.gz", decompress=True, force=True) + with open(p3) as f: + assert f.read() == "SECOND\n" + + +def test_fetch_decompress_zip_picks_named_member(isolated_cache): + # Multi-member zip: the member matching the local filename is chosen and + # streamed into the cache -- never extracted into the working directory + # (the cwd-pollution / path-traversal footgun of the old ZipFile.extract). + archive = isolated_cache / "table.tsv.zip" + with zipfile.ZipFile(str(archive), "w") as z: + z.writestr("table.tsv", "the wanted member\n") + z.writestr("readme.txt", "ignore me\n") + url = "file://" + str(archive) + + path = fetch_file(url, filename="table.tsv.zip", decompress=True) + assert path.endswith("table.tsv") + with open(path) as f: + assert f.read() == "the wanted member\n" + # Nothing leaked into the current working directory. + assert not os.path.exists("table.tsv") + assert not os.path.exists("readme.txt") + + +def test_corrupt_gz_leaves_no_partial_cache(isolated_cache): + # A truncated gzip decompresses partway then fails its trailing CRC check. + # fetch_file must surface the error and leave NO file at the destination, + # so the next call re-fetches instead of serving a silent partial. + good = gzip.compress(b"the full payload that must not be half-cached\n" * 50) + corrupt = good[:-8] # drop the CRC32 + ISIZE trailer + archive = isolated_cache / "trunc.fa.gz" + archive.write_bytes(corrupt) + url = "file://" + str(archive) + + with pytest.raises(Exception): + fetch_file(url, filename="trunc.fa.gz", decompress=True) + + dest = isolated_cache / "cache" / "trunc.fa" + assert not dest.exists(), "corrupt decompress must not leave a cached partial" + + +def test_fetch_subdirs(tmp_path, monkeypatch): + # Different subdirs resolve to different cache locations. Hermetic: route + # each subdir to its own tmp dir. + src = tmp_path / "seq.fa.gz" + url = _write_gz(src, "ACGT\n") + dirs = {"datacache": tmp_path / "a", "datacache_test": tmp_path / "b"} + monkeypatch.setattr( + common, + "get_data_dir", + lambda subdir=None, envkey=None: str(dirs.get(subdir, tmp_path / "default")), + ) + + path = fetch_file(url, filename="seq.fa.gz", decompress=True, subdir="datacache") + assert path.endswith("seq.fa") + other_path = fetch_file(url, filename="seq.fa.gz", decompress=True, subdir="datacache_test") + assert other_path.endswith("seq.fa") + assert other_path != path + + +def test_use_wget_if_available_is_deprecated(isolated_cache): + # The legacy wget path was removed; passing the argument is accepted for + # backwards compatibility but warns and is otherwise ignored. + url = _write_gz(isolated_cache / "w.fa.gz", "ACGT\n") + with pytest.warns(DeprecationWarning, match="use_wget_if_available is deprecated"): + path = fetch_file(url, filename="w.fa.gz", decompress=True, use_wget_if_available=True) + assert path.endswith("w.fa")