Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion datacache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ def fetch(
decompress=False,
force=False,
timeout=None,
use_wget_if_available=True):
use_wget_if_available=None):
"""
Return the local path to the downloaded copy of a given URL.
Don't download the file again if it's already present,
unless `force` is True.

`use_wget_if_available` is deprecated and ignored (datacache always uses
its streaming Python downloader now); passing it emits a warning.
"""
key = (url, decompress)
if not force and key in self._local_paths:
Expand Down
126 changes: 61 additions & 65 deletions datacache/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import gzip
import logging
import os
import errno
import subprocess
from shutil import move
import warnings
from shutil import copyfileobj, move
from tempfile import NamedTemporaryFile
import zipfile
import urllib
Expand Down Expand Up @@ -97,7 +96,6 @@ def _download_to_temp_file(
timeout=None,
base_name="download",
ext="tmp",
use_wget_if_available=False,
chunk_size=DEFAULT_CHUNK_SIZE,
progress_callback=None):

Expand All @@ -110,47 +108,41 @@ def _download_to_temp_file(
delete=False) as tmp:
tmp_path = tmp.name

def download_using_python():
with open(tmp_path, mode="w+b") as tmp_file:
_stream_to_file(
download_url,
tmp_file,
timeout=timeout,
chunk_size=chunk_size,
progress_callback=progress_callback)

if not use_wget_if_available:
download_using_python()
else:
try:
# first try using wget to download since this works on Travis
# even when FTP otherwise fails
wget_command_list = [
"wget",
download_url,
"-O", tmp_path,
"--no-verbose",
]
if download_url.startswith("ftp"):
wget_command_list.extend(["--passive-ftp"])
if timeout:
wget_command_list.extend(["-T", "%s" % timeout])
logger.info("Running: %s" % (" ".join(wget_command_list)))
subprocess.call(wget_command_list)
except OSError as e:
if e.errno == errno.ENOENT:
# wget not found
download_using_python()
else:
raise
with open(tmp_path, mode="w+b") as tmp_file:
_stream_to_file(
download_url,
tmp_file,
timeout=timeout,
chunk_size=chunk_size,
progress_callback=progress_callback)
return tmp_path


def _decompress_to_file(src_stream, full_path):
"""Stream an already-open decompressed source into ``full_path`` atomically.

Writes to a sibling temp file and moves it into place, so a corrupt or
interrupted decompress (e.g. a gzip CRC failure, which is only detected at
end-of-stream) never leaves a partial file at ``full_path`` that a later
``fetch_file`` would mistake for a complete cache hit.
"""
out_dir = os.path.dirname(full_path) or "."
with NamedTemporaryFile(dir=out_dir, delete=False) as tmp:
tmp_out = tmp.name
try:
with open(tmp_out, "wb") as dst:
copyfileobj(src_stream, dst)
move(tmp_out, full_path)
finally:
# On success the move consumed tmp_out; on failure drop the partial.
if os.path.exists(tmp_out):
os.remove(tmp_out)


def _download_and_decompress_if_necessary(
full_path,
download_url,
timeout=None,
use_wget_if_available=False,
chunk_size=DEFAULT_CHUNK_SIZE,
progress_callback=None):
"""
Expand All @@ -164,36 +156,35 @@ def _download_and_decompress_if_necessary(
timeout=timeout,
base_name=base_name,
ext=ext,
use_wget_if_available=use_wget_if_available,
chunk_size=chunk_size,
progress_callback=progress_callback)

if download_url.endswith("zip") and not filename.endswith("zip"):
logger.info("Decompressing zip into %s...", filename)
with zipfile.ZipFile(tmp_path) as z:
names = z.namelist()
if len(names) == 0:
infos = z.infolist()
if not infos:
raise ValueError("Empty zip archive")
if filename in names:
chosen_filename = filename
else:
# If zip archive contains multiple files, choose the biggest.
biggest_size = 0
chosen_filename = names[0]
for info in z.infolist():
if info.file_size > biggest_size:
chosen_filename = info.filename
biggest_size = info.file_size
extract_path = z.extract(chosen_filename)
move(extract_path, full_path)
# Prefer the member matching the local filename; if there's no such
# member (e.g. a multi-file archive), fall back to the biggest one.
chosen = next(
(info for info in infos if info.filename == filename),
max(infos, key=lambda info: info.file_size))
# Stream the chosen member's *contents* into full_path. We
# deliberately avoid ZipFile.extract(), which writes into the
# current working directory and recreates the member's stored path
# -- a cwd-pollution + path-traversal footgun (a member named e.g.
# "../evil" would escape the intended directory).
with z.open(chosen) as src:
_decompress_to_file(src, full_path)
os.remove(tmp_path)
elif download_url.endswith("gz") and not filename.endswith("gz"):
logger.info("Decompressing gzip into %s...", filename)
# Stream the gunzip to disk rather than read the whole (decompressed)
# file into memory -- the whole point of the streaming download (#49).
with gzip.GzipFile(tmp_path) as src:
contents = src.read()
_decompress_to_file(src, full_path)
os.remove(tmp_path)
with open(full_path, 'wb') as dst:
dst.write(contents)
elif download_url.endswith(("html", "htm")) and full_path.endswith(".csv"):
logger.info("Extracting HTML table into CSV %s...", filename)
df = pd.read_html(tmp_path, header=0)[0]
Expand Down Expand Up @@ -223,7 +214,7 @@ def fetch_file(
subdir=None,
force=False,
timeout=None,
use_wget_if_available=False,
use_wget_if_available=None,
chunk_size=DEFAULT_CHUNK_SIZE,
progress_callback=None):
"""
Expand Down Expand Up @@ -256,23 +247,29 @@ def fetch_file(
Timeout for download in seconds, default is None which uses
global timeout.

use_wget_if_available: bool, optional
If the `wget` command is available, use that for download instead
of Python libraries (default True)
use_wget_if_available : bool, optional
Deprecated and ignored. datacache now always uses its streaming Python
downloader (which handles http(s) and ftp); the legacy `wget` path was
removed. Passing this argument emits a DeprecationWarning.

chunk_size : int, optional
Number of bytes to stream from the server to disk at a time when
using the Python downloader. Defaults to 1 MB.
Number of bytes to stream from the server to disk at a time.
Defaults to 1 MB.

progress_callback : callable, optional
If provided, called as ``progress_callback(bytes_downloaded,
total_bytes)`` after each chunk is written, where `total_bytes` is the
server-reported size or None if unknown. Lets callers render a progress
bar (e.g. tqdm) without datacache taking on that dependency. Only
applies to the Python downloader, not the optional `wget` path.
bar (e.g. tqdm) without datacache taking on that dependency.

Returns the full path of the local file.
"""
if use_wget_if_available is not None:
warnings.warn(
"use_wget_if_available is deprecated and ignored; datacache now always "
"uses its streaming Python downloader (handles http(s) and ftp).",
DeprecationWarning,
stacklevel=2)
filename = build_local_filename(download_url, filename, decompress)
full_path = build_path(filename, subdir)
if not os.path.exists(full_path) or force:
Expand All @@ -281,7 +278,6 @@ def fetch_file(
full_path=full_path,
download_url=download_url,
timeout=timeout,
use_wget_if_available=use_wget_if_available,
chunk_size=chunk_size,
progress_callback=progress_callback)
else:
Expand Down
2 changes: 1 addition & 1 deletion datacache/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '1.6.0'
__version__ = '1.7.0'


156 changes: 126 additions & 30 deletions tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,129 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datacache import fetch_file

FASTA_FILENAME = 'Homo_sapiens.GRCh37.75.dna_rm.chromosome.MT.fa'
URL = \
'ftp://ftp.ensembl.org/pub/release-75/fasta/homo_sapiens/dna/Homo_sapiens.GRCh37.75.dna_rm.chromosome.MT.fa.gz'


def test_fetch_decompress():
for use_wget_if_available in [True, False]:
for timeout in [None, 10**6]:
path1 = fetch_file(
URL,
decompress=True,
subdir="datacache",
use_wget_if_available=use_wget_if_available,
timeout=timeout)
assert path1.endswith(FASTA_FILENAME)
with open(path1, 'r') as f1:
s1 = f1.read()
assert "TCAATTTCGTGCCAG" in s1

def test_fetch_subdirs():
path = fetch_file(URL, decompress=True, subdir="datacache")
assert path.endswith(FASTA_FILENAME)

# if we change the subdir then data should end up in
# something like /Users/me/Library/Caches/epitopes_test/
other_path = fetch_file(URL, decompress=True, subdir="datacache_test")
assert other_path.endswith(FASTA_FILENAME)
assert other_path != path, other_path
"""
Tests for ``fetch_file`` download + decompression.

These serve a local ``.gz`` over a ``file://`` URL and redirect the cache to an
isolated tmp dir, so they don't depend on a live FTP server or on cross-run
cached state -- the two sources of flakiness in the old live-Ensembl test (#42).
"""

import gzip
import os
import zipfile

import pytest

from datacache import common, fetch_file


@pytest.fixture
def isolated_cache(tmp_path, monkeypatch):
"""Point datacache's cache dir at an isolated tmp dir (so the test neither
depends on nor pollutes the user's real cache). Fixtures live in ``tmp_path``;
the cache lives in ``tmp_path/cache``."""
cache = tmp_path / "cache"
monkeypatch.setattr(common, "get_data_dir", lambda subdir=None, envkey=None: str(cache))
return tmp_path


def _write_gz(path, text):
with gzip.open(str(path), "wb") as f:
f.write(text.encode())
return "file://" + str(path)


def test_fetch_decompress(isolated_cache):
url = _write_gz(isolated_cache / "seq.fa.gz", "ACGTACGT TCAATTTCGTGCCAG\n")
path = fetch_file(url, filename="seq.fa.gz", decompress=True)
assert path.endswith("seq.fa")
with open(path) as f:
assert "TCAATTTCGTGCCAG" in f.read()


def test_fetch_decompress_caches_then_force(isolated_cache):
# Deterministic cache behaviour. The old test looped over
# use_wget_if_available/timeout but the file was downloaded once and reused,
# so the result depended on iteration order (#42). Here we assert it
# explicitly: without force the cached copy is reused even if the source
# changes; with force it's re-fetched.
src = isolated_cache / "data.fa.gz"
url = _write_gz(src, "FIRST\n")
p1 = fetch_file(url, filename="data.fa.gz", decompress=True)
with open(p1) as f:
assert f.read() == "FIRST\n"

_write_gz(src, "SECOND\n") # change source; cached copy should win
p2 = fetch_file(url, filename="data.fa.gz", decompress=True)
assert p2 == p1
with open(p2) as f:
assert f.read() == "FIRST\n"

p3 = fetch_file(url, filename="data.fa.gz", decompress=True, force=True)
with open(p3) as f:
assert f.read() == "SECOND\n"


def test_fetch_decompress_zip_picks_named_member(isolated_cache):
# Multi-member zip: the member matching the local filename is chosen and
# streamed into the cache -- never extracted into the working directory
# (the cwd-pollution / path-traversal footgun of the old ZipFile.extract).
archive = isolated_cache / "table.tsv.zip"
with zipfile.ZipFile(str(archive), "w") as z:
z.writestr("table.tsv", "the wanted member\n")
z.writestr("readme.txt", "ignore me\n")
url = "file://" + str(archive)

path = fetch_file(url, filename="table.tsv.zip", decompress=True)
assert path.endswith("table.tsv")
with open(path) as f:
assert f.read() == "the wanted member\n"
# Nothing leaked into the current working directory.
assert not os.path.exists("table.tsv")
assert not os.path.exists("readme.txt")


def test_corrupt_gz_leaves_no_partial_cache(isolated_cache):
# A truncated gzip decompresses partway then fails its trailing CRC check.
# fetch_file must surface the error and leave NO file at the destination,
# so the next call re-fetches instead of serving a silent partial.
good = gzip.compress(b"the full payload that must not be half-cached\n" * 50)
corrupt = good[:-8] # drop the CRC32 + ISIZE trailer
archive = isolated_cache / "trunc.fa.gz"
archive.write_bytes(corrupt)
url = "file://" + str(archive)

with pytest.raises(Exception):
fetch_file(url, filename="trunc.fa.gz", decompress=True)

dest = isolated_cache / "cache" / "trunc.fa"
assert not dest.exists(), "corrupt decompress must not leave a cached partial"


def test_fetch_subdirs(tmp_path, monkeypatch):
# Different subdirs resolve to different cache locations. Hermetic: route
# each subdir to its own tmp dir.
src = tmp_path / "seq.fa.gz"
url = _write_gz(src, "ACGT\n")
dirs = {"datacache": tmp_path / "a", "datacache_test": tmp_path / "b"}
monkeypatch.setattr(
common,
"get_data_dir",
lambda subdir=None, envkey=None: str(dirs.get(subdir, tmp_path / "default")),
)

path = fetch_file(url, filename="seq.fa.gz", decompress=True, subdir="datacache")
assert path.endswith("seq.fa")
other_path = fetch_file(url, filename="seq.fa.gz", decompress=True, subdir="datacache_test")
assert other_path.endswith("seq.fa")
assert other_path != path


def test_use_wget_if_available_is_deprecated(isolated_cache):
# The legacy wget path was removed; passing the argument is accepted for
# backwards compatibility but warns and is otherwise ignored.
url = _write_gz(isolated_cache / "w.fa.gz", "ACGT\n")
with pytest.warns(DeprecationWarning, match="use_wget_if_available is deprecated"):
path = fetch_file(url, filename="w.fa.gz", decompress=True, use_wget_if_available=True)
assert path.endswith("w.fa")
Loading