From 2fa93fad523085b69eee8eb86acf216a2923bc65 Mon Sep 17 00:00:00 2001 From: NikJur Date: Thu, 26 Mar 2026 14:32:15 +0000 Subject: [PATCH 1/3] Fixes the localhost validation error and ensures executors can find each other on the local network --- deepdrivewe/parsl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deepdrivewe/parsl.py b/deepdrivewe/parsl.py index 5ef234b..2fb87b4 100644 --- a/deepdrivewe/parsl.py +++ b/deepdrivewe/parsl.py @@ -250,6 +250,7 @@ def _get_htex(self, label: str, num_nodes: int) -> HighThroughputExecutor: return HighThroughputExecutor( label=label, available_accelerators=1, # 1 GH per node + address='127.0.0.1', # Hardcoding to replace default localhost to prevent IPv4 validation errors (TO TRY in future: address_by_hostname() for dynamic variable setting and scalability) cores_per_worker=72, cpu_affinity='alternating', prefetch_capacity=0, From 806d56c8396a6cfee8a17a9fa79bd7f082bdc07e Mon Sep 17 00:00:00 2001 From: NikJur Date: Thu, 26 Mar 2026 16:28:37 +0000 Subject: [PATCH 2/3] syntax and proxyStore fix --- .../examples/openmm_ntl9_ddwe/inference.py | 28 +++++++--- .../examples/openmm_ntl9_ddwe/train.py | 32 ++++++++---- deepdrivewe/workflows/ddwe.py | 51 ++++++++++++++++--- 3 files changed, 89 insertions(+), 22 deletions(-) diff --git a/deepdrivewe/examples/openmm_ntl9_ddwe/inference.py b/deepdrivewe/examples/openmm_ntl9_ddwe/inference.py index bb422ab..7f29240 100644 --- a/deepdrivewe/examples/openmm_ntl9_ddwe/inference.py +++ b/deepdrivewe/examples/openmm_ntl9_ddwe/inference.py @@ -20,6 +20,7 @@ from deepdrivewe.recyclers import LowRecycler from deepdrivewe.resamplers import LOFLowResampler +from proxystore.store import get_store class InferenceConfig(BaseModel): """Arguments for the inference module.""" @@ -62,7 +63,7 @@ class InferenceConfig(BaseModel): def run_inference( - sim_output: list[SimResult], + sim_output: list, train_output: TrainResult, basis_states: BasisStates, target_states: list[TargetState], @@ -70,20 +71,32 @@ def run_inference( output_dir: Path, ) -> tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]: """Run inference on the input data.""" + + # Initialise the store and resolve simulation + store = get_store('file-store') + if store is None: + raise RuntimeError("ProxyStore 'file-store' not found on worker.") + + resolved_sims = [store.get(key) for key in sim_output] + + # Resolve train_output if it was passed as a key + if not hasattr(train_output, 'checkpoint_path'): + train_output = store.get(train_output) + # Make the output directory - itetation = sim_output[0].metadata.iteration_id + itetation = resolved_sims[0].metadata.iteration_id output_dir = output_dir / f'{itetation:06d}' output_dir.mkdir(parents=True, exist_ok=True) # Extract the rmsd pcoord from the last frame of each simulation - pcoords = [sim.metadata.pcoord[-1][0] for sim in sim_output] + pcoords = [sim.metadata.pcoord[-1][0] for sim in resolved_sims] print(f'Progress coordinates: {pcoords}') print(f'Best progress coordinate: {min(pcoords)}') - print(f'Num input simulations: {len(sim_output)}') + print(f'Num input simulations: {len(resolved_sims)}') # Extract the simulation metadata - cur_sims = [sim.metadata for sim in sim_output] + cur_sims = [sim.metadata for sim in resolved_sims] # Load the model and history model, history = warmstart_model( @@ -92,8 +105,8 @@ def run_inference( ) # Extract the last frame contact maps and rmsd from each simulation - contact_maps = [sim.data['contact_maps'][-1] for sim in sim_output] - pcoords = [sim.data['pcoords'][-1] for sim in sim_output] + contact_maps = [sim.data['contact_maps'][-1] for sim in resolved_sims] + pcoords = [sim.data['pcoords'][-1] for sim in resolved_sims] # Convert to int16 contact_maps = [x.astype(np.int16) for x in contact_maps] @@ -156,3 +169,4 @@ def run_inference( result = resampler.run(cur_sims, binner, recycler) return result + diff --git a/deepdrivewe/examples/openmm_ntl9_ddwe/train.py b/deepdrivewe/examples/openmm_ntl9_ddwe/train.py index 680b5e1..7b76354 100644 --- a/deepdrivewe/examples/openmm_ntl9_ddwe/train.py +++ b/deepdrivewe/examples/openmm_ntl9_ddwe/train.py @@ -13,6 +13,7 @@ from deepdrivewe.ai import ConvolutionalVAE from deepdrivewe.ai import ConvolutionalVAEConfig +from proxystore.store import get_store class TrainConfig(BaseModel): """Arguments for the training module.""" @@ -33,16 +34,36 @@ class TrainConfig(BaseModel): def run_train( - sim_output: list[SimResult], + sim_output: list, # List of raw ProxyStore Keys config: TrainConfig, output_dir: Path, ) -> TrainResult: """Train the model on the simulation output.""" # Make the output directory itetation = sim_output[0].metadata.iteration_id - output_dir = output_dir / f'{itetation:06d}' + + # Manually resolve the keys using the registered 'file-store' + store = get_store('file-store') + if store is None: + raise RuntimeError("ProxyStore 'file-store' is not initialized on the worker.") + + # store.get(key) retrieves the object without the destructive 'evict' behavior + resolved_sims = [store.get(key) for key in sim_output] + print(f"DEBUG: Successfully resolved {len(resolved_sims)} simulation objects", flush=True) + + # Make the output directory using the first resolved object + iteration = resolved_sims[0].metadata.iteration_id + output_dir = output_dir / f'{iteration:06d}' output_dir.mkdir(parents=True, exist_ok=True) + # Extract contact maps and pcoords from the resolved objects + contact_maps = np.concatenate( + [sim.data['contact_maps'] for sim in resolved_sims], + axis=0 # join along the frame/sample axis + ) + pcoords = np.concatenate([sim.data['pcoords'] for sim in resolved_sims]) + pcoords = pcoords.flatten() + # Load the model configuration model_config = ConvolutionalVAEConfig.from_yaml(config.config_path) @@ -52,13 +73,6 @@ def run_train( checkpoint_path=config.checkpoint_path, ) - # Extract the last frame contact maps and rmsd from each simulation - contact_maps = np.concatenate( - [sim.data['contact_maps'] for sim in sim_output], - ) - pcoords = np.concatenate([sim.data['pcoords'] for sim in sim_output]) - pcoords = pcoords.flatten() - # Fit the model checkpoint_path = model.fit( x=contact_maps, diff --git a/deepdrivewe/workflows/ddwe.py b/deepdrivewe/workflows/ddwe.py index 77a9fa1..7097226 100644 --- a/deepdrivewe/workflows/ddwe.py +++ b/deepdrivewe/workflows/ddwe.py @@ -112,8 +112,8 @@ def process_simulation_result(self, result: Result) -> None: if not result.success: self.logger.error( f'Simulation failed after {result.retries}' - f'/{result.max_retries} attempts, quitting workflow.', - f' result={result}', + f'/{result.max_retries} attempts, quitting workflow.' # Hanging commata removed -> previously caused issues if error reported | syntax + f' result={result}' ) self.done.set() return @@ -127,8 +127,30 @@ def process_simulation_result(self, result: Result) -> None: # extract the proxied objects. The non-streaming case will # need to extract and re-proxy the objects twice (once for # the train task and once for the inference task). - output = result.value if self.streaming else extract(result.value) - self.sim_output.append(output) + + # This method extracts results from the ProxyStore backend and re-registers + # them to prevent automated cache eviction, ensuring data remains available + # for both training and inference agents. + + from proxystore.store import get_store + from proxystore.proxy import extract + + # Initialise store and safely handle proxied data + store = get_store('file-store') + raw_data = result.value + + if not self.streaming: + # Check if the result is a proxy before extraction to handle hybrid workflows + if hasattr(raw_data, '__proxy_wrapped__'): + raw_data = extract(raw_data) + + # Re-register data as a persistent key to bypass default 'evict-on-read' behaviour + key = store.put(raw_data) + + else: + key = raw_data + + self.sim_output.append(key) # If we have all the simulation results, submit a train task if len(self.sim_output) == len(self.ensemble.next_sims): @@ -160,8 +182,23 @@ def process_train_result(self, result: Result) -> None: self.done.set() return - # Store the training output - self.train_output = result.value + # This method ensures the trained model checkpoint persists by manually + # registering a non-evicting key in ProxyStore + + from proxystore.store import get_store + from proxystore.proxy import extract + + # Initialize store to handle model weight persistence + store = get_store('file-store') + raw_train_data = result.value + + # SAFE EXTRACTION: Ensure we have a concrete object before re-registration. + # This prevents the inference task from encountering an evicted proxy. + if hasattr(raw_train_data, '__proxy_wrapped__'): + raw_train_data = extract(raw_train_data) + + # Storing hard-copy key for inference + self.train_output = store.put(raw_train_data) # TODO: What should we do in the streaming case? # Does the process_train_result method even run? @@ -182,6 +219,8 @@ def process_inference_result(self, result: Result) -> None: self.done.set() return + # Could add Safe extraction for best practice, but not stricly needed as final life step in proxy circle of life... + # Unpack the output cur_sims, next_sims, metadata = result.value From 5174ceae25cab3dec0434b889ef70e365edf5949 Mon Sep 17 00:00:00 2001 From: NikJur Date: Fri, 27 Mar 2026 11:29:22 +0000 Subject: [PATCH 3/3] optional: dynamic setting of parsl interaction address replacing default 'localhost' --- deepdrivewe/parsl.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deepdrivewe/parsl.py b/deepdrivewe/parsl.py index 2fb87b4..d348230 100644 --- a/deepdrivewe/parsl.py +++ b/deepdrivewe/parsl.py @@ -24,6 +24,8 @@ from pydantic import Field from pydantic import model_validator +from parsl.addresses import address_by_hostname + class BaseComputeConfig(BaseModel, ABC): """Compute config (HPC platform, number of GPUs, etc).""" @@ -250,7 +252,7 @@ def _get_htex(self, label: str, num_nodes: int) -> HighThroughputExecutor: return HighThroughputExecutor( label=label, available_accelerators=1, # 1 GH per node - address='127.0.0.1', # Hardcoding to replace default localhost to prevent IPv4 validation errors (TO TRY in future: address_by_hostname() for dynamic variable setting and scalability) + address=address_by_hostname(), # dynamically set address from default 'localhost' to prevent IPv4 validation errors and ensure scaling cores_per_worker=72, cpu_affinity='alternating', prefetch_capacity=0,