diff --git a/.gitignore b/.gitignore index 120cb5c..2289041 100644 --- a/.gitignore +++ b/.gitignore @@ -167,3 +167,4 @@ outputs/ archive/ docs/20*-*.md data/ +.tasks/ diff --git a/Dockerfile b/Dockerfile index 4e76c34..f04994d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,6 @@ ARG UBUNTU_VERSION=22.04 ARG CUDA_MAJOR_VERSION=12.8.1 +ARG PYTHON_VERSION=3.11 ######################## # Stage 1: build stage # @@ -8,6 +9,7 @@ FROM nvidia/cuda:${CUDA_MAJOR_VERSION}-cudnn-devel-ubuntu${UBUNTU_VERSION} AS bu ARG USER_UID=1001 ARG USER_GID=1001 +ARG PYTHON_VERSION # ensures that Python output to stdout/stderr is not buffered: prevents missing information when terminating ENV PYTHONUNBUFFERED=1 @@ -26,6 +28,7 @@ WORKDIR /home/user ENV PATH="/home/user/.local/bin:${PATH}" RUN apt-get update && apt-get install -y --no-install-recommends \ + software-properties-common \ libtiff-dev \ cmake \ zlib1g-dev \ @@ -36,24 +39,22 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ zip unzip \ git \ openssh-server \ - software-properties-common \ + gnupg2 \ + gpg-agent \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get update && apt-get install -y --no-install-recommends \ + python${PYTHON_VERSION} \ + python${PYTHON_VERSION}-dev \ + python${PYTHON_VERSION}-venv \ + python${PYTHON_VERSION}-distutils \ && mkdir /var/run/sshd \ + && curl -fsSL https://bootstrap.pypa.io/get-pip.py | python${PYTHON_VERSION} \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python3 \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/bin/python \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN add-apt-repository -y ppa:deadsnakes/ppa \ - && apt-get update \ - && apt-get install -y --no-install-recommends \ - python3.11 \ - python3.11-dev \ - python3.11-venv \ - python3.11-distutils \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python \ - && ln -sf /usr/bin/python3.11 /usr/bin/python \ - && rm -rf /var/lib/apt/lists/* - # libjpeg-turbo 3.x (required by PyTurboJPEG>=2) ARG LIBJPEG_TURBO_VERSION=3.1.0 RUN curl -fsSL https://github.com/libjpeg-turbo/libjpeg-turbo/releases/download/${LIBJPEG_TURBO_VERSION}/libjpeg-turbo-${LIBJPEG_TURBO_VERSION}.tar.gz \ @@ -69,8 +70,7 @@ WORKDIR /opt/app/ ARG PYTORCH_CUDA_INDEX_URL=https://download.pytorch.org/whl/cu128 ARG GIT_MODEL_DEPENDENCIES="git+https://github.com/lilab-stanford/MUSK.git git+https://github.com/Mahmoodlab/CONCH.git git+https://github.com/prov-gigapath/prov-gigapath.git git+https://github.com/facebookresearch/sam2.git" -RUN python -m ensurepip --upgrade \ - && python -m pip install --upgrade pip setuptools pip-tools \ +RUN python -m pip install --upgrade pip setuptools pip-tools \ && python -m pip install hatchling psutil \ && rm -rf /home/user/.cache/pip @@ -108,6 +108,7 @@ FROM nvidia/cuda:${CUDA_MAJOR_VERSION}-cudnn-runtime-ubuntu${UBUNTU_VERSION} ARG USER_UID=1001 ARG USER_GID=1001 +ARG PYTHON_VERSION ENV PYTHONUNBUFFERED=1 ENV DEBIAN_FRONTEND=noninteractive TZ=Europe/Amsterdam @@ -126,32 +127,31 @@ WORKDIR /home/user ENV PATH="/home/user/.local/bin:${PATH}" RUN apt-get update && apt-get install -y --no-install-recommends \ + software-properties-common \ libtiff-dev \ zlib1g-dev \ libnuma1 \ + libspatialindex-dev \ curl \ vim screen \ zip unzip \ git \ openssh-server \ - software-properties-common \ + gnupg2 \ + gpg-agent \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get update && apt-get install -y --no-install-recommends \ + python${PYTHON_VERSION} \ + python${PYTHON_VERSION}-venv \ + python${PYTHON_VERSION}-distutils \ && mkdir /var/run/sshd \ + && curl -fsSL https://bootstrap.pypa.io/get-pip.py | python${PYTHON_VERSION} \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python3 \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/bin/python \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN add-apt-repository -y ppa:deadsnakes/ppa \ - && apt-get update \ - && apt-get install -y --no-install-recommends \ - python3.11 \ - python3.11-dev \ - python3.11-venv \ - python3.11-distutils \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python \ - && ln -sf /usr/bin/python3.11 /usr/bin/python \ - && rm -rf /var/lib/apt/lists/* - # libjpeg-turbo 3.x (copied from build stage) COPY --from=build /usr/local/lib/libjpeg* /usr/local/lib/ COPY --from=build /usr/local/lib/libturbojpeg* /usr/local/lib/ @@ -166,11 +166,11 @@ RUN apt-get update && curl -L ${ASAP_URL} -o /tmp/ASAP.deb && apt-get install -- rm -rf /var/lib/apt/lists/* # copy Python libs & entrypoints from build stage (includes flash-attn, your deps, ASAP .pth) -COPY --from=build /usr/local/lib/python3.11/dist-packages /usr/local/lib/python3.11/dist-packages +COPY --from=build /usr/local/lib/python${PYTHON_VERSION}/dist-packages /usr/local/lib/python${PYTHON_VERSION}/dist-packages COPY --from=build /usr/local/bin /usr/local/bin # register libnvimgcodec so cucim can use GPU-accelerated JPEG decoding -RUN echo "/usr/local/lib/python3.11/dist-packages/nvidia/nvimgcodec" > /etc/ld.so.conf.d/nvimgcodec.conf && \ +RUN echo "/usr/local/lib/python${PYTHON_VERSION}/dist-packages/nvidia/nvimgcodec" > /etc/ld.so.conf.d/nvimgcodec.conf && \ ldconfig # copy app code diff --git a/Dockerfile.ci b/Dockerfile.ci index ff17a69..6b3eaea 100755 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -3,6 +3,7 @@ FROM ubuntu:22.04 ARG USER_UID=1001 ARG USER_GID=1001 +ARG PYTHON_VERSION=3.11 ENV PYTHONUNBUFFERED=1 ENV DEBIAN_FRONTEND=noninteractive @@ -18,37 +19,35 @@ RUN groupadd --gid ${USER_GID} user \ WORKDIR /opt/app RUN apt-get update && apt-get install -y --no-install-recommends \ + software-properties-common \ libtiff-dev \ + cmake \ zlib1g-dev \ libnuma1 \ libspatialindex-dev \ curl \ - cmake \ - gnupg2 \ - gpg-agent \ vim screen \ zip unzip \ git \ openssh-server \ build-essential \ ninja-build \ - software-properties-common \ + gnupg2 \ + gpg-agent \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get update && apt-get install -y --no-install-recommends \ + python${PYTHON_VERSION} \ + python${PYTHON_VERSION}-dev \ + python${PYTHON_VERSION}-venv \ + python${PYTHON_VERSION}-distutils \ + && mkdir /var/run/sshd \ + && curl -fsSL https://bootstrap.pypa.io/get-pip.py | python${PYTHON_VERSION} \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python3 \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/local/bin/python \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/bin/python \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -RUN add-apt-repository -y ppa:deadsnakes/ppa \ - && apt-get update \ - && apt-get install -y --no-install-recommends \ - python3.11 \ - python3.11-dev \ - python3.11-venv \ - python3.11-distutils \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/bin/python3 \ - && ln -sf /usr/bin/python3.11 /usr/local/bin/python \ - && ln -sf /usr/bin/python3.11 /usr/bin/python \ - && rm -rf /var/lib/apt/lists/* - # libjpeg-turbo 3.x (required by PyTurboJPEG>=2) ARG LIBJPEG_TURBO_VERSION=3.1.0 RUN curl -fsSL https://github.com/libjpeg-turbo/libjpeg-turbo/releases/download/${LIBJPEG_TURBO_VERSION}/libjpeg-turbo-${LIBJPEG_TURBO_VERSION}.tar.gz \ @@ -71,8 +70,7 @@ RUN set -eux; \ apt-get clean; \ rm -rf /var/lib/apt/lists/* -RUN python -m ensurepip --upgrade \ - && python -m pip install --upgrade pip setuptools pip-tools \ +RUN python -m pip install --upgrade pip setuptools pip-tools \ && python -m pip install hatchling psutil \ && rm -rf /home/user/.cache/pip diff --git a/docs/preprocessing.rst b/docs/preprocessing.rst index 746f65e..1ecf6c3 100644 --- a/docs/preprocessing.rst +++ b/docs/preprocessing.rst @@ -73,8 +73,12 @@ Both are disabled by default. Enable them via the ``preview`` dict: } ) -Preview images are written to ``/preview/mask/.png`` -and ``/preview/tiling/.png``. Their paths are also +Preview images are written to ``/preview/mask/.jpg`` +and ``/preview/tiling/.jpg``. Their paths are also recorded in ``process_list.csv`` and on the returned :class:`~slide2vec.EmbeddedSlide` (``mask_preview_path``, ``tiling_preview_path``). + +When resuming a run, existing preview paths are preserved in +``process_list.csv`` for unchanged successful tiling artifacts if the preview +files still exist on disk. diff --git a/pyproject.toml b/pyproject.toml index 2c7f8c7..f496415 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "hs2p[asap,cucim,openslide,sam2,vips]>=4.0.1", + "hs2p[asap,cucim,openslide,sam2,vips]>=4.0.5", "omegaconf", "matplotlib", "numpy<2", @@ -88,7 +88,7 @@ fm = [ "pandas", "pillow", "rich", - "hs2p[asap,cucim,openslide,sam2,vips]>=4.0.1", + "hs2p[asap,cucim,openslide,sam2,vips]>=4.0.5", "wandb", "torch>=2.3,<2.8", "torchvision>=0.18.0", diff --git a/slide2vec/distributed/pipeline_worker.py b/slide2vec/distributed/pipeline_worker.py index a07624b..76b9499 100644 --- a/slide2vec/distributed/pipeline_worker.py +++ b/slide2vec/distributed/pipeline_worker.py @@ -48,6 +48,16 @@ def main(argv=None) -> int: if not callable(load_successful_tiled_slides_fn): from slide2vec.runtime.manifest import load_successful_tiled_slides as load_successful_tiled_slides_fn slide_records, tiling_results = load_successful_tiled_slides_fn(tiling_input_dir) + requested_sample_ids = request.get("sample_ids") + if requested_sample_ids is not None: + requested_sample_id_set = {str(sample_id) for sample_id in requested_sample_ids} + paired = [ + (slide, tiling_result) + for slide, tiling_result in zip(slide_records, tiling_results) + if slide.sample_id in requested_sample_id_set + ] + slide_records = [slide for slide, _ in paired] + tiling_results = [tiling_result for _, tiling_result in paired] assignments = assign_slides_to_ranks(slide_records, tiling_results, num_gpus=world_size) assigned_ids = assignments.get(global_rank, []) if not assigned_ids: diff --git a/slide2vec/inference.py b/slide2vec/inference.py index ed7ba58..d949b2e 100644 --- a/slide2vec/inference.py +++ b/slide2vec/inference.py @@ -744,6 +744,14 @@ def run_pipeline( save_latents=execution.save_latents, resume=resolved_preprocessing.resume, ) + skipped_slide_count = len(embeddable_slides) - len(pending_slides) + if resolved_preprocessing.resume and skipped_slide_count > 0: + emit_progress( + "embedding.resume", + total_slide_count=len(embeddable_slides), + pending_slide_count=len(pending_slides), + skipped_slide_count=skipped_slide_count, + ) local_persist_callback, _, _ = persist_callbacks.build_incremental_persist_callback( model=model, preprocessing=resolved_preprocessing, @@ -785,6 +793,7 @@ def run_pipeline( "embedding.finished", slide_count=len(embeddable_slides), slides_completed=len(embeddable_slides), + slides_skipped=skipped_slide_count, tile_artifacts=len(tile_artifacts) + len(hierarchical_artifacts), slide_artifacts=len(slide_artifacts), ) diff --git a/slide2vec/progress.py b/slide2vec/progress.py index 1c16a69..fa571bd 100644 --- a/slide2vec/progress.py +++ b/slide2vec/progress.py @@ -153,6 +153,11 @@ def _format_line(self, kind: str, payload: dict[str, Any]) -> str | None: return f"Model {payload['model_name']} ready on {payload['device']}" if kind == "embedding.started": return f"Embedding slides ({payload['slide_count']} total)..." + if kind == "embedding.resume": + return ( + f"Resume: skipped {payload['skipped_slide_count']} already processed slide(s); " + f"{payload['pending_slide_count']} pending" + ) if kind == "embedding.assignment.started": return f"Assigning slides across {payload['num_gpus']} GPU(s)..." if kind == "embedding.assignment.finished": @@ -172,9 +177,15 @@ def _format_line(self, kind: str, payload: dict[str, Any]) -> str | None: if kind == "embedding.slide.finished": return f"Completed {_progress_subject(payload)} ({payload['num_tiles']} tiles)" if kind == "embedding.finished": + skipped_text = ( + f", {payload['slides_skipped']} skipped" + if "slides_skipped" in payload + else "" + ) return ( f"Embedding finished: {payload['slides_completed']}/{payload['slide_count']} slides, " f"{payload['tile_artifacts']} tile artifacts, {payload['slide_artifacts']} slide artifacts" + f"{skipped_text}" ) if kind == "backend.selected": return _format_backend_selected_message(payload) @@ -367,6 +378,23 @@ def emit(self, event: ProgressEvent) -> None: self._ensure_progress_started() self._task_ids["embedding"] = self.progress.add_task("Embedding slides", total=payload["slide_count"]) return + if kind == "embedding.resume": + skipped = int(payload["skipped_slide_count"]) + pending = int(payload["pending_slide_count"]) + total = int(payload["total_slide_count"]) + if skipped > 0: + self.console.print( + f"Resume: skipped {skipped} already processed slide(s); {pending}/{total} pending" + ) + task_id = self._task_ids.get("embedding") + if task_id is not None: + self.progress.update( + task_id, + total=pending, + completed=0, + description=f"Embedding slides ({pending} pending, {skipped} skipped)", + ) + return if kind == "embedding.assignment.started": self._ensure_progress_started() self._task_ids["embedding_assignment"] = self.progress.add_task( @@ -616,11 +644,14 @@ def _embedding_summary_rows(payload: dict[str, Any]) -> list[tuple[str, str]]: slide_count = int(payload["slide_count"]) completed = int(payload["slides_completed"]) failed = max(0, slide_count - completed) - return [ + rows = [ ("Slides w/ tiles", str(slide_count)), ("Completed", str(completed)), ("Failed", str(failed)), ] + if "slides_skipped" in payload: + rows.insert(2, ("Skipped", str(payload["slides_skipped"]))) + return rows def read_progress_events( diff --git a/slide2vec/runtime/artifacts_collect.py b/slide2vec/runtime/artifacts_collect.py index e8fe86d..ca2eea1 100644 --- a/slide2vec/runtime/artifacts_collect.py +++ b/slide2vec/runtime/artifacts_collect.py @@ -19,7 +19,9 @@ collect_pipeline_artifacts, update_process_list_after_embedding, ) +from slide2vec.runtime.persist_callbacks import pending_local_embedding_records from slide2vec.runtime.process_list import resolved_process_list_output_variant +from slide2vec.progress import emit_progress def collect_local_pipeline_artifacts( @@ -68,13 +70,67 @@ def collect_distributed_pipeline_artifacts( persist_hierarchical_embeddings = is_hierarchical_preprocessing(preprocessing) include_slide_embeddings = model.level == "slide" include_tile_embeddings = persist_tile_embeddings and not persist_hierarchical_embeddings + pending_slides, _ = pending_local_embedding_records( + successful_slides, + [None] * len(successful_slides), + process_list_path=process_list_path, + output_dir=output_dir, + output_format=execution.output_format, + persist_tile_embeddings=persist_tile_embeddings, + persist_hierarchical_embeddings=persist_hierarchical_embeddings, + include_slide_embeddings=include_slide_embeddings, + save_latents=execution.save_latents, + resume=preprocessing.resume, + ) + skipped_slide_count = len(successful_slides) - len(pending_slides) + if preprocessing.resume and skipped_slide_count > 0: + emit_progress( + "embedding.resume", + total_slide_count=len(successful_slides), + pending_slide_count=len(pending_slides), + skipped_slide_count=skipped_slide_count, + ) + slide_by_sample_id = {slide.sample_id: slide for slide in successful_slides} + live_updated_sample_ids: set[str] = set() + + def _update_process_list_for_finished_slide(event) -> None: + if getattr(event, "kind", None) != "embedding.slide.finished": + return + payload = getattr(event, "payload", {}) or {} + sample_id = str(payload.get("sample_id", "")) + slide = slide_by_sample_id.get(sample_id) + if slide is None or sample_id in live_updated_sample_ids: + return + tile_artifacts, hierarchical_artifacts, slide_artifacts = collect_pipeline_artifacts( + [slide], + output_dir=output_dir, + output_format=execution.output_format, + include_tile_embeddings=include_tile_embeddings, + include_hierarchical_embeddings=persist_hierarchical_embeddings, + include_slide_embeddings=include_slide_embeddings, + ) + update_process_list_after_embedding( + process_list_path, + successful_slides=[slide], + persist_tile_embeddings=persist_tile_embeddings, + persist_hierarchical_embeddings=persist_hierarchical_embeddings, + include_slide_embeddings=include_slide_embeddings, + encoder_name=model.name, + output_variant=resolved_process_list_output_variant(model), + tile_artifacts=tile_artifacts, + hierarchical_artifacts=hierarchical_artifacts, + slide_artifacts=slide_artifacts, + ) + live_updated_sample_ids.add(sample_id) + run_distributed_embedding_stage( model=model, - successful_slides=successful_slides, + successful_slides=pending_slides, preprocessing=preprocessing, execution=execution, output_dir=output_dir, tiling_input_dir=tiling_input_dir, + on_progress_event=_update_process_list_for_finished_slide, ) tile_artifacts, hierarchical_artifacts, slide_artifacts = collect_pipeline_artifacts( successful_slides, diff --git a/slide2vec/runtime/distributed.py b/slide2vec/runtime/distributed.py index 5b71b61..80b77a9 100644 --- a/slide2vec/runtime/distributed.py +++ b/slide2vec/runtime/distributed.py @@ -9,7 +9,7 @@ import time from contextlib import contextmanager from pathlib import Path -from typing import Any, Sequence +from typing import Any, Callable, Sequence import numpy as np import torch @@ -63,6 +63,7 @@ def run_torchrun_worker( request_path: Path, failure_title: str, progress_events_path: Path | None = None, + progress_event_callback: Callable[[Any], None] | None = None, popen_factory=subprocess.Popen, ) -> None: command = [ @@ -98,11 +99,15 @@ def run_torchrun_worker( events, offsets = read_progress_events(progress_events_path, offsets=offsets) for event in events: emit_progress_event(event) + if progress_event_callback is not None: + progress_event_callback(event) time.sleep(0.1) if progress_events_path is not None: events, offsets = read_progress_events(progress_events_path, offsets=offsets) for event in events: emit_progress_event(event) + if progress_event_callback is not None: + progress_event_callback(event) returncode = process.wait() stdout_thread.join(timeout=1.0) stderr_thread.join(timeout=1.0) diff --git a/slide2vec/runtime/distributed_stage.py b/slide2vec/runtime/distributed_stage.py index 4b52176..61e63b2 100644 --- a/slide2vec/runtime/distributed_stage.py +++ b/slide2vec/runtime/distributed_stage.py @@ -3,7 +3,7 @@ import json from subprocess import Popen from pathlib import Path -from typing import Any, Sequence +from typing import Any, Callable, Sequence import torch from hs2p import SlideSpec @@ -49,6 +49,7 @@ def build_pipeline_worker_request_payload( execution: ExecutionOptions, *, tiling_input_dir: Path, + sample_ids: Sequence[str] | None = None, progress_events_path: Path | None = None, ) -> dict[str, Any]: return { @@ -56,6 +57,7 @@ def build_pipeline_worker_request_payload( "preprocessing": serialize_preprocessing(preprocessing), "execution": serialize_execution(execution, preprocessing=preprocessing), "tiling_input_dir": str(tiling_input_dir), + "sample_ids": list(sample_ids) if sample_ids is not None else None, "progress_events_path": str(progress_events_path) if progress_events_path is not None else None, } @@ -106,6 +108,7 @@ def run_distributed_embedding_stage( execution: ExecutionOptions, output_dir: Path, tiling_input_dir: Path | None = None, + on_progress_event: Callable[[Any], None] | None = None, ) -> None: if not successful_slides: return @@ -117,6 +120,7 @@ def run_distributed_embedding_stage( preprocessing, execution, tiling_input_dir=tiling_input_dir or output_dir, + sample_ids=[slide.sample_id for slide in successful_slides], progress_events_path=progress_events_path, ) request_path.write_text(json.dumps(request_payload, indent=2, sort_keys=True), encoding="utf-8") @@ -137,6 +141,7 @@ def run_distributed_embedding_stage( request_path=request_path, failure_title="Distributed feature extraction failed", progress_events_path=progress_events_path, + progress_event_callback=on_progress_event, popen_factory=Popen, ) diff --git a/slide2vec/runtime/persist_callbacks.py b/slide2vec/runtime/persist_callbacks.py index 988e073..811ca18 100644 --- a/slide2vec/runtime/persist_callbacks.py +++ b/slide2vec/runtime/persist_callbacks.py @@ -31,18 +31,15 @@ def has_complete_local_embedding_outputs( ) -> bool: if persist_hierarchical_embeddings: hierarchical_artifact_path = output_dir / "hierarchical_embeddings" / f"{sample_id}.{output_format}" - hierarchical_metadata_path = output_dir / "hierarchical_embeddings" / f"{sample_id}.meta.json" - if not hierarchical_artifact_path.is_file() or not hierarchical_metadata_path.is_file(): + if not hierarchical_artifact_path.is_file(): return False elif persist_tile_embeddings: tile_artifact_path = output_dir / "tile_embeddings" / f"{sample_id}.{output_format}" - tile_metadata_path = output_dir / "tile_embeddings" / f"{sample_id}.meta.json" - if not tile_artifact_path.is_file() or not tile_metadata_path.is_file(): + if not tile_artifact_path.is_file(): return False if include_slide_embeddings: slide_artifact_path = output_dir / "slide_embeddings" / f"{sample_id}.{output_format}" - slide_metadata_path = output_dir / "slide_embeddings" / f"{sample_id}.meta.json" - if not slide_artifact_path.is_file() or not slide_metadata_path.is_file(): + if not slide_artifact_path.is_file(): return False if save_latents: latent_suffix = "pt" if output_format == "pt" else "npz" diff --git a/slide2vec/runtime/persistence.py b/slide2vec/runtime/persistence.py index c640093..b48cfb5 100644 --- a/slide2vec/runtime/persistence.py +++ b/slide2vec/runtime/persistence.py @@ -10,8 +10,10 @@ HierarchicalEmbeddingArtifact, SlideEmbeddingArtifact, TileEmbeddingArtifact, + load_array, load_metadata, ) +from slide2vec.utils.tiling_io import atomic_write_dataframe_csv def collect_pipeline_artifacts( @@ -47,14 +49,21 @@ def collect_pipeline_artifacts( def load_tile_artifact(sample_id: str, *, output_dir: Path, output_format: str) -> TileEmbeddingArtifact: artifact_path = output_dir / "tile_embeddings" / f"{sample_id}.{output_format}" metadata_path = output_dir / "tile_embeddings" / f"{sample_id}.meta.json" - metadata = load_metadata(metadata_path) + if metadata_path.is_file(): + metadata = load_metadata(metadata_path) + feature_dim = int(metadata["feature_dim"]) + num_tiles = int(metadata["num_tiles"]) + else: + features = load_array(artifact_path) + feature_dim = int(features.shape[-1]) if getattr(features, "ndim", 0) else 1 + num_tiles = int(features.shape[0]) if getattr(features, "ndim", 0) else 1 return TileEmbeddingArtifact( sample_id=sample_id, path=artifact_path, metadata_path=metadata_path, format=output_format, - feature_dim=int(metadata["feature_dim"]), - num_tiles=int(metadata["num_tiles"]), + feature_dim=feature_dim, + num_tiles=num_tiles, ) @@ -66,22 +75,36 @@ def load_hierarchical_artifact( ) -> HierarchicalEmbeddingArtifact: artifact_path = output_dir / "hierarchical_embeddings" / f"{sample_id}.{output_format}" metadata_path = output_dir / "hierarchical_embeddings" / f"{sample_id}.meta.json" - metadata = load_metadata(metadata_path) + if metadata_path.is_file(): + metadata = load_metadata(metadata_path) + feature_dim = int(metadata["feature_dim"]) + num_regions = int(metadata["num_regions"]) + tiles_per_region = int(metadata["tiles_per_region"]) + else: + features = load_array(artifact_path) + feature_dim = int(features.shape[2]) + num_regions = int(features.shape[0]) + tiles_per_region = int(features.shape[1]) return HierarchicalEmbeddingArtifact( sample_id=sample_id, path=artifact_path, metadata_path=metadata_path, format=output_format, - feature_dim=int(metadata["feature_dim"]), - num_regions=int(metadata["num_regions"]), - tiles_per_region=int(metadata["tiles_per_region"]), + feature_dim=feature_dim, + num_regions=num_regions, + tiles_per_region=tiles_per_region, ) def load_slide_artifact(sample_id: str, *, output_dir: Path, output_format: str) -> SlideEmbeddingArtifact: artifact_path = output_dir / "slide_embeddings" / f"{sample_id}.{output_format}" metadata_path = output_dir / "slide_embeddings" / f"{sample_id}.meta.json" - metadata = load_metadata(metadata_path) + if metadata_path.is_file(): + metadata = load_metadata(metadata_path) + feature_dim = int(metadata["feature_dim"]) + else: + embedding = load_array(artifact_path) + feature_dim = int(embedding.shape[-1]) if getattr(embedding, "ndim", 0) else 1 latent_suffix = "pt" if output_format == "pt" else "npz" latent_path = output_dir / "slide_latents" / f"{sample_id}.{latent_suffix}" return SlideEmbeddingArtifact( @@ -89,7 +112,7 @@ def load_slide_artifact(sample_id: str, *, output_dir: Path, output_format: str) path=artifact_path, metadata_path=metadata_path, format=output_format, - feature_dim=int(metadata["feature_dim"]), + feature_dim=feature_dim, latent_path=latent_path if latent_path.is_file() else None, ) @@ -162,4 +185,4 @@ def _resolve_path_str(value: Any) -> str | None: df.loc[mask, "aggregation_status"] = ( "success" if slide.sample_id in slide_success_ids else "error" ) - df.to_csv(process_list_path, index=False) + atomic_write_dataframe_csv(df, process_list_path) diff --git a/slide2vec/runtime/process_list.py b/slide2vec/runtime/process_list.py index 0ccb107..7485e1c 100644 --- a/slide2vec/runtime/process_list.py +++ b/slide2vec/runtime/process_list.py @@ -19,7 +19,7 @@ ) from slide2vec.runtime.embedding import build_hierarchical_embedding_metadata, build_tile_embedding_metadata from slide2vec.runtime.tiling import resolve_slide_backend -from slide2vec.utils.tiling_io import load_tiling_result_from_row +from slide2vec.utils.tiling_io import atomic_write_dataframe_csv, load_tiling_result_from_row def num_rows(data) -> int: @@ -197,4 +197,105 @@ def _resolve_path_str(value: Any) -> str | None: process_df["tiling_preview_path"].notna(), mapped_tiling_preview_paths, ) - process_df.to_csv(process_list_path, index=False) + atomic_write_dataframe_csv(process_df, process_list_path) + + +def restore_resume_metadata_after_tiling( + process_list_path: Path, + previous_process_df: pd.DataFrame | None, +) -> None: + """Restore slide2vec-owned metadata lost when hs2p rewrites process_list.csv. + + hs2p resume validates and re-records successful tiling rows, but it only + writes tiling columns. Preserve embedding and preview metadata when the + current row still references the same tiling artifacts as the previous row. + """ + if previous_process_df is None or previous_process_df.empty: + return + + def _is_missing(value: Any) -> bool: + return value is None or pd.isna(value) or str(value).strip() == "" + + def _same_path(left: Any, right: Any) -> bool: + if _is_missing(left) and _is_missing(right): + return True + if _is_missing(left) or _is_missing(right): + return False + left_path = Path(str(left)).expanduser().resolve(strict=False) + right_path = Path(str(right)).expanduser().resolve(strict=False) + return left_path == right_path + + def _same_value(left: Any, right: Any) -> bool: + if _is_missing(left) and _is_missing(right): + return True + if _is_missing(left) or _is_missing(right): + return False + return str(left) == str(right) + + def _same_int(left: Any, right: Any) -> bool: + if _is_missing(left) and _is_missing(right): + return True + if _is_missing(left) or _is_missing(right): + return False + return int(left) == int(right) + + def _existing_path(value: Any) -> str | None: + if _is_missing(value): + return None + path = Path(str(value)) + return str(path) if path.is_file() else None + + preserve_columns = ( + "feature_status", + "feature_path", + "encoder_name", + "output_variant", + "feature_kind", + "aggregation_status", + ) + preview_columns = ("mask_preview_path", "tiling_preview_path") + previous_by_sample_id = { + str(row["sample_id"]): row + for row in previous_process_df.to_dict("records") + if "sample_id" in row + } + current_df = pd.read_csv(process_list_path) + changed = False + + for column in (*preserve_columns, *preview_columns): + if column in previous_process_df.columns and column not in current_df.columns: + current_df[column] = pd.Series([None] * len(current_df), dtype="object") + changed = True + elif column in current_df.columns: + current_df[column] = current_df[column].astype("object") + + for index, current_row in current_df.iterrows(): + previous_row = previous_by_sample_id.get(str(current_row["sample_id"])) + if previous_row is None: + continue + if current_row.get("tiling_status") != "success" or previous_row.get("tiling_status") != "success": + continue + unchanged_tiling = ( + _same_int(current_row.get("num_tiles"), previous_row.get("num_tiles")) + and _same_path(current_row.get("coordinates_npz_path"), previous_row.get("coordinates_npz_path")) + and _same_path(current_row.get("coordinates_meta_path"), previous_row.get("coordinates_meta_path")) + and _same_path(current_row.get("tiles_tar_path"), previous_row.get("tiles_tar_path")) + and _same_value(current_row.get("backend"), previous_row.get("backend")) + and _same_value(current_row.get("requested_backend"), previous_row.get("requested_backend")) + ) + if not unchanged_tiling: + continue + for column in preserve_columns: + if column in previous_process_df.columns: + current_df.at[index, column] = previous_row.get(column) + changed = True + for column in preview_columns: + if column not in previous_process_df.columns: + continue + restored_path = _existing_path(previous_row.get(column)) + if restored_path is not None and _is_missing(current_row.get(column)): + current_df.at[index, column] = restored_path + changed = True + + if changed: + atomic_write_dataframe_csv(current_df, process_list_path) diff --git a/slide2vec/runtime/tiling_pipeline.py b/slide2vec/runtime/tiling_pipeline.py index 32e0099..6db7109 100644 --- a/slide2vec/runtime/tiling_pipeline.py +++ b/slide2vec/runtime/tiling_pipeline.py @@ -16,7 +16,10 @@ from slide2vec.api import PreprocessingConfig, _resolve_hierarchical_preprocessing from slide2vec.encoders.registry import resolve_preprocessing_defaults from slide2vec.progress import emit_progress, read_tiling_progress_snapshot -from slide2vec.runtime.process_list import record_slide_metadata_in_process_list +from slide2vec.runtime.process_list import ( + record_slide_metadata_in_process_list, + restore_resume_metadata_after_tiling, +) from slide2vec.runtime.progress_bridge import bridge_hs2p_progress_to_slide2vec from slide2vec.runtime.tiling import build_hs2p_configs, resolve_tiling_backend from slide2vec.utils.log_utils import suppress_c_stderr @@ -109,6 +112,11 @@ def prepare_tiled_slides( num_workers: int, ) -> tuple[list[SlideSpec], list[Any], Path]: process_list_path = output_dir / "process_list.csv" + previous_process_df = ( + pd.read_csv(process_list_path) + if preprocessing.resume and process_list_path.is_file() + else None + ) tiling_artifacts = tile_slides_with_progress( slide_records, preprocessing, @@ -122,6 +130,7 @@ def prepare_tiled_slides( preprocessing=preprocessing, tiling_artifacts=tiling_artifacts, ) + restore_resume_metadata_after_tiling(process_list_path, previous_process_df) process_df = load_tiling_process_df(process_list_path) tiling_results = [] successful_slides = [] diff --git a/slide2vec/utils/config.py b/slide2vec/utils/config.py index 44268ab..35d9e41 100644 --- a/slide2vec/utils/config.py +++ b/slide2vec/utils/config.py @@ -113,7 +113,7 @@ def setup(args): cfg = get_cfg_from_args(args) if cfg.resume: - run_id = cfg.resume_dirname + run_id = cfg.resume_dirname or "" elif not args.skip_datetime: run_id = datetime.datetime.now().strftime("%Y-%m-%d_%H_%M") else: diff --git a/slide2vec/utils/tiling_io.py b/slide2vec/utils/tiling_io.py index 3293be7..b933b61 100644 --- a/slide2vec/utils/tiling_io.py +++ b/slide2vec/utils/tiling_io.py @@ -1,3 +1,4 @@ +import tempfile from pathlib import Path from typing import Any @@ -6,6 +7,39 @@ from hs2p import SlideSpec, load_tiling_result +def atomic_write_dataframe_csv(df: pd.DataFrame, path: Path) -> None: + """Write a DataFrame to ``path`` atomically. + + A crash mid-write must never leave a half-written process_list.csv, since + that breaks resume. We write to a sibling temp file and ``replace()`` it + onto the target. Some network filesystems, notably CIFS shares, reject the + atomic replace step even when normal writes still work, so we fall back to a + direct overwrite in that case. + """ + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + temp_path: Path | None = None + try: + with tempfile.NamedTemporaryFile( + mode="w", + suffix=".csv", + dir=path.parent, + delete=False, + ) as handle: + temp_path = Path(handle.name) + df.to_csv(handle, index=False) + try: + temp_path.replace(path) + except PermissionError: + df.to_csv(path, index=False) + temp_path.unlink(missing_ok=True) + else: + temp_path = None + finally: + if temp_path is not None: + temp_path.unlink(missing_ok=True) + + REQUIRED_MANIFEST_COLUMNS = ("sample_id", "image_path") BASE_PROCESS_COLUMNS = ( "sample_id", diff --git a/tests/test_hs2p_package_cutover.py b/tests/test_hs2p_package_cutover.py index b8f1d5f..799ad81 100644 --- a/tests/test_hs2p_package_cutover.py +++ b/tests/test_hs2p_package_cutover.py @@ -221,3 +221,72 @@ def fake_load_model(**kwargs): assert model.level == "tile" assert model.feature_dim == 1280 assert captured["name"] == "virchow2" + + +def test_atomic_write_dataframe_csv_writes_expected_content(tmp_path: Path): + helper = importlib.import_module("slide2vec.utils.tiling_io") + + target = tmp_path / "process_list.csv" + df = pd.DataFrame([{"sample_id": "slide-1", "tiling_status": "success"}]) + helper.atomic_write_dataframe_csv(df, target) + + assert target.is_file() + assert pd.read_csv(target).to_dict("records") == [ + {"sample_id": "slide-1", "tiling_status": "success"} + ] + + +def test_atomic_write_dataframe_csv_preserves_existing_file_on_crash(monkeypatch, tmp_path: Path): + """A crash mid-write must leave the existing process_list.csv untouched + (and not leave a stray temp file behind), so resume can still trust it.""" + helper = importlib.import_module("slide2vec.utils.tiling_io") + + target = tmp_path / "process_list.csv" + target.write_text("sample_id,tiling_status\nslide-1,success\n", encoding="utf-8") + original_bytes = target.read_bytes() + + real_replace = Path.replace + + def _crashing_replace(self, *args, **kwargs): + if Path(self).parent == tmp_path: + raise RuntimeError("simulated crash during rename") + return real_replace(self, *args, **kwargs) + + monkeypatch.setattr(Path, "replace", _crashing_replace) + + with pytest.raises(RuntimeError, match="simulated crash"): + helper.atomic_write_dataframe_csv( + pd.DataFrame([{"sample_id": "slide-2", "tiling_status": "success"}]), + target, + ) + + assert target.read_bytes() == original_bytes + leftover = [p for p in tmp_path.iterdir() if p != target] + assert leftover == [] + + +def test_atomic_write_dataframe_csv_falls_back_on_permission_error(monkeypatch, tmp_path: Path): + helper = importlib.import_module("slide2vec.utils.tiling_io") + + target = tmp_path / "process_list.csv" + target.write_text("sample_id,tiling_status\nslide-1,success\n", encoding="utf-8") + + real_replace = Path.replace + + def _permission_error_replace(self, *args, **kwargs): + if Path(self).parent == tmp_path: + raise PermissionError(13, "permission denied") + return real_replace(self, *args, **kwargs) + + monkeypatch.setattr(Path, "replace", _permission_error_replace) + + helper.atomic_write_dataframe_csv( + pd.DataFrame([{"sample_id": "slide-2", "tiling_status": "error"}]), + target, + ) + + assert pd.read_csv(target).to_dict("records") == [ + {"sample_id": "slide-2", "tiling_status": "error"} + ] + leftover = [p for p in tmp_path.iterdir() if p != target] + assert leftover == [] diff --git a/tests/test_progress.py b/tests/test_progress.py index ee4a707..b913380 100644 --- a/tests/test_progress.py +++ b/tests/test_progress.py @@ -369,6 +369,17 @@ def test_plain_text_reporter_formats_assignment_progress(): ) == "Slide assignment complete: 10 slide(s) across 4 GPU(s)" ) + assert ( + reporter._format_line( + "embedding.resume", + { + "total_slide_count": 10, + "pending_slide_count": 8, + "skipped_slide_count": 2, + }, + ) + == "Resume: skipped 2 already processed slide(s); 8 pending" + ) def test_plain_text_reporter_formats_tissue_progress(): @@ -835,6 +846,33 @@ def test_rich_reporter_defers_tiling_bar_until_progress(monkeypatch): assert 2 not in reporter.progress.tasks +def test_rich_reporter_updates_embedding_total_for_resume_skips(monkeypatch): + import slide2vec.progress as progress + + FakeConsole, _FakeProgress = _install_fake_rich_runtime(monkeypatch) + console = FakeConsole() + reporter = progress.RichCliProgressReporter(console=console) + + reporter.emit(progress.ProgressEvent(kind="embedding.started", payload={"slide_count": 10})) + reporter.emit( + progress.ProgressEvent( + kind="embedding.resume", + payload={ + "total_slide_count": 10, + "pending_slide_count": 8, + "skipped_slide_count": 2, + }, + ) + ) + + assert reporter.progress.tasks[1]["total"] == 8 + assert reporter.progress.tasks[1]["completed"] == 0 + assert reporter.progress.tasks[1]["description"] == "Embedding slides (8 pending, 2 skipped)" + assert [line[0] for line in console.lines] == [ + "Resume: skipped 2 already processed slide(s); 8/10 pending" + ] + + def test_rich_reporter_emits_backend_selected_without_log_suffix(monkeypatch): import slide2vec.progress as progress diff --git a/tests/test_regression_core.py b/tests/test_regression_core.py index a91b827..dcb7e0c 100644 --- a/tests/test_regression_core.py +++ b/tests/test_regression_core.py @@ -103,6 +103,46 @@ def test_get_cfg_from_args_rejects_models_with_ambiguous_spacing_defaults(tmp_pa get_cfg_from_args(args) +def test_setup_resumes_from_base_output_dir_when_resume_dirname_is_empty( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +): + pytest.importorskip("omegaconf") + + from slide2vec.utils.config import setup + + config_path = tmp_path / "config.yaml" + config_path.write_text( + "\n".join( + [ + "csv: /tmp/slides.csv", + f"output_dir: {tmp_path / 'output'}", + "resume: true", + "resume_dirname:", + ] + ) + ) + + args = SimpleNamespace( + config_file=str(config_path), + output_dir=None, + opts=["resume=true", "resume_dirname="], + skip_datetime=True, + run_on_cpu=False, + ) + + monkeypatch.setattr("slide2vec.utils.config.is_main_process", lambda: True) + monkeypatch.setattr("slide2vec.utils.config.fix_random_seeds", lambda seed: None) + monkeypatch.setattr("slide2vec.utils.config.setup_logging", lambda **kwargs: None) + monkeypatch.setattr("slide2vec.utils.config.get_sha", lambda: "deadbeef") + + cfg, cfg_path = setup(args) + + assert cfg.output_dir == str(tmp_path / "output") + assert cfg_path == str(tmp_path / "output" / "config.yaml") + assert (tmp_path / "output").is_dir() + + def test_list_models_is_public_and_returns_all_registered_models(): from slide2vec import list_models diff --git a/tests/test_regression_inference.py b/tests/test_regression_inference.py index 0f1ba7a..ee6bcea 100644 --- a/tests/test_regression_inference.py +++ b/tests/test_regression_inference.py @@ -241,7 +241,16 @@ def test_collect_distributed_pipeline_artifacts_runs_stage_collects_and_updates( captured = {} - def fake_run_stage(*, model, successful_slides, preprocessing, execution, output_dir, tiling_input_dir=None): + def fake_run_stage( + *, + model, + successful_slides, + preprocessing, + execution, + output_dir, + tiling_input_dir=None, + on_progress_event=None, + ): captured["run_stage"] = { "model": model, "successful_slides": successful_slides, @@ -249,6 +258,7 @@ def fake_run_stage(*, model, successful_slides, preprocessing, execution, output "execution": execution, "output_dir": output_dir, "tiling_input_dir": tiling_input_dir, + "on_progress_event": on_progress_event, } def fake_collect(slides, *, output_dir, output_format, include_tile_embeddings, include_hierarchical_embeddings, include_slide_embeddings): @@ -369,6 +379,60 @@ def test_collect_distributed_pipeline_artifacts_uses_hierarchical_artifacts_for_ assert slide_artifacts == [] +def test_collect_distributed_pipeline_artifacts_resume_skips_completed_hierarchical_slides( + monkeypatch, + tmp_path: Path, +): + completed_slide = make_slide("slide-done") + pending_slide = make_slide("slide-pending") + process_list_path = tmp_path / "process_list.csv" + process_list_path.write_text( + "sample_id,annotation,image_path,mask_path,requested_backend,backend,spacing_at_level_0,tiling_status,num_tiles,coordinates_npz_path,coordinates_meta_path,feature_status,feature_path,encoder_name,output_variant,feature_kind,error,traceback\n" + f"slide-done,tissue,/tmp/slide-done.svs,,auto,asap,,success,1,/tmp/slide-done.coordinates.npz,/tmp/slide-done.coordinates.meta.json,success,{tmp_path / 'hierarchical_embeddings' / 'slide-done.pt'},virchow2,default,hierarchical,,\n" + "slide-pending,tissue,/tmp/slide-pending.svs,,auto,asap,,success,1,/tmp/slide-pending.coordinates.npz,/tmp/slide-pending.coordinates.meta.json,tbp,,,,,\n", + encoding="utf-8", + ) + write_hierarchical_embeddings( + "slide-done", + np.zeros((1, 2, 4), dtype=np.float32), + output_dir=tmp_path, + output_format="pt", + metadata={"image_path": "/tmp/slide-done.svs"}, + ) + preprocessing = replace( + DEFAULT_PREPROCESSING, + requested_region_size_px=448, + region_tile_multiple=2, + resume=True, + ) + execution = ExecutionOptions(output_dir=tmp_path, num_gpus=2, output_format="pt") + model = SimpleNamespace(name="virchow2", level="tile") + captured = {} + + def fake_run_stage(**kwargs): + captured["run_stage_slides"] = list(kwargs["successful_slides"]) + + def fake_collect(slides, **kwargs): + captured["collect_slides"] = list(slides) + return [], ["hierarchical-artifacts"], [] + + monkeypatch.setattr(artifacts_collect, "run_distributed_embedding_stage", fake_run_stage) + monkeypatch.setattr(artifacts_collect, "collect_pipeline_artifacts", fake_collect) + monkeypatch.setattr(artifacts_collect, "update_process_list_after_embedding", lambda *args, **kwargs: None) + + artifacts_collect.collect_distributed_pipeline_artifacts( + model=model, + successful_slides=[completed_slide, pending_slide], + process_list_path=process_list_path, + preprocessing=preprocessing, + execution=execution, + output_dir=tmp_path, + ) + + assert [slide.sample_id for slide in captured["run_stage_slides"]] == ["slide-pending"] + assert [slide.sample_id for slide in captured["collect_slides"]] == ["slide-done", "slide-pending"] + + def test_has_complete_local_embedding_outputs_uses_hierarchical_artifacts_for_hierarchical_preprocessing( tmp_path: Path, ): @@ -545,6 +609,51 @@ def fake_persist_embedded_slide(model, embedded_slide, tiling_result, *, preproc assert recorded.loc["slide-a", "feature_path"] == str((tmp_path / "relative-output" / "slide_embeddings" / "slide-a.pt").resolve()) +def test_distributed_collection_updates_process_list_when_worker_slide_finishes( + monkeypatch, + tmp_path: Path, +): + process_list_path = tmp_path / "process_list.csv" + process_list_path.write_text( + "sample_id,annotation,image_path,mask_path,requested_backend,backend,spacing_at_level_0,tiling_status,num_tiles,coordinates_npz_path,coordinates_meta_path,error,traceback\n" + "slide-a,tissue,/tmp/slide-a.svs,,asap,asap,,success,1,/tmp/slide-a.coordinates.npz,/tmp/slide-a.coordinates.meta.json,,\n", + encoding="utf-8", + ) + slide = make_slide("slide-a") + artifact = write_hierarchical_embeddings( + "slide-a", + np.zeros((1, 2, 4), dtype=np.float32), + output_dir=tmp_path, + output_format="pt", + metadata={"image_path": "/tmp/slide-a.svs"}, + ) + captured = {} + + def fake_run_distributed_embedding_stage(*args, **kwargs): + callback = kwargs.get("on_progress_event") + captured["callback"] = callback + assert callback is not None + callback(SimpleNamespace(kind="embedding.slide.finished", payload={"sample_id": "slide-a"})) + + monkeypatch.setattr(artifacts_collect, "run_distributed_embedding_stage", fake_run_distributed_embedding_stage) + + model = SimpleNamespace(name="prost40m", level="tile", _output_variant=None) + artifacts_collect.collect_distributed_pipeline_artifacts( + model=model, + successful_slides=[slide], + process_list_path=process_list_path, + preprocessing=PreprocessingConfig(region_tile_multiple=2), + execution=ExecutionOptions(output_dir=tmp_path, num_gpus=2), + output_dir=tmp_path, + ) + + recorded = pd.read_csv(process_list_path).set_index("sample_id") + assert captured["callback"] is not None + assert recorded.loc["slide-a", "feature_status"] == "success" + assert recorded.loc["slide-a", "feature_path"] == str(artifact.path.resolve()) + assert recorded.loc["slide-a", "feature_kind"] == "hierarchical" + + def test_aggregate_tiles_uses_autocast_for_slide_encoding(monkeypatch, tmp_path: Path): import slide2vec.inference as inference @@ -997,6 +1106,7 @@ def test_pipeline_worker_disables_result_collection_when_streaming(monkeypatch, "preprocessing": {}, "execution": {}, "tiling_input_dir": str(tmp_path), + "sample_ids": ["slide-a"], } ), encoding="utf-8", @@ -1043,6 +1153,69 @@ def fake_compute_embedded_slides(*args, **kwargs): assert captured["collect_results"] is False +def test_pipeline_worker_filters_to_requested_sample_ids(monkeypatch, tmp_path: Path): + import torch.distributed as dist + + import slide2vec.distributed as distributed + import slide2vec.runtime.serialization as serialization + from slide2vec.api import Model + from slide2vec.distributed import pipeline_worker + + request_path = tmp_path / "request.json" + request_path.write_text( + json.dumps( + { + "model": { + "name": "virchow2", + "allow_non_recommended_settings": False, + }, + "preprocessing": {}, + "execution": {}, + "tiling_input_dir": str(tmp_path), + "sample_ids": ["slide-b"], + } + ), + encoding="utf-8", + ) + + slide_a = make_slide("slide-a") + slide_b = make_slide("slide-b") + tiling_a = SimpleNamespace(x=np.array([0]), y=np.array([1]), tile_size_lv0=224) + tiling_b = SimpleNamespace(x=np.array([2]), y=np.array([3]), tile_size_lv0=224) + captured = {} + + monkeypatch.setattr(distributed, "enable", lambda overwrite=True: None) + monkeypatch.setattr(distributed, "get_local_rank", lambda: 0) + monkeypatch.setattr(distributed, "get_global_rank", lambda: 0) + monkeypatch.setattr(distributed, "get_global_size", lambda: 1) + monkeypatch.setattr(dist, "is_available", lambda: False) + monkeypatch.setattr(dist, "is_initialized", lambda: False) + monkeypatch.setattr(Model, "from_preset", lambda *args, **kwargs: SimpleNamespace()) + monkeypatch.setattr(serialization, "deserialize_preprocessing", lambda payload: DEFAULT_PREPROCESSING) + monkeypatch.setattr( + serialization, + "deserialize_execution", + lambda payload: ExecutionOptions(output_dir=tmp_path), + ) + monkeypatch.setattr( + manifest, + "load_successful_tiled_slides", + lambda tiling_input_dir: ([slide_a, slide_b], [tiling_a, tiling_b]), + ) + monkeypatch.setattr(persist_callbacks, "build_incremental_persist_callback", + lambda **kwargs: (lambda *args, **kwargs: None, [], []), + ) + + def fake_compute_embedded_slides(_model, slides, _tiling_results, **kwargs): + captured["computed_sample_ids"] = [slide.sample_id for slide in slides] + return [] + + monkeypatch.setattr(embedding_pipeline, "compute_embedded_slides", fake_compute_embedded_slides) + + assert pipeline_worker.main(["--output-dir", str(tmp_path), "--request-path", str(request_path)]) == 0 + assert captured["computed_sample_ids"] == ["slide-b"] + + def test_direct_embed_worker_streams_payloads_without_retaining_results(monkeypatch, tmp_path: Path): import torch import torch.distributed as dist @@ -1214,6 +1387,38 @@ def fake_compute_tile_embeddings(_loaded, _model, slide, _tiling_result, **_kwar assert result.slide_artifacts == [] +def test_resume_skip_accepts_existing_tile_embedding_without_metadata(tmp_path: Path): + slide = make_slide("slide-a") + process_list_path = tmp_path / "process_list.csv" + process_list_path.write_text( + "sample_id,annotation,image_path,mask_path,requested_backend,backend,spacing_at_level_0,tiling_status,num_tiles,coordinates_npz_path,coordinates_meta_path,feature_status,error,traceback\n" + "slide-a,tissue,/tmp/slide-a.svs,,auto,asap,,success,1,/tmp/slide-a.coordinates.npz,/tmp/slide-a.coordinates.meta.json,success,,\n", + encoding="utf-8", + ) + artifact_path = tmp_path / "tile_embeddings" / "slide-a.npz" + artifact_path.parent.mkdir(parents=True, exist_ok=True) + np.savez_compressed(artifact_path, features=np.array([[1.0, 2.0]], dtype=np.float32)) + + pending_slides, pending_tiling_results = persist_callbacks.pending_local_embedding_records( + [slide], + [SimpleNamespace(x=np.array([0]), y=np.array([0]), tile_size_lv0=224)], + process_list_path=process_list_path, + output_dir=tmp_path, + output_format="npz", + persist_tile_embeddings=True, + persist_hierarchical_embeddings=False, + include_slide_embeddings=False, + save_latents=False, + resume=True, + ) + tile_artifact = persistence.load_tile_artifact("slide-a", output_dir=tmp_path, output_format="npz") + + assert pending_slides == [] + assert pending_tiling_results == [] + assert tile_artifact.feature_dim == 2 + assert tile_artifact.num_tiles == 1 + + def test_run_pipeline_local_persists_completed_embeddings_before_later_slide_failure(monkeypatch, tmp_path: Path): pytest.importorskip("torch") import slide2vec.inference as inference @@ -1587,6 +1792,69 @@ def test_prepare_tiled_slides_records_preview_paths_in_process_list(monkeypatch, assert Path(recorded.loc[0, "tiling_preview_path"]) == Path("/tmp/preview/tiling/slide-a.png").resolve() +def test_prepare_tiled_slides_resume_preserves_embedding_and_existing_preview_metadata( + monkeypatch, + tmp_path: Path, +): + process_list_path = tmp_path / "process_list.csv" + coordinates_npz_path = tmp_path / "tiles" / "slide-a.coordinates.npz" + coordinates_meta_path = tmp_path / "tiles" / "slide-a.coordinates.meta.json" + mask_preview_path = tmp_path / "preview" / "mask" / "slide-a.jpg" + tiling_preview_path = tmp_path / "preview" / "tiling" / "slide-a.jpg" + feature_path = tmp_path / "tile_embeddings" / "slide-a.npz" + for path in [ + coordinates_npz_path, + coordinates_meta_path, + mask_preview_path, + tiling_preview_path, + feature_path, + ]: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("artifact", encoding="utf-8") + process_list_path.write_text( + "sample_id,annotation,image_path,mask_path,requested_backend,backend,spacing_at_level_0,tiling_status,num_tiles,coordinates_npz_path,coordinates_meta_path,tiles_tar_path,mask_preview_path,tiling_preview_path,feature_status,feature_path,encoder_name,output_variant,feature_kind,error,traceback\n" + f"slide-a,tissue,/tmp/slide-a.svs,,asap,asap,,success,1,{coordinates_npz_path},{coordinates_meta_path},,{mask_preview_path},{tiling_preview_path},success,{feature_path},virchow2,default,tile,,\n", + encoding="utf-8", + ) + + def fake_tile_slides(*args, **kwargs): + process_list_path.write_text( + "sample_id,annotation,image_path,mask_path,requested_backend,backend,tiling_status,num_tiles,coordinates_npz_path,coordinates_meta_path,tiles_tar_path,error,traceback\n" + f"slide-a,tissue,/tmp/slide-a.svs,,asap,asap,success,1,{coordinates_npz_path},{coordinates_meta_path},,,\n", + encoding="utf-8", + ) + return [ + SimpleNamespace( + sample_id="slide-a", + mask_preview_path=None, + tiling_preview_path=None, + ) + ] + + monkeypatch.setattr(tiling_pipeline, "tile_slides", fake_tile_slides) + monkeypatch.setattr( + tiling_pipeline, + "load_tiling_result_from_row", + lambda row: SimpleNamespace(x=np.array([0]), y=np.array([0]), tile_size_lv0=224), + ) + + tiling_pipeline.prepare_tiled_slides( + [make_slide("slide-a")], + replace(DEFAULT_PREPROCESSING, resume=True), + output_dir=tmp_path, + num_workers=0, + ) + + recorded = pd.read_csv(process_list_path) + assert recorded.loc[0, "feature_status"] == "success" + assert Path(recorded.loc[0, "feature_path"]) == feature_path + assert recorded.loc[0, "encoder_name"] == "virchow2" + assert recorded.loc[0, "output_variant"] == "default" + assert recorded.loc[0, "feature_kind"] == "tile" + assert Path(recorded.loc[0, "mask_preview_path"]) == mask_preview_path + assert Path(recorded.loc[0, "tiling_preview_path"]) == tiling_preview_path + + def test_record_slide_metadata_in_process_list_adds_backend_columns(monkeypatch, tmp_path: Path): import slide2vec.inference as inference