feat: integrate vLLM artifact transfer #451
Walkthrough
Introduces end-to-end vLLM cache artifact transfer in ModelExpress.
Changes
vLLM Cache Artifact Transfer
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@examples/p2p_transfer_k8s/client/vllm/Dockerfile`:
- Line 4: Update the verification claim in docs/ARCHITECTURE.md that currently
documents the adopt_hidden_tensors() fix verification only on v0.17.1 and
v0.19.0. Since v0.23.0 is now established as the canonical vLLM version in the
Dockerfile, update ARCHITECTURE.md to reflect that the fix has been verified on
v0.23.0 and include the context of testing with DeepSeek-V4-Pro as mentioned in
this PR scope. Ensure the documentation reflects v0.23.0 as the current baseline
version while you may retain historical version references if needed for
context.
In `@modelexpress_client/python/modelexpress/engines/vllm/artifacts.py`:
- Around line 349-350: The code uses raw int(os.environ.get(...)) for parsing
environment variables at two locations, which will raise a ValueError if the
environment variables contain malformed values and cause load_model() to fail.
Wrap the int() conversion for both the MX_METADATA_PORT and the second
environment variable (around line 539) in a try-except block that catches
ValueError, logs a warning message indicating the invalid value and that the
default is being used, and then gracefully falls back to the default value
instead of allowing the exception to propagate and break artifact loading.
- Around line 512-525: The is_vllm_server_ready() function calls
urllib.request.urlopen() with a URL that may come from the environment variable
_READY_URL_ENV without validating the URL scheme, which could allow dangerous
protocols like file://, gopher://, etc. Add URL scheme validation in the
_vllm_health_url() function to ensure the returned URL only uses http or https
schemes, and raise an appropriate exception or return a safe default if the
scheme is invalid. This validation should occur before the URL is used in the
urlopen() call within is_vllm_server_ready().
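For the two `artifacts.py` findings above, a minimal sketch of both guards follows. The helper names `env_int` and `validated_health_url` are hypothetical and do not appear in the PR; the sketch assumes that falling back to the default is acceptable for a malformed integer and that raising `ValueError` is acceptable for a non-HTTP health URL.

```python
import logging
import os
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

# Only these schemes may reach urllib.request.urlopen().
_ALLOWED_SCHEMES = frozenset({"http", "https"})


def env_int(name: str, default: int) -> int:
    """Parse an integer environment variable, falling back to `default` on bad input."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning("Invalid %s=%r; using default %d", name, raw, default)
        return default


def validated_health_url(url: str) -> str:
    """Return `url` unchanged if it is http(s) with a host; raise ValueError otherwise."""
    parsed = urlparse(url)
    if parsed.scheme not in _ALLOWED_SCHEMES or not parsed.netloc:
        raise ValueError(f"Unsupported health URL: {url!r}")
    return url
```

Whether an invalid URL should raise or fall back to a safe default is a design choice the review leaves open; the sketch raises so that a misconfiguration is visible rather than silently ignored.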
In `@modelexpress_client/python/modelexpress/metadata/artifact_transfer.py`:
- Around line 394-396: The current guard at the artifact publish location
(checking if worker_grpc_server.port is None) only catches servers that were
never started, not ones that have been stopped. In the WorkerGrpcServer.stop()
method, the _port is not cleared when the server stops, allowing stopped servers
to still return an old port. Fix this by either clearing the _port (and
_servicer if applicable) in the WorkerGrpcServer.stop() method so the port check
correctly identifies stopped servers, or alternatively add an explicit
is_started property to WorkerGrpcServer that properly tracks the server state
and use that property in the guard instead of checking if port is None.
In `@modelexpress_client/python/modelexpress/metadata/heartbeat.py`:
- Around line 133-139: A race condition exists where stop() can return after the
thread join times out while _tick() is still running, allowing _tick() to
publish a READY status after _mark_stale() has been called. To fix this, add a
check within the _tick() method to prevent publishing or sending READY status if
_stop_event has already been set. This ensures that once stop() has been
invoked, no READY announcements will be sent even if _tick() continues executing
briefly. The same prevention logic should also be applied to any other methods
listed in the comment (around lines 206-241 and 243-257) that may publish status
updates.
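The stop-event check described for `heartbeat.py` can be sketched as below. `TickSketch` is a hypothetical stand-in for the real thread class, and the status strings replace the protobuf enum values; only `_stop_event`, `_tick`, `_update_status`, and `stop` mirror names from the review.

```python
import threading


class TickSketch:
    """Toy model of a heartbeat loop that must not announce READY after stop()."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self.sent: list = []  # records every status that would go to the server

    def _update_status(self, status: str) -> None:
        self.sent.append(status)

    def _tick(self) -> None:
        # Re-check immediately before publishing, not only at the top of the loop.
        if self._stop_event.is_set():
            return
        self._update_status("READY")

    def stop(self) -> None:
        self._stop_event.set()
        self._update_status("STALE")
```

A check of this kind narrows the window but does not close it, because the event can be set between the check and the send. Closing it fully would require the check and the send to share a lock with the STALE update.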
In `@modelexpress_client/python/modelexpress/metadata/publish.py`:
- Around line 39-40: The cleanup callback for worker servers is removing entries
from _worker_servers unconditionally by device_id, which can cause a race
condition where a stale publisher cleanup removes a newly registered server for
the same device. Guard the cleanup operation by checking that the current value
in _worker_servers for that device_id still matches the server instance being
cleaned up before removing it. This ensures only the actual stale server is
removed and not a newer registration that arrived after this server was
initially registered.
In `@modelexpress_client/python/modelexpress/metadata/worker_server.py`:
- Around line 84-90: The unregister_artifact_source method removes the entire
source from _artifact_sources dictionary when a single artifact_id is found,
making other manifests under the same mx_source_id unreachable. Instead of
calling pop on _artifact_sources, only remove the specific artifact_id from the
source.manifests dictionary. You may optionally remove the entire source only if
no manifests remain after this deletion to maintain a clean state.
- Around line 349-378: The new methods set_mx_source_id,
register_artifact_source, and unregister_artifact_source use _servicer and _port
as liveness guards to detect if the server is running, but the stop() method is
not clearing these fields when the server shuts down. This allows a stopped
server to still pass these checks and accept operations on dead endpoints.
Locate the stop() method in this class and ensure it explicitly sets both
_servicer and _port to None when the server is stopped, so these fields properly
serve as liveness indicators for the guard checks in the new helper methods.
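Both `worker_server.py` findings can be illustrated with a toy model. `WorkerServerSketch` and `ArtifactSource` are hypothetical stand-ins with no gRPC involved, and the `register_artifact_source` signature is assumed; the point is only that `stop()` clears the liveness fields and that unregistering removes a single manifest.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ArtifactSource:
    manifests: dict = field(default_factory=dict)  # artifact_id -> manifest


class WorkerServerSketch:
    def __init__(self) -> None:
        self._port: Optional[int] = None
        self._servicer: Optional[object] = None
        self._artifact_sources: dict = {}  # mx_source_id -> ArtifactSource

    @property
    def is_started(self) -> bool:
        return self._port is not None and self._servicer is not None

    def start(self, port: int) -> None:
        self._servicer = object()
        self._port = port

    def stop(self) -> None:
        # Clear both fields so the guards below reject a stopped server.
        self._servicer = None
        self._port = None

    def register_artifact_source(self, mx_source_id: str, artifact_id: str, manifest: bytes) -> None:
        if not self.is_started:
            raise RuntimeError("worker server is not running")
        source = self._artifact_sources.setdefault(mx_source_id, ArtifactSource())
        source.manifests[artifact_id] = manifest

    def unregister_artifact_source(self, artifact_id: str) -> None:
        for mx_source_id, source in list(self._artifact_sources.items()):
            if source.manifests.pop(artifact_id, None) is not None:
                # Drop the source only once its last manifest is gone.
                if not source.manifests:
                    del self._artifact_sources[mx_source_id]
                return
```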
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: aef8a993-785d-41bc-bfe1-bb0b51229909
📒 Files selected for processing (20)
- .github/actions/run-mx-p2p-test/action.yml
- .github/workflows/modelexpress-ci-tests.yml
- ci/k8s/client/conftest.py
- ci/k8s/client/test_p2p_k8s.py
- ci/k8s/client/vllm/manifest-azure.yaml
- docs/ARCHITECTURE.md
- docs/DEPLOYMENT.md
- docs/metadata.md
- examples/p2p_transfer_k8s/client/vllm/Dockerfile
- modelexpress_client/python/modelexpress/__init__.py
- modelexpress_client/python/modelexpress/engines/vllm/artifacts.py
- modelexpress_client/python/modelexpress/engines/vllm/loader.py
- modelexpress_client/python/modelexpress/metadata/artifact_transfer.py
- modelexpress_client/python/modelexpress/metadata/heartbeat.py
- modelexpress_client/python/modelexpress/metadata/publish.py
- modelexpress_client/python/modelexpress/metadata/worker_server.py
- modelexpress_client/python/tests/test_artifact_transfer.py
- modelexpress_client/python/tests/test_heartbeat.py
- modelexpress_client/python/tests/test_vllm_artifacts.py
- modelexpress_client/python/tests/test_vllm_loader.py
11a10c4 to bb0afaa
Signed-off-by: Zheng Luo <zheluo@nvidia.com>
cefea86 to 695d385
A few questions:
- Should we update the file name to `publisher.py` as well?
- Should we leave the heartbeat thread as is and add the publisher thread as a subclass? We would then keep the simple heartbeat thread for cases that do not involve artifact publishing jobs.
- With artifact transfer enabled, does the source become READY only after all the artifacts have finished transferring?
def _triton_cache_identity(ctx: LoadContext) -> p2p_pb2.SourceIdentity:
    # Triton cache entries are self-keyed by source/options; share by runtime stack.
    identity = p2p_pb2.SourceIdentity(
        mx_source_type=p2p_pb2.MX_SOURCE_TYPE_TRITON_CACHE,
        backend_framework=p2p_pb2.BACKEND_FRAMEWORK_VLLM,
        cuda_version=torch.version.cuda or "",
        triton_version=_triton_version(),
        gpu_arch=_gpu_arch(ctx.device_id),
    )
    _set_extra_if_present(identity, "triton_key", _triton_key())
    return identity


def _deep_gemm_cache_identity(ctx: LoadContext) -> p2p_pb2.SourceIdentity:
    # DeepGEMM cache entries are self-keyed by JIT source/compiler; share by runtime.
    identity = p2p_pb2.SourceIdentity(
        mx_source_type=p2p_pb2.MX_SOURCE_TYPE_DEEP_GEMM_CACHE,
        backend_framework=p2p_pb2.BACKEND_FRAMEWORK_VLLM,
        cuda_version=torch.version.cuda or "",
        gpu_arch=_gpu_arch(ctx.device_id),
    )
    _set_extra_if_present(identity, "deep_gemm_jit_key", _deep_gemm_jit_key())
    return identity
How do these two identities work without specifying model_name, as the torch.compile identity does? It seems the server rejects any identity with an empty model name.
# vLLM JIT cache artifacts are pod-scoped, so one successful install per pod
# is enough for all local workers.
marker_path = _artifact_marker_path(transfer, identity, "install")
with _artifact_lock(marker_path):
On success this is intended (other TP ranks block until the cache is installed once per pod). But when a transfer fails after discovery (the source went STALE or an RDMA error occurred mid-stream), the install marker is never written, the lock releases, and the next blocked rank re-runs the full transfer from scratch. With N TP ranks this serializes into up to N x 120 s of blocked load_model before all ranks give up (e.g. for TP8, 8 x 120 s = 960 s = 16 minutes).
Suggested fix: mitigate with a pod-level negative-result marker (short TTL), a hard cap of one transfer attempt per pod, or a shorter timeout on the install path.
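The first mitigation, a pod-level negative-result marker with a short TTL, could look roughly like this. All names here (`failure_marker_is_fresh`, `record_failure`, `install_once`, the 300 s TTL) are hypothetical, and the sketch assumes it runs while the existing pod-level lock is held, so only one rank executes it at a time.

```python
import time
from pathlib import Path
from typing import Callable, Optional


def failure_marker_is_fresh(marker: Path, ttl_s: float, now: Optional[float] = None) -> bool:
    """True if a failed transfer was recorded within the last `ttl_s` seconds."""
    try:
        failed_at = float(marker.read_text())
    except (OSError, ValueError):
        return False  # no marker, or an unreadable one: treat as no recent failure
    current = time.time() if now is None else now
    return 0 <= current - failed_at < ttl_s


def record_failure(marker: Path, now: Optional[float] = None) -> None:
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.write_text(repr(time.time() if now is None else now))


def install_once(marker: Path, transfer: Callable[[], None], ttl_s: float = 300.0) -> bool:
    """Run `transfer()` unless a fresh failure marker exists; return True on success."""
    if failure_marker_is_fresh(marker, ttl_s):
        return False  # another rank already failed recently; skip the retry
    try:
        transfer()
    except Exception:
        record_failure(marker)
        return False
    return True
```

Under these assumptions the worst case drops from N sequential timeouts to one: the first rank pays the 120 s, and the remaining ranks see the marker and fall through immediately.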
        self._cleanup()
        logger.info(f"[Worker {self._worker_rank}] Publisher thread stopped")

    def _on_exit(self) -> None:
Correctness warning: _on_exit (atexit) sets _stop_event then calls _mark_stale() -> _update_status(SOURCE_STATUS_STALE) WITHOUT joining the worker thread -- unlike stop(), which joins first (line 137). The daemon _run/_tick thread can already be past its _stop_event.is_set() checks (lines 251 and 263) and then send _update_status(SOURCE_STATUS_READY) at line 265 after the STALE, leaving the source advertised READY until the server-side reaper TTL. That directly defeats the documented guarantee 'sends UpdateStatus(STALE) for immediate detection without waiting for the reaper timeout' (docstring lines 30-32), which is exactly the P2P failover latency this project cares about. The window is narrow but real and the fix is cheap: join the thread (bounded) in _on_exit, or serialize _mark_stale against _update_status(READY) so STALE always wins.
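The second option, serializing `_mark_stale` against the READY update so that STALE always wins, can be sketched with a small gate object. `StatusGate` and its methods are hypothetical; `send` stands in for the real status RPC, and the status strings replace the protobuf enum values.

```python
import threading
from typing import Callable


class StatusGate:
    """Serializes status updates so that nothing is sent after STALE."""

    def __init__(self, send: Callable[[str], None]) -> None:
        self._send = send
        self._lock = threading.Lock()
        self._stale = False

    def send_ready(self) -> bool:
        # The flag check and the send happen under one lock, so a concurrent
        # mark_stale() runs either entirely before or entirely after this call.
        with self._lock:
            if self._stale:
                return False
            self._send("READY")
            return True

    def mark_stale(self) -> None:
        with self._lock:
            if self._stale:
                return
            self._stale = True
            self._send("STALE")
```

One trade-off of this sketch is that `mark_stale()` waits for any in-flight READY send, so the RPC would need a bounded timeout for the exit path to stay prompt.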
    marker_path.unlink(missing_ok=True)


def _install_vllm_cache_artifact_once(
Security warning: _install_vllm_cache_artifact_once -> transfer.install(header) unpacks a tar fetched from a peer replica into the live torch_compile / triton / deep_gemm cache dirs, which the engine then loads and executes (TorchInductor generated code + .so, Triton .cubin/.ptx, DeepGEMM JIT kernels). The integrity chain (_crc32c_hex per chunk + artifact_manifest_id sha256 recomputed in _validate_fetched_artifact_manifest) only proves the received bytes match what the source advertised; it does NOT authenticate the source or attest the content. Any replica (or a compromised MX metadata server) able to publish a source with a matching SourceIdentity can therefore ship arbitrary executable code to every target that pulls it -> RCE across the deployment. It is opt-in (MX_ARTIFACT_TRANSFER=1) and identity-matched, but identity-match is not authorization. Document this trust boundary in docs/DEPLOYMENT.md and docs/ARCHITECTURE.md, require the worker gRPC + MX endpoints to be network-isolated to the trusted deployment, and consider artifact signing/attestation.
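As one illustration of what artifact signing could mean here, the sketch below authenticates the manifest ID with an HMAC under a deployment-wide secret. This is an assumption, not something the PR implements: the function names are hypothetical, the key distribution is left out, and a shared key lets any replica holding it sign, so asymmetric signatures may be a better fit for stricter threat models.

```python
import hashlib
import hmac


def sign_artifact(key: bytes, manifest_id: str) -> str:
    """Return a hex HMAC-SHA256 tag over the manifest ID (itself a content hash)."""
    return hmac.new(key, manifest_id.encode("utf-8"), hashlib.sha256).hexdigest()


def verify_artifact(key: bytes, manifest_id: str, signature: str) -> bool:
    """Constant-time check that `signature` was produced with `key` for this manifest."""
    expected = sign_artifact(key, manifest_id)
    # compare_digest requires ASCII str inputs; hex digests satisfy that.
    return hmac.compare_digest(expected, signature)
```

A target would run the verification after recomputing the manifest ID from the fetched bytes and before unpacking anything into a cache directory.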
Overview
This PR integrates cache artifact transfer into the vLLM ModelExpress loader so a target replica can reuse runtime-generated JIT caches from an already-warm source replica before vLLM starts model initialization.
The implementation covers three cache families:
- `VLLM_CACHE_ROOT/torch_compile_cache`
- `TRITON_CACHE_DIR` or `~/.triton/cache`
- `DG_JIT_CACHE_DIR`, `DEEP_GEMM_CACHE_DIR`, or `VLLM_CACHE_ROOT/deep_gemm`
Artifact transfer is opt-in through `MX_ARTIFACT_TRANSFER=1` and requires the P2P metadata path. If artifact transfer is enabled while `MX_P2P_METADATA=0`, the loader logs a warning and skips artifact transfer rather than changing the metadata default.
What changed
- `initialize_model()` runs.
- `PublisherThread`, which can retry initial publication and then heartbeat READY status.
- `WorkerGrpcServer` for both tensor manifests and artifact manifests/chunk serving, instead of starting one artifact server per artifact source.
Benchmark
DeepSeek-V4-Pro standalone vLLM on nscale B200:
0.23.0
`--enable-flashinfer-autotune`
Additional timing detail:
Impact in this run:
Correctness sanity:
Four
Four
Validation
- `uv run --project modelexpress_client/python --extra dev pytest modelexpress_client/python/tests/test_vllm_artifacts.py modelexpress_client/python/tests/test_artifact_transfer.py modelexpress_client/python/tests/test_heartbeat.py -q`
- `python3 -m py_compile ci/k8s/client/test_p2p_k8s.py`
Summary by CodeRabbit
New Features
Documentation
Chores