From 59f1704a6395087e7f73c785da52a9bfe1115c74 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 8 Sep 2023 17:55:01 -0500 Subject: [PATCH 01/17] Start connecting execute_DAG to storage manager --- gufe/protocols/protocoldag.py | 108 +++++++++++++++++++++------------ gufe/protocols/protocolunit.py | 5 +- gufe/storage/storagemanager.py | 6 +- 3 files changed, 77 insertions(+), 42 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 375f31531..d877297b9 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -17,6 +17,12 @@ ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context ) +from ..storage.storagemanager import StorageManager +from ..storage.externalresource.filestorage import FileStorage + +import logging +_logger = logging.getLogger(__name__) + class DAGMixin: _protocol_units: list[ProtocolUnit] @@ -333,9 +339,26 @@ def _from_dict(cls, dct: dict): return cls(**dct) +class ReproduceOldBehaviorStorageManager(StorageManager): + # Default behavior has scratch at {dag_label}/scratch/{unit_label} and + # shared at {dag_label}/{unit_label}. This little class makes changes + # that get us back to the original behavior of this class: scratch at + # {dag_label}/scratch_{unit_label} and shared at + # {dag_label}/shared_{unit_label}. + def _scratch_loc(self, unit_label): + return self.scratch_root / f"scratch_{unit_label}" + + def get_shared(self, unit_label): + return super().get_shared(f"shared_{unit_label}") + + def get_permanent(self, unit_label): + return super().get_permanent(f"shared_{unit_label}") + + def execute_DAG(protocoldag: ProtocolDAG, *, - shared_basedir: Path, - scratch_basedir: Path, + shared_basedir: PathLike, + scratch_basedir: PathLike, + permanent: Optional[PathLike] = None, keep_shared: bool = False, keep_scratch: bool = False, raise_error: bool = True, @@ -379,46 +402,53 @@ def execute_DAG(protocoldag: ProtocolDAG, *, # iterate in DAG order results: dict[GufeKey, ProtocolUnitResult] = {} all_results = [] # successes AND failures - shared_paths = [] - for unit in protocoldag.protocol_units: - # translate each `ProtocolUnit` in input into corresponding - # `ProtocolUnitResult` - inputs = _pu_to_pur(unit.inputs, results) - - attempt = 0 - while attempt <= n_retries: - shared = shared_basedir / f'shared_{str(unit.key)}_attempt_{attempt}' - shared_paths.append(shared) - shared.mkdir() - - scratch = scratch_basedir / f'scratch_{str(unit.key)}_attempt_{attempt}' - scratch.mkdir() - - context = Context(shared=shared, - scratch=scratch) - - # execute - result = unit.execute( - context=context, - raise_error=raise_error, - **inputs) - all_results.append(result) - - if not keep_scratch: - shutil.rmtree(scratch) - - if result.ok(): - # attach result to this `ProtocolUnit` - results[unit.key] = result - break - attempt += 1 - if not result.ok(): - break + # the directory given as shared_root is actually the directory for this + # DAG; the "shared_root" for the storage manager is the parent. We'll + # force permanent to be the same. + filestorage = FileStorage(shared_basedir.parent) + dag_label = shared_basedir.name + storage_manager = ReproduceOldBehaviorStorageManager( + scratch_root=scratch_basedir, + shared_root=filestorage, + permanent_root=filestorage, + keep_scratch=keep_scratch, + keep_staging=False, + staging=Path(""), # use the actual directories as the staging + ) + with storage_manager.running_dag(dag_label) as dag_ctx: + for unit in protocoldag.protocol_units: + attempt = 0 + while attempt <= n_retries: + # translate each `ProtocolUnit` in input into corresponding + # `ProtocolUnitResult` + inputs = _pu_to_pur(unit.inputs, results) + + label = f"{str(unit.key)}_attempt_{attempt}", + with dag_ctx.running_unit(label) as (scratch, shared, perm): + context = Context(shared=shared, + scratch=scratch, + permanent=perm) + _logger.info("Starting unit {label}") + _logger.info(context) + result = unit.execute( + context=context, + raise_error=raise_error, + **inputs) + all_results.append(result) + + if result.ok(): + # attach result to this `ProtocolUnit` + results[unit.key] = result + break + attempt += 1 + + if not result.ok(): + break if not keep_shared: - for shared_path in shared_paths: - shutil.rmtree(shared_path) + for objname in filestorage.iter_contents(dag_label): + filestorage.delete(objname) return ProtocolDAGResult( name=protocoldag.name, diff --git a/gufe/protocols/protocolunit.py b/gufe/protocols/protocolunit.py index 73728bdf4..77e1f913c 100644 --- a/gufe/protocols/protocolunit.py +++ b/gufe/protocols/protocolunit.py @@ -23,6 +23,8 @@ GufeTokenizable, GufeKey, TOKENIZABLE_REGISTRY ) +from ..storage.stagingdirectory import StagingDirectory + @dataclass class Context: @@ -31,7 +33,8 @@ class Context: """ scratch: PathLike - shared: PathLike + shared: StagingDirectory + permanent: StagingDirectory def _list_dependencies(inputs, cls): diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index dbba86ea3..b1cc60eaf 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -125,10 +125,12 @@ def __init__( self.staging = staging self.DAGContextClass = DAGContextClass + def _scratch_loc(self, unit_label): + return self.scratch_root / "scratch" / unit_label + def get_scratch(self, unit_label: str) -> Path: """Get the path for this unit's scratch directory""" - - scratch = self.scratch_root / "scratch" / unit_label + scratch = self._scratch_loc(unit_label) scratch.mkdir(parents=True, exist_ok=True) return scratch From 93fa874cd3f1513591d82cf98957402616803386 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Wed, 13 Sep 2023 15:44:48 -0500 Subject: [PATCH 02/17] changes after working session --- gufe/protocols/protocoldag.py | 11 ++++++----- gufe/storage/storagemanager.py | 12 ++++++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index d877297b9..fa4f4e0c0 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -356,9 +356,9 @@ def get_permanent(self, unit_label): def execute_DAG(protocoldag: ProtocolDAG, *, - shared_basedir: PathLike, + shared_basedir: Optional[PathLike], scratch_basedir: PathLike, - permanent: Optional[PathLike] = None, + shared: Optional[ExternalStorage] = None, keep_shared: bool = False, keep_scratch: bool = False, raise_error: bool = True, @@ -406,12 +406,13 @@ def execute_DAG(protocoldag: ProtocolDAG, *, # the directory given as shared_root is actually the directory for this # DAG; the "shared_root" for the storage manager is the parent. We'll # force permanent to be the same. - filestorage = FileStorage(shared_basedir.parent) + if shared is None: + shared = FileStorage(shared_basedir.parent) dag_label = shared_basedir.name storage_manager = ReproduceOldBehaviorStorageManager( scratch_root=scratch_basedir, - shared_root=filestorage, - permanent_root=filestorage, + shared_root=shared, + permanent_root=shared, keep_scratch=keep_scratch, keep_staging=False, staging=Path(""), # use the actual directories as the staging diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index b1cc60eaf..a1d6f6121 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -43,6 +43,18 @@ def __init__(self, storage_manager: StorageManager, dag_label: str): self.dag_label = dag_label self.permanents: list[PermanentStaging] = [] + def register_dag_result(self, result): + # 1. create alchemiscale real permanent based on result + # 2. register all paths on self.permanents with new permanent + new_permanents = [] + for perm in self.permanents: + new_perm = ... + for file in perm.registry: + new_perm.register(file.path) + + # 3. replace old permanents with new permanent + self.permanents = new_permanents + @classmethod # NB: classmethod must be on top @contextmanager def running_dag(cls, storage_manager: StorageManager, dag_label: str): From 174a4109b8c0a0a0df986f92aa17c7548f992977 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 27 Oct 2023 12:50:23 -0500 Subject: [PATCH 03/17] Move StagingPath registration to __fspath__ --- gufe/storage/stagingdirectory.py | 18 +++++++++++++++--- gufe/tests/storage/test_stagingdirectory.py | 10 +++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 25ac49f12..bd9ae4253 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -178,7 +178,7 @@ def register_path(self, staging_path: StagingPath): the path to track """ label_exists = self.external.exists(staging_path.label) - fspath = Path(staging_path.__fspath__()) + fspath = Path(staging_path.fspath) if not fspath.parent.exists(): fspath.parent.mkdir(parents=True, exist_ok=True) @@ -336,19 +336,31 @@ class StagingPath: On creation, this registers with a :class:`.StagingDirectory` that will manage the local path and transferring data with its :class:`.ExternalStorage`. + + This object can always be used as a FileLike (using, e.g., the standard + ``open`` builtin). This requires that a staged path that exists on an + external resource be downloaded into a local file when it is referenced. + + For a representation of a file that does not require the download (for + example, when deserializing results that point to files) instead use + :class:`.ExternalFile`. """ def __init__(self, root: StagingDirectory, path: Union[PathLike, str]): self.root = root self.path = Path(path) - self.root.register_path(self) def __truediv__(self, path: Union[PathLike, str]): return StagingPath(self.root, self.path / path) - def __fspath__(self): + @property + def fspath(self): return str(self.root.staging_dir / self.path) + def __fspath__(self): + self.root.register_path(self) + return self.fspath + @property def label(self) -> str: """Label used in :class:`.ExternalStorage` for this path""" diff --git a/gufe/tests/storage/test_stagingdirectory.py b/gufe/tests/storage/test_stagingdirectory.py index bed953782..7e3d68a6c 100644 --- a/gufe/tests/storage/test_stagingdirectory.py +++ b/gufe/tests/storage/test_stagingdirectory.py @@ -43,6 +43,8 @@ def read_only_with_overwritten(root_with_contents): filename = pathlib.Path(read_only) / "data.txt" assert not filename.exists() staged = read_only / "data.txt" + assert not filename.exists() + staged.__fspath__() assert filename.exists() with open(staged, mode='w') as f: f.write("changed") @@ -180,8 +182,9 @@ def test_write_new(self, root): def test_write_old_fail(self, root): old_staging = root._get_other_shared("old_unit") + staged = old_staging / "foo.txt" with pytest.raises(IOError, match="read-only"): - old_staging / "foo.txt" + staged.__fspath__() def test_transfer_to_external(self, root_with_contents): path = list(root_with_contents.registry)[0] # only 1 @@ -262,6 +265,7 @@ def test_cleanup(self, root_with_contents): def test_cleanup_missing(self, root, caplog): root.delete_staging = True file = root / "foo.txt" + file.__fspath__() assert file in root.registry assert not pathlib.Path(file).exists() logger_name = "gufe.storage.stagingdirectory" @@ -279,6 +283,9 @@ def test_register_cleanup_preexisting_file(self, root): assert len(root.preexisting) == 0 staging = root / "foo.txt" assert staging.label == "new_unit/foo.txt" + assert len(root.registry) == 0 + assert len(root.preexisting) == 0 + staging.__fspath__() assert len(root.registry) == 1 assert len(root.preexisting) == 1 @@ -305,6 +312,7 @@ def test_load_missing_for_transfer(self, permanent): fname = pathlib.Path(permanent) / "old_unit/data.txt" assert not fname.exists() staging = permanent / "old_unit/data.txt" + staging.__fspath__() assert not fname.exists() assert permanent.external._data == {} permanent.transfer_staging_to_external() From 1a4852ebc7b92fb004957954dbe3f5ea2bc47071 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Mon, 30 Oct 2023 09:47:41 -0500 Subject: [PATCH 04/17] a couple thoughts scribbled down --- gufe/protocols/protocoldag.py | 1 + gufe/storage/stagingdirectory.py | 42 +++++++++++++++++++++++++++++++- gufe/storage/storagemanager.py | 25 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index fa4f4e0c0..facd33b19 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -19,6 +19,7 @@ from ..storage.storagemanager import StorageManager from ..storage.externalresource.filestorage import FileStorage +from ..storage.externalresource.base import ExternalStorage import logging _logger = logging.getLogger(__name__) diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index bd9ae4253..37efcf0d5 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -367,9 +367,49 @@ def label(self) -> str: return str(self.root.prefix / self.path) def __repr__(self): - return f"StagingPath({self.__fspath__()})" + return f"StagingPath({self.fspath})" # TODO: how much of the pathlib.Path interface do we want to wrap? # although edge cases may be a pain, we can get most of it with, e.g.: # def exists(self): return Path(self).exists() # but also, can do pathlib.Path(staging_path) and get hte whole thing + + +class StagingPathCodec: + def __init__(self, manager): + self.manager = manager + self.codec = JSONCodec( + cls=StagingPath, + is_my_dict=self.is_my_dict, + to_dict=self.to_dict, + from_dict=self.from_dict, + ) + self.encoder, self.decoder = ... + + def to_dict(self, path): + # scratch, shared, permanent may form nested with progressively + # smaller contexts, so the last of those it is in is where it should + # be labelled + loc = None + if path in self.manager.scratch_root: + loc = "scratch" + if path in self.manager.shared_root: + loc = "shared" + if path in self.manager.permanent_root: + loc = "permanent" + + return { + ':container:': loc, + ':unit_label:': path.root.prefix, + ':path:': path.path, + } + + def from_dict(self, dct): + loader = getattr(self.manager, f"get_{dct[':container:']}") + staging_dir = loader(dct[':unit_label:']) + return staging_dir / dct[':path:'] + + def is_my_dict(self, dct): + ... + + diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index a1d6f6121..bb80a9647 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -110,6 +110,31 @@ def running_unit(self, unit_label: str): if not self.manager.keep_staging: shared.cleanup() +class PerUnitContextManager: + """Variant to use when doing only a single process per unit""" + def __init__(self, storage_manager: StorageManager, dag_label: str): + self.manager = storage_manager + self.dag_label = dag_label + + @classmethod + @contextmanager + def running_dag(cls, storage_manager, dag_label): + yield cls(storage_manager, dag_label) + + @contextmanager + def running_unit(self, unit_label: str): + scratch = self.manager.get_scratch(unit_label) + shared = self.manager.get_shared(unit_label) + permanent = self.manager.get_permanent(unit_label) + try: + yield scratch, shared, permanent + finally: + shared.tranfer_staging_to_external() + for file in permanent.registry: + shared.transfer_single_file_to_expected(file) + + permanent.transfer_staging_to_external() + class StorageManager: """Tool to manage the storage lifecycle during a DAG. From f588d3721fce53e2003d48c11da4ade502ce7616 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Mon, 30 Oct 2023 16:51:14 -0500 Subject: [PATCH 05/17] describe user stories and serialization approach --- gufe/storage/stagingdirectory.py | 62 ++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 37efcf0d5..2ea338828 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -5,6 +5,8 @@ from os import PathLike, rmdir, remove from .externalresource import ExternalStorage, FileStorage from contextlib import contextmanager +from gufe.tokenization import JSON_HANDLER +from gufe.custom_json import JSONCodec, JSONSerializeDeserializer import logging _logger = logging.getLogger(__name__) @@ -375,7 +377,54 @@ def __repr__(self): # but also, can do pathlib.Path(staging_path) and get hte whole thing -class StagingPathCodec: +class StagingPathSerialization: + # TODO: where should this go? I think maybe on the storage manager + + # Serializing staging paths + # ------------------------- + # + # Some important user stories to consider: + # + # 1. I am loading my results object, and I will want to use the + # associated files. This should be transparent, regardless of where + # the permanent storage is located. + # 2. I am loading my results object, but I do not need the large stored + # files. I do not want to download them when they aren't needed. + # 3. My permanent storage was a directory on my file system, but I have + # moved that directory (with use cases of (a) I moved the absolute + # path; (b) it is at a different relative path with respect to my + # pwd. + # 4. I'm working with files from two different permenant storages. I + # need to be able to load from both in the same Python process. + # + # Outputs from a protocol may contain a :class:`.StagingPath`. Note that + # a :class:`.StagingPath` is inherently not a complete description of + # how to obtain the associated data: in almost every situation, there is + # some additional context required. This can include the credentials to + # an external server, or a base path that the file can be found relative + # to (which may have changed if the user moves it.) Because of this, we + # need to inject that context in to the deserialization. + # + # This object injects the relevant context, provided by the + # :class:`.StorageManager`. It creates a JSONSerialierDeserializer based + # on the one being used by gufe in this process, using all the installed + # codecs plus an additional codec specific to this context. + # + # User stories 1 and 2 are handled by the nature of the + # :class:`.StagingPath` object. The external file downloads as part of + # the ``__fspath__`` method. This means that when using the ``open`` + # builtin, you will automatically download the file to a local staging + # directory. However, the reference to the file can exist in the results + # object without downloading the file. + # + # User stories 3 and 4 are handled by this + # :class:`.StagingPathSerialization` class. Story 3 is handled by + # allowing the appropriate context (in the form of a + # :class:`.StorageManager`) to be injected into the deserialization + # process. Story 4 can be handled by using more than one + # :class:`.StagingPathSerialization` context (associated with different + # :class:`.StorageManager` objects. + def __init__(self, manager): self.manager = manager self.codec = JSONCodec( @@ -384,7 +433,14 @@ def __init__(self, manager): to_dict=self.to_dict, from_dict=self.from_dict, ) - self.encoder, self.decoder = ... + codecs = JSON_HANDLER.codecs + [self.codec] + self.json_handler = JSONSerializeDeserializer(codecs) + + def serialize(self, obj): + return self.json_handler.serialize(obj) + + def deserialize(self, string): + retrun self.json_handler.deserialize(string) def to_dict(self, path): # scratch, shared, permanent may form nested with progressively @@ -410,6 +466,6 @@ def from_dict(self, dct): return staging_dir / dct[':path:'] def is_my_dict(self, dct): - ... + return set(dct) == {':container:', ':unit_label:', ':path:'} From e84a4ff25c378c3ccec1d44809377818f8cfbbdb Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Tue, 31 Oct 2023 17:51:58 -0500 Subject: [PATCH 06/17] start on storage demo protocol & tests --- gufe/protocols/protocoldag.py | 7 + gufe/storage/stagingdirectory.py | 17 +-- gufe/storage/storagemanager.py | 3 +- gufe/tests/storage/storage_demo_protocol.py | 141 ++++++++++++++++++++ 4 files changed, 157 insertions(+), 11 deletions(-) create mode 100644 gufe/tests/storage/storage_demo_protocol.py diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index facd33b19..53d42db04 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -418,6 +418,13 @@ def execute_DAG(protocoldag: ProtocolDAG, *, keep_staging=False, staging=Path(""), # use the actual directories as the staging ) + return new_execute_DAG(protocoldag, storage_manager, n_retries) + + +def new_execute_DAG(protocoldag, storage_manager, n_retries): + # this simplifies setup of execute_DAG by allowing you to directly + # provide the storage_manager; the extra option in the old one just + # configure the storage_manager with storage_manager.running_dag(dag_label) as dag_ctx: for unit in protocoldag.protocol_units: attempt = 0 diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 2ea338828..121878172 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -394,7 +394,7 @@ class StagingPathSerialization: # moved that directory (with use cases of (a) I moved the absolute # path; (b) it is at a different relative path with respect to my # pwd. - # 4. I'm working with files from two different permenant storages. I + # 4. I'm working with files from two different permanent storages. I # need to be able to load from both in the same Python process. # # Outputs from a protocol may contain a :class:`.StagingPath`. Note that @@ -406,9 +406,9 @@ class StagingPathSerialization: # need to inject that context in to the deserialization. # # This object injects the relevant context, provided by the - # :class:`.StorageManager`. It creates a JSONSerialierDeserializer based - # on the one being used by gufe in this process, using all the installed - # codecs plus an additional codec specific to this context. + # :class:`.StorageManager`. It creates a JSONSerializerDeserializer + # based on the one being used by gufe in this process, using all the + # installed codecs plus an additional codec specific to this context. # # User stories 1 and 2 are handled by the nature of the # :class:`.StagingPath` object. The external file downloads as part of @@ -433,15 +433,12 @@ def __init__(self, manager): to_dict=self.to_dict, from_dict=self.from_dict, ) + self.refresh_handler() + + def refresh_handler(self): codecs = JSON_HANDLER.codecs + [self.codec] self.json_handler = JSONSerializeDeserializer(codecs) - def serialize(self, obj): - return self.json_handler.serialize(obj) - - def deserialize(self, string): - retrun self.json_handler.deserialize(string) - def to_dict(self, path): # scratch, shared, permanent may form nested with progressively # smaller contexts, so the last of those it is in is where it should diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index bb80a9647..297852da1 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -110,7 +110,8 @@ def running_unit(self, unit_label: str): if not self.manager.keep_staging: shared.cleanup() -class PerUnitContextManager: + +class PerUnitDAGContextManager(DAGContextManager): """Variant to use when doing only a single process per unit""" def __init__(self, storage_manager: StorageManager, dag_label: str): self.manager = storage_manager diff --git a/gufe/tests/storage/storage_demo_protocol.py b/gufe/tests/storage/storage_demo_protocol.py new file mode 100644 index 000000000..a7400643c --- /dev/null +++ b/gufe/tests/storage/storage_demo_protocol.py @@ -0,0 +1,141 @@ +import gufe + +""" +This module contains a complete integration test for the +""" + + +class Unit1(gufe.ProtocolUnit): + def _execute(ctx): + share_file = ctx.shared / "shared.txt" + with open(share_file, mode='w') as f: + f.write("I can be shared") + + perm_file = ctx.permanent / "permanent.txt" + with open(perm_file, mode='w') as f: + f.write("I'm permanent (but I can be shared)") + + scratch_file = ctx.scratch / "scratch.txt" + with open(scratch_file, mode='w') as f: + f.write("This is scratch -- can't be shared") + + return {'share_file': share_file, + 'perm_file': perm_file, + 'scratch_file': scratch_file} + + +class Unit2(gufe.ProtocolUnit): + def _execute(ctx, unit1_result): + u1_outputs = unit1_result.outputs + share_file = u1_outputs['share_file'] + perm_file = u1_outputs['perm_file'] + scratch_file = u1_outputs['scratch_file'] + + outputs = {} + for file_label, file in unit1_result.outputs.items(): + # labels are, e.g., share_file; file is StagingPath + key = f"{file_label}_contents" + try: + with open(file, mode='r') as f: + outputs[key] = f.read() + except FileNotFoundError: + outputs[key] = "File not found" + + return outputs + + +class StorageDemoProtocol(gufe.Protocol): + @classmethod + def _default_settings(cls): + return {} + + @classmethod + def _defaults(cls): + return {} + + def _create(self, *, stateA, stateB, mapping, extends, name, + transformation_key): + u1 = Unit1() + u2 = Unit2(unit1_results=u1) + return [u1, u2] + + def _gather(self, protocol_dag_results): + return {} + + +# TODO: execute_unit should actually be moved somewhere else; this is likely +# to be the starting point for a real approach to do that +def execute_unit(dag_label, protocolunit, storage_manager, inputs): + label = f"{str(unit.key)}" + with storage_manager(running_dag(dag_label)) as dag_ctx: + with dag_ctx.running_unit(label) as (scratch, shared, perm): + context = Context(shared=shared, + scratch=scratch, + permanent=perm) + + unit_result = protocolunit.execute(context, + raise_error=False, + **inputs) + + return unit_result + + +def execute_per_unit(protocoldag, storage_manager, dag_directory): + # fake like we're executing each unit in a different process + all_unit_filenames = [] + dag_label = protocoldag.key # TODO: we can change this + for num, unit in enumerate(protocoldag.protocol_units): + unit_result = execute_unit(dag_label, unit, storage_manager) + fname = dag_directory / f"result_{num}.json" + # serialize the unit result + with open(fname, mode='w') as f: + f.write(json.dumps(unit_result.to_dict(), + cls=storage_manager.json_encoder)) + + all_unit_filenames.append(fname) + + # now let's force the unit_result to get cleared from memory + del unit_result + assert gc.is_finalized(unit_result) + + ... # TODO: make ProtocolDAGResult + + +def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): + transformation = gufe.Transformation( + solvated_liquid, + solvated_complex, + protocol=StorageDemoProtocol(), + mapping=None + ) + dag = transformation.create() + shared = MemoryStorage() + permanent = MemoryStorage() + storage_manager = StorageManager( + scratch_root=tmp_path / "scratch", + shared_root=shared, + permanent_root=permanent + ) + result = gufe.protocols.new_execute_DAG(dag, storage_manager, + n_retries=3) + assert result.ok + assert len(result.protocol_unit_results) == 2 + res1, res2 = result.protocol_unit_results + assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} + # further tests of res1? + + assert res2.outputs = { + ... + } + + assert shared._data = { + 'shared.txt': "I can be shared" + } + assert permanent._data = { + 'permanent.txt': "I'm permanent (but I can be shared)", + } + # test that scratch is empty + + + + From 63d327bac16aac4794b948b19f29bde9f3d43254 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Wed, 1 Nov 2023 07:28:21 -0500 Subject: [PATCH 07/17] first test of new execute_DAG works --- gufe/protocols/protocoldag.py | 37 +++++--- gufe/storage/stagingdirectory.py | 93 +------------------ gufe/storage/stagingserialization.py | 93 +++++++++++++++++++ ..._demo_protocol.py => test_storage_demo.py} | 47 ++++++---- 4 files changed, 148 insertions(+), 122 deletions(-) create mode 100644 gufe/storage/stagingserialization.py rename gufe/tests/storage/{storage_demo_protocol.py => test_storage_demo.py} (71%) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 53d42db04..e24e0d68e 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -397,13 +397,6 @@ def execute_DAG(protocoldag: ProtocolDAG, *, The result of executing the `ProtocolDAG`. """ - if n_retries < 0: - raise ValueError("Must give positive number of retries") - - # iterate in DAG order - results: dict[GufeKey, ProtocolUnitResult] = {} - all_results = [] # successes AND failures - # the directory given as shared_root is actually the directory for this # DAG; the "shared_root" for the storage manager is the parent. We'll # force permanent to be the same. @@ -418,13 +411,28 @@ def execute_DAG(protocoldag: ProtocolDAG, *, keep_staging=False, staging=Path(""), # use the actual directories as the staging ) - return new_execute_DAG(protocoldag, storage_manager, n_retries) - - -def new_execute_DAG(protocoldag, storage_manager, n_retries): + return new_execute_DAG(protocoldag, dag_label, storage_manager, + keep_shared, raise_error, n_retries) + + +def new_execute_DAG( + protocoldag, + dag_label, + storage_manager, + keep_shared=False, + raise_error=False, + n_retries=0 +): # this simplifies setup of execute_DAG by allowing you to directly # provide the storage_manager; the extra option in the old one just # configure the storage_manager + if n_retries < 0: + raise ValueError("Must give positive number of retries") + + # iterate in DAG order + results: dict[GufeKey, ProtocolUnitResult] = {} + all_results = [] # successes AND failures + with storage_manager.running_dag(dag_label) as dag_ctx: for unit in protocoldag.protocol_units: attempt = 0 @@ -433,7 +441,7 @@ def new_execute_DAG(protocoldag, storage_manager, n_retries): # `ProtocolUnitResult` inputs = _pu_to_pur(unit.inputs, results) - label = f"{str(unit.key)}_attempt_{attempt}", + label = f"{str(unit.key)}_attempt_{attempt}" with dag_ctx.running_unit(label) as (scratch, shared, perm): context = Context(shared=shared, scratch=scratch, @@ -456,8 +464,9 @@ def new_execute_DAG(protocoldag, storage_manager, n_retries): break if not keep_shared: - for objname in filestorage.iter_contents(dag_label): - filestorage.delete(objname) + shared = storage_manager.shared_root + for objname in shared.iter_contents(dag_label): + shared.delete(objname) return ProtocolDAGResult( name=protocoldag.name, diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 121878172..c63a241b3 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -5,8 +5,6 @@ from os import PathLike, rmdir, remove from .externalresource import ExternalStorage, FileStorage from contextlib import contextmanager -from gufe.tokenization import JSON_HANDLER -from gufe.custom_json import JSONCodec, JSONSerializeDeserializer import logging _logger = logging.getLogger(__name__) @@ -369,7 +367,7 @@ def label(self) -> str: return str(self.root.prefix / self.path) def __repr__(self): - return f"StagingPath({self.fspath})" + return f"StagingPath('{self.fspath}')" # TODO: how much of the pathlib.Path interface do we want to wrap? # although edge cases may be a pain, we can get most of it with, e.g.: @@ -377,92 +375,3 @@ def __repr__(self): # but also, can do pathlib.Path(staging_path) and get hte whole thing -class StagingPathSerialization: - # TODO: where should this go? I think maybe on the storage manager - - # Serializing staging paths - # ------------------------- - # - # Some important user stories to consider: - # - # 1. I am loading my results object, and I will want to use the - # associated files. This should be transparent, regardless of where - # the permanent storage is located. - # 2. I am loading my results object, but I do not need the large stored - # files. I do not want to download them when they aren't needed. - # 3. My permanent storage was a directory on my file system, but I have - # moved that directory (with use cases of (a) I moved the absolute - # path; (b) it is at a different relative path with respect to my - # pwd. - # 4. I'm working with files from two different permanent storages. I - # need to be able to load from both in the same Python process. - # - # Outputs from a protocol may contain a :class:`.StagingPath`. Note that - # a :class:`.StagingPath` is inherently not a complete description of - # how to obtain the associated data: in almost every situation, there is - # some additional context required. This can include the credentials to - # an external server, or a base path that the file can be found relative - # to (which may have changed if the user moves it.) Because of this, we - # need to inject that context in to the deserialization. - # - # This object injects the relevant context, provided by the - # :class:`.StorageManager`. It creates a JSONSerializerDeserializer - # based on the one being used by gufe in this process, using all the - # installed codecs plus an additional codec specific to this context. - # - # User stories 1 and 2 are handled by the nature of the - # :class:`.StagingPath` object. The external file downloads as part of - # the ``__fspath__`` method. This means that when using the ``open`` - # builtin, you will automatically download the file to a local staging - # directory. However, the reference to the file can exist in the results - # object without downloading the file. - # - # User stories 3 and 4 are handled by this - # :class:`.StagingPathSerialization` class. Story 3 is handled by - # allowing the appropriate context (in the form of a - # :class:`.StorageManager`) to be injected into the deserialization - # process. Story 4 can be handled by using more than one - # :class:`.StagingPathSerialization` context (associated with different - # :class:`.StorageManager` objects. - - def __init__(self, manager): - self.manager = manager - self.codec = JSONCodec( - cls=StagingPath, - is_my_dict=self.is_my_dict, - to_dict=self.to_dict, - from_dict=self.from_dict, - ) - self.refresh_handler() - - def refresh_handler(self): - codecs = JSON_HANDLER.codecs + [self.codec] - self.json_handler = JSONSerializeDeserializer(codecs) - - def to_dict(self, path): - # scratch, shared, permanent may form nested with progressively - # smaller contexts, so the last of those it is in is where it should - # be labelled - loc = None - if path in self.manager.scratch_root: - loc = "scratch" - if path in self.manager.shared_root: - loc = "shared" - if path in self.manager.permanent_root: - loc = "permanent" - - return { - ':container:': loc, - ':unit_label:': path.root.prefix, - ':path:': path.path, - } - - def from_dict(self, dct): - loader = getattr(self.manager, f"get_{dct[':container:']}") - staging_dir = loader(dct[':unit_label:']) - return staging_dir / dct[':path:'] - - def is_my_dict(self, dct): - return set(dct) == {':container:', ':unit_label:', ':path:'} - - diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py new file mode 100644 index 000000000..4b479396f --- /dev/null +++ b/gufe/storage/stagingserialization.py @@ -0,0 +1,93 @@ +from gufe.tokenization import JSON_HANDLER +from gufe.custom_json import JSONCodec, JSONSerializerDeserializer +from .stagingdirectory import StagingPath + +class StagingPathSerialization: + # TODO: where should this go? I think maybe on the storage manager + + # Serializing staging paths + # ------------------------- + # + # Some important user stories to consider: + # + # 1. I am loading my results object, and I will want to use the + # associated files. This should be transparent, regardless of where + # the permanent storage is located. + # 2. I am loading my results object, but I do not need the large stored + # files. I do not want to download them when they aren't needed. + # 3. My permanent storage was a directory on my file system, but I have + # moved that directory (with use cases of (a) I moved the absolute + # path; (b) it is at a different relative path with respect to my + # pwd. + # 4. I'm working with files from two different permanent storages. I + # need to be able to load from both in the same Python process. + # + # Outputs from a protocol may contain a :class:`.StagingPath`. Note that + # a :class:`.StagingPath` is inherently not a complete description of + # how to obtain the associated data: in almost every situation, there is + # some additional context required. This can include the credentials to + # an external server, or a base path that the file can be found relative + # to (which may have changed if the user moves it.) Because of this, we + # need to inject that context in to the deserialization. + # + # This object injects the relevant context, provided by the + # :class:`.StorageManager`. It creates a JSONSerializerDeserializer + # based on the one being used by gufe in this process, using all the + # installed codecs plus an additional codec specific to this context. + # + # User stories 1 and 2 are handled by the nature of the + # :class:`.StagingPath` object. The external file downloads as part of + # the ``__fspath__`` method. This means that when using the ``open`` + # builtin, you will automatically download the file to a local staging + # directory. However, the reference to the file can exist in the results + # object without downloading the file. + # + # User stories 3 and 4 are handled by this + # :class:`.StagingPathSerialization` class. Story 3 is handled by + # allowing the appropriate context (in the form of a + # :class:`.StorageManager`) to be injected into the deserialization + # process. Story 4 can be handled by using more than one + # :class:`.StagingPathSerialization` context (associated with different + # :class:`.StorageManager` objects. + + def __init__(self, manager): + self.manager = manager + self.codec = JSONCodec( + cls=StagingPath, + is_my_dict=self.is_my_dict, + to_dict=self.to_dict, + from_dict=self.from_dict, + ) + self.refresh_handler() + + def refresh_handler(self): + codecs = JSON_HANDLER.codecs + [self.codec] + self.json_handler = JSONSerializerDeserializer(codecs) + + def to_dict(self, path): + # scratch, shared, permanent may form nested with progressively + # smaller contexts, so the last of those it is in is where it should + # be labelled + loc = None + if path in self.manager.scratch_root: + loc = "scratch" + if path in self.manager.shared_root: + loc = "shared" + if path in self.manager.permanent_root: + loc = "permanent" + + return { + ':container:': loc, + ':unit_label:': path.root.prefix, + ':path:': path.path, + } + + def from_dict(self, dct): + loader = getattr(self.manager, f"get_{dct[':container:']}") + staging_dir = loader(dct[':unit_label:']) + return staging_dir / dct[':path:'] + + def is_my_dict(self, dct): + return set(dct) == {':container:', ':unit_label:', ':path:'} + + diff --git a/gufe/tests/storage/storage_demo_protocol.py b/gufe/tests/storage/test_storage_demo.py similarity index 71% rename from gufe/tests/storage/storage_demo_protocol.py rename to gufe/tests/storage/test_storage_demo.py index a7400643c..95abe40f4 100644 --- a/gufe/tests/storage/storage_demo_protocol.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -1,12 +1,22 @@ +import pytest import gufe +from gufe.storage.externalresource import MemoryStorage +from gufe.storage.storagemanager import StorageManager +from gufe.protocols.protocoldag import new_execute_DAG """ -This module contains a complete integration test for the +This module contains complete integration tests for the storage lifecycle, +using an actual protocol as an example. + +These tests are largely redundant from the perspective of unit testing, but +the :class:`.StoragedDemoProtocol` is useful as an example for +implementation. Furthermore, as integration tests, they ensure that the +whole setup works together. """ class Unit1(gufe.ProtocolUnit): - def _execute(ctx): + def _execute(self, ctx): share_file = ctx.shared / "shared.txt" with open(share_file, mode='w') as f: f.write("I can be shared") @@ -25,7 +35,7 @@ def _execute(ctx): class Unit2(gufe.ProtocolUnit): - def _execute(ctx, unit1_result): + def _execute(self, ctx, unit1_result): u1_outputs = unit1_result.outputs share_file = u1_outputs['share_file'] perm_file = u1_outputs['perm_file'] @@ -53,10 +63,9 @@ def _default_settings(cls): def _defaults(cls): return {} - def _create(self, *, stateA, stateB, mapping, extends, name, - transformation_key): + def _create(self, stateA, stateB, mapping, extends): u1 = Unit1() - u2 = Unit2(unit1_results=u1) + u2 = Unit2(unit1_result=u1) return [u1, u2] def _gather(self, protocol_dag_results): @@ -103,9 +112,9 @@ def execute_per_unit(protocoldag, storage_manager, dag_directory): def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): transformation = gufe.Transformation( - solvated_liquid, + solvated_ligand, solvated_complex, - protocol=StorageDemoProtocol(), + protocol=StorageDemoProtocol(StorageDemoProtocol.default_settings()), mapping=None ) dag = transformation.create() @@ -116,23 +125,29 @@ def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): shared_root=shared, permanent_root=permanent ) - result = gufe.protocols.new_execute_DAG(dag, storage_manager, - n_retries=3) + dag_label = "dag" + result = new_execute_DAG(dag, dag_label, storage_manager, + raise_error=True, n_retries=3) assert result.ok assert len(result.protocol_unit_results) == 2 res1, res2 = result.protocol_unit_results assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} # further tests of res1? - assert res2.outputs = { - ... + u1_label = f"{dag.protocol_units[0].key}_attempt_0" + + assert res2.outputs == { + 'share_file_contents': "I can be shared", + 'perm_file_contents': "I'm permanent (but I can be shared)", + 'scratch_file_contents': "File not found", } - assert shared._data = { - 'shared.txt': "I can be shared" + assert shared._data == { + f'{u1_label}/shared.txt': b"I can be shared", + f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", } - assert permanent._data = { - 'permanent.txt': "I'm permanent (but I can be shared)", + assert permanent._data == { + f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", } # test that scratch is empty From 183ce7edd8a0c35f973a85998a744a37762f21d0 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Wed, 1 Nov 2023 18:12:27 -0500 Subject: [PATCH 08/17] update with new version of storage manager --- gufe/protocols/protocoldag.py | 1 + gufe/storage/storagemanager.py | 96 ++++++++++++++++++++++++- gufe/tests/storage/test_storage_demo.py | 46 ++++++++---- 3 files changed, 125 insertions(+), 18 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index e24e0d68e..579039ada 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -435,6 +435,7 @@ def new_execute_DAG( with storage_manager.running_dag(dag_label) as dag_ctx: for unit in protocoldag.protocol_units: + # import pdb; pdb.set_trace() attempt = 0 while attempt <= n_retries: # translate each `ProtocolUnit` in input into corresponding diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index 297852da1..d082b7af8 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -79,6 +79,8 @@ def running_dag(cls, storage_manager: StorageManager, dag_label: str): # import pdb; pdb.set_trace() d.cleanup() + # TODO: remove scratch root if empty + @contextmanager def running_unit(self, unit_label: str): """Unit level of the storage lifecycle. @@ -99,16 +101,17 @@ def running_unit(self, unit_label: str): finally: # TODO: should some of this be in an else clause instead? self.permanents.append(permanent) - shared.transfer_staging_to_external() + shared.root.transfer_staging_to_external() # everything in permanent must also be available in shared for file in permanent.registry: - shared.transfer_single_file_to_external(file) + shared.root.transfer_single_file_to_external(file) if not self.manager.keep_scratch: shutil.rmtree(scratch) if not self.manager.keep_staging: - shared.cleanup() + # TODO: for some reason this isn't cleaning up as expected? + shared.root.cleanup() class PerUnitDAGContextManager(DAGContextManager): @@ -137,6 +140,78 @@ def running_unit(self, unit_label: str): permanent.transfer_staging_to_external() +class NewStorageManager: + def __init__( + self, + scratch_root: PathLike, + shared_root: ExternalStorage, + permanent_root: ExternalStorage, + *, + keep_scratch: bool = False, + keep_staging: bool = False, + staging: PathLike = Path(".staging"), + DAGContextClass: _DCMType = SingleProcDAGContextManager, + ): + self.scratch_root = Path(scratch_root) + self.shared_root = shared_root + self.permanent_root = permanent_root + self.keep_scratch = keep_scratch + self.keep_staging = keep_staging + self.staging = staging + self.DAGContextClass = DAGContextClass + + self.permanent_staging = PermanentStaging( + scratch=self.scratch_root, + external=self.permanent_root, + shared=self.shared_root, + staging=self.staging, + prefix="" + ) + + self.shared_staging = SharedStaging( + scratch=self.scratch_root, + external=self.shared_root, + staging=self.staging, + prefix="" + ) + + def _scratch_loc(self, unit_label): + return self.scratch_root / "scratch" / unit_label + + @contextmanager + def running_dag(self, dag_label=None): + # TODO: remove dag_label + try: + yield self + finally: + self.permanent_staging.transfer_staging_to_external() + + if not self.keep_staging: + self.permanent_staging.cleanup() + + # TODO: remove empty dirs + + @contextmanager + def running_unit(self, unit_label): + scratch = self._scratch_loc(unit_label) + scratch.mkdir(parents=True, exist_ok=True) + shared = self.shared_staging / unit_label + permanent = self.permanent_staging / unit_label + try: + yield scratch, shared, permanent + finally: + self.shared_staging.transfer_staging_to_external() + + if not self.keep_scratch: + shutil.rmtree(scratch) + + if not self.keep_staging: + self.shared_staging.cleanup() + + + + + class StorageManager: """Tool to manage the storage lifecycle during a DAG. @@ -163,6 +238,19 @@ def __init__( self.staging = staging self.DAGContextClass = DAGContextClass + self.permanent_staging = PermanentStaging( + scratch=self.scratch_root, + external=self.permanent_root, + shared=self.shared_root, + staging=self.staging + ) + + self.shared_staging = SharedStaging( + scratch=self.scratch_root, + external=self.shared_root, + staging=self.staging, + ) + def _scratch_loc(self, unit_label): return self.scratch_root / "scratch" / unit_label @@ -174,6 +262,7 @@ def get_scratch(self, unit_label: str) -> Path: def get_permanent(self, unit_label) -> PermanentStaging: """Get the object for this unit's permanent staging directory""" + return self.permanent_staging / unit_label return PermanentStaging( scratch=self.scratch_root, external=self.permanent_root, @@ -184,6 +273,7 @@ def get_permanent(self, unit_label) -> PermanentStaging: def get_shared(self, unit_label) -> SharedStaging: """Get the object for this unit's shared staging directory""" + return self.shared_staging / unit_label return SharedStaging( scratch=self.scratch_root, external=self.shared_root, diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 95abe40f4..0f5992211 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -1,7 +1,8 @@ import pytest + import gufe from gufe.storage.externalresource import MemoryStorage -from gufe.storage.storagemanager import StorageManager +from gufe.storage.storagemanager import StorageManager, NewStorageManager from gufe.protocols.protocoldag import new_execute_DAG """ @@ -37,9 +38,6 @@ def _execute(self, ctx): class Unit2(gufe.ProtocolUnit): def _execute(self, ctx, unit1_result): u1_outputs = unit1_result.outputs - share_file = u1_outputs['share_file'] - perm_file = u1_outputs['perm_file'] - scratch_file = u1_outputs['scratch_file'] outputs = {} for file_label, file in unit1_result.outputs.items(): @@ -110,7 +108,14 @@ def execute_per_unit(protocoldag, storage_manager, dag_directory): ... # TODO: make ProtocolDAGResult -def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): +@pytest.mark.parametrize('keep', [ + 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', + 'scratch,shared', 'staging,shared', 'scratch,staging,shared' +]) +def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path, keep): + keep_scratch = 'scratch' in keep + keep_staging = 'staging' in keep + keep_shared = 'shared' in keep transformation = gufe.Transformation( solvated_ligand, solvated_complex, @@ -120,12 +125,15 @@ def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): dag = transformation.create() shared = MemoryStorage() permanent = MemoryStorage() - storage_manager = StorageManager( - scratch_root=tmp_path / "scratch", + scratch = tmp_path + storage_manager = NewStorageManager( + scratch_root=scratch, shared_root=shared, - permanent_root=permanent + permanent_root=permanent, + keep_scratch=keep_scratch, + keep_staging=keep_staging, ) - dag_label = "dag" + dag_label = "dag" # currently unused? result = new_execute_DAG(dag, dag_label, storage_manager, raise_error=True, n_retries=3) assert result.ok @@ -136,20 +144,28 @@ def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path): u1_label = f"{dag.protocol_units[0].key}_attempt_0" + if keep_scratch: + scratch_res2 = "This is scratch -- can't be shared" + n_scratch = 2 + else: + scratch_res2 = "File not found" + n_scratch = 0 + assert res2.outputs == { 'share_file_contents': "I can be shared", 'perm_file_contents': "I'm permanent (but I can be shared)", - 'scratch_file_contents': "File not found", + 'scratch_file_contents': scratch_res2 } - assert shared._data == { - f'{u1_label}/shared.txt': b"I can be shared", - f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", - } + # assert shared._data == { + # f'{u1_label}/shared.txt': b"I can be shared", + # f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", + # } assert permanent._data == { f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", } - # test that scratch is empty + + assert len(list((scratch / "scratch").iterdir())) == n_scratch From 0faf7a6412425bdcf8620884fbc695e5c2a096a3 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Thu, 2 Nov 2023 16:02:16 -0500 Subject: [PATCH 09/17] at least the test_storage_demo tests pass --- gufe/protocols/protocoldag.py | 9 +-- gufe/storage/stagingdirectory.py | 40 +++++++--- gufe/storage/storagemanager.py | 27 +++++-- gufe/tests/storage/test_storage_demo.py | 100 ++++++++++++++++-------- 4 files changed, 116 insertions(+), 60 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 579039ada..15c13ba9f 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -408,18 +408,18 @@ def execute_DAG(protocoldag: ProtocolDAG, *, shared_root=shared, permanent_root=shared, keep_scratch=keep_scratch, + keep_shared=keep_shared, keep_staging=False, staging=Path(""), # use the actual directories as the staging ) return new_execute_DAG(protocoldag, dag_label, storage_manager, - keep_shared, raise_error, n_retries) + raise_error, n_retries) def new_execute_DAG( protocoldag, dag_label, storage_manager, - keep_shared=False, raise_error=False, n_retries=0 ): @@ -464,11 +464,6 @@ def new_execute_DAG( if not result.ok(): break - if not keep_shared: - shared = storage_manager.shared_root - for objname in shared.iter_contents(dag_label): - shared.delete(objname) - return ProtocolDAGResult( name=protocoldag.name, protocol_units=protocoldag.protocol_units, diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index c63a241b3..0cb4ea825 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -128,7 +128,7 @@ def _delete_staging_safe(self): def transfer_single_file_to_external(self, held_file: StagingPath): """Transfer a given file from staging into external storage """ - path = Path(held_file) + path = Path(held_file.fspath) if not path.exists(): _logger.info(f"Found nonexistent path {path}, not " "transfering to external storage") @@ -138,20 +138,31 @@ def transfer_single_file_to_external(self, held_file: StagingPath): else: _logger.info(f"Transfering {path} to external storage") self.external.store_path(held_file.label, path) + return held_file + + return None # no transfer + def transfer_staging_to_external(self): - """Transfer all objects in the registry to external storage""" - for obj in self.registry: - self.transfer_single_file_to_external(obj) + """Transfer all objects in the registry to external storage + + """ + return [ + transferred + for file in self.registry + if (transferred := self.transfer_single_file_to_external(file)) + ] def cleanup(self): """Perform end-of-lifecycle cleanup. """ if self.delete_staging and self._delete_staging_safe(): for file in self.registry - self.preexisting: - if Path(file).exists(): + path = Path(file.fspath) + if path.exists(): _logger.debug(f"Removing file {file}") - remove(file) + path.unlink() + self.registry.remove(file) else: _logger.warning("During staging cleanup, file " f"{file} was marked for deletion, but " @@ -192,13 +203,16 @@ def register_path(self, staging_path: StagingPath): def _load_file_from_external(self, external: ExternalStorage, staging_path: StagingPath): + # import pdb; pdb.set_trace() scratch_path = self.staging_dir / staging_path.path # TODO: switch this to using `get_filename` and `store_path` - with external.load_stream(staging_path.label) as f: - external_bytes = f.read() if scratch_path.exists(): self.preexisting.add(staging_path) - ... # TODO: something to check that the bytes are the same? + + with external.load_stream(staging_path.label) as f: + external_bytes = f.read() + ... # TODO: check that the bytes are the same if preexisting? + scratch_path.parent.mkdir(exist_ok=True, parents=True) with open(scratch_path, mode='wb') as f: f.write(external_bytes) @@ -211,7 +225,7 @@ def __fspath__(self): def __repr__(self): return ( - f"{self.__class__.__name__}({self.scratch}, {self.external}, " + f"{self.__class__.__name__}('{self.scratch}', {self.external}, " f"{self.prefix})" ) @@ -273,14 +287,14 @@ def transfer_single_file_to_external(self, held_file: StagingPath): _logger.debug("Read-only: Not transfering to external storage") return # early exit - super().transfer_single_file_to_external(held_file) + return super().transfer_single_file_to_external(held_file) def transfer_staging_to_external(self): if self.read_only: _logger.debug("Read-only: Not transfering to external storage") return # early exit - super().transfer_staging_to_external() + return super().transfer_staging_to_external() def register_path(self, staging_path: StagingPath): label_exists = self.external.exists(staging_path.label) @@ -323,7 +337,7 @@ def _delete_staging_safe(self): def transfer_single_file_to_external(self, held_file: StagingPath): # if we can't find it locally, we load it from shared storage - path = Path(held_file) + path = Path(held_file.fspath) if not path.exists(): self._load_file_from_external(self.shared, held_file) diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index d082b7af8..d436887d6 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -149,6 +149,7 @@ def __init__( *, keep_scratch: bool = False, keep_staging: bool = False, + keep_shared: bool = False, staging: PathLike = Path(".staging"), DAGContextClass: _DCMType = SingleProcDAGContextManager, ): @@ -157,8 +158,10 @@ def __init__( self.permanent_root = permanent_root self.keep_scratch = keep_scratch self.keep_staging = keep_staging + self.keep_shared = keep_shared self.staging = staging self.DAGContextClass = DAGContextClass + self.shared_xfer = set() self.permanent_staging = PermanentStaging( scratch=self.scratch_root, @@ -179,16 +182,22 @@ def _scratch_loc(self, unit_label): return self.scratch_root / "scratch" / unit_label @contextmanager - def running_dag(self, dag_label=None): - # TODO: remove dag_label + def running_dag(self, dag_label): + # TODO: remove (or use) dag_label try: yield self finally: + # import pdb; pdb.set_trace() + # clean up after DAG completes self.permanent_staging.transfer_staging_to_external() if not self.keep_staging: self.permanent_staging.cleanup() + if not self.keep_shared: + for file in self.shared_xfer: + self.shared_root.delete(file.label) + # TODO: remove empty dirs @contextmanager @@ -200,13 +209,17 @@ def running_unit(self, unit_label): try: yield scratch, shared, permanent finally: - self.shared_staging.transfer_staging_to_external() + # import pdb; pdb.set_trace() + # clean up after unit + self.shared_xfer.update(set( + self.shared_staging.transfer_staging_to_external() + )) - if not self.keep_scratch: - shutil.rmtree(scratch) + if not self.keep_scratch: + shutil.rmtree(scratch) - if not self.keep_staging: - self.shared_staging.cleanup() + if not self.keep_staging: + self.shared_staging.cleanup() diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 0f5992211..8b839a96b 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -1,8 +1,11 @@ import pytest +import pathlib + import gufe from gufe.storage.externalresource import MemoryStorage from gufe.storage.storagemanager import StorageManager, NewStorageManager +from gufe.storage.stagingdirectory import StagingPath from gufe.protocols.protocoldag import new_execute_DAG """ @@ -41,6 +44,7 @@ def _execute(self, ctx, unit1_result): outputs = {} for file_label, file in unit1_result.outputs.items(): + # import pdb; pdb.set_trace() # labels are, e.g., share_file; file is StagingPath key = f"{file_label}_contents" try: @@ -107,15 +111,8 @@ def execute_per_unit(protocoldag, storage_manager, dag_directory): ... # TODO: make ProtocolDAGResult - -@pytest.mark.parametrize('keep', [ - 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', - 'scratch,shared', 'staging,shared', 'scratch,staging,shared' -]) -def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path, keep): - keep_scratch = 'scratch' in keep - keep_staging = 'staging' in keep - keep_shared = 'shared' in keep +@pytest.fixture +def demo_dag(solvated_ligand, solvated_complex): transformation = gufe.Transformation( solvated_ligand, solvated_complex, @@ -123,33 +120,25 @@ def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path, keep): mapping=None ) dag = transformation.create() - shared = MemoryStorage() - permanent = MemoryStorage() - scratch = tmp_path - storage_manager = NewStorageManager( - scratch_root=scratch, - shared_root=shared, - permanent_root=permanent, - keep_scratch=keep_scratch, - keep_staging=keep_staging, - ) - dag_label = "dag" # currently unused? - result = new_execute_DAG(dag, dag_label, storage_manager, - raise_error=True, n_retries=3) + return dag + + +def assert_dag_result(result, u1_label, keep_scratch): + # no matter how you set up storage, the DAGResult and UnitResults from + # this protocol should be the same assert result.ok assert len(result.protocol_unit_results) == 2 res1, res2 = result.protocol_unit_results assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} - # further tests of res1? + assert isinstance(res1.outputs['scratch_file'], pathlib.Path) + assert isinstance(res1.outputs['share_file'], StagingPath) + assert isinstance(res1.outputs['perm_file'], StagingPath) - u1_label = f"{dag.protocol_units[0].key}_attempt_0" if keep_scratch: scratch_res2 = "This is scratch -- can't be shared" - n_scratch = 2 else: scratch_res2 = "File not found" - n_scratch = 0 assert res2.outputs == { 'share_file_contents': "I can be shared", @@ -157,16 +146,61 @@ def test_execute_DAG(solvated_ligand, solvated_complex, tmp_path, keep): 'scratch_file_contents': scratch_res2 } - # assert shared._data == { - # f'{u1_label}/shared.txt': b"I can be shared", - # f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", - # } +def _parse_keep(keep): + return ( + 'scratch' in keep, + 'staging' in keep, + 'shared' in keep + ) + + + +@pytest.mark.parametrize('keep', [ + 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', + 'scratch,shared', 'staging,shared', 'scratch,staging,shared' +]) +def test_execute_DAG(demo_dag, tmp_path, keep): + keep_scratch, keep_staging, keep_shared = _parse_keep(keep) + dag = demo_dag + + shared = MemoryStorage() + permanent = MemoryStorage() + scratch = tmp_path + + storage_manager = NewStorageManager( + scratch_root=scratch, + shared_root=shared, + permanent_root=permanent, + keep_scratch=keep_scratch, + keep_shared=keep_shared, + keep_staging=keep_staging, + ) + + dag_label = "dag" # currently unused? + result = new_execute_DAG(dag, dag_label, storage_manager, + raise_error=True, n_retries=3) + + # test the ProtocolDAGResult that comes out + u1_label = f"{dag.protocol_units[0].key}_attempt_0" + assert_dag_result(result, u1_label, keep_scratch) + + # test the shared/permanent storage assert permanent._data == { f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", } + if keep_shared: + assert shared._data == { + f'{u1_label}/shared.txt': b"I can be shared", + # f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", + } + else: + assert shared._data == {} + # test the directories that we generated + n_scratch = 2 if keep_scratch else 0 assert len(list((scratch / "scratch").iterdir())) == n_scratch - - - + if keep_staging: + ... + else: + assert ".staging" not in list(scratch.iterdir()) From 07cca8c745214944aa88b2a586d9a554b6939371 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Thu, 2 Nov 2023 16:31:43 -0500 Subject: [PATCH 10/17] improvements on storage file handling --- gufe/storage/stagingdirectory.py | 25 +++------------------ gufe/storage/storagemanager.py | 30 +++++++++++-------------- gufe/tests/storage/test_storage_demo.py | 11 ++++++--- gufe/utils.py | 28 +++++++++++++++++++++++ 4 files changed, 52 insertions(+), 42 deletions(-) diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 0cb4ea825..69c7968be 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -6,6 +6,8 @@ from .externalresource import ExternalStorage, FileStorage from contextlib import contextmanager +from gufe.utils import delete_empty_dirs + import logging _logger = logging.getLogger(__name__) @@ -36,27 +38,6 @@ def _safe_to_delete_staging(external: ExternalStorage, path: PathLike, return False -def _delete_empty_dirs(root: PathLike, delete_root: bool = True): - """Delete all empty directories. - - Repeats so that directories that only contained empty directories also - get deleted. - """ - root = Path(root) - - def find_empty_dirs(directory): - if not (paths := list(directory.iterdir())): - return [directory] - directories = [p for p in paths if p.is_dir()] - return sum([find_empty_dirs(d) for d in directories], []) - - while root.exists() and (empties := find_empty_dirs(root)): - if empties == [root] and not delete_root: - return - for directory in empties: - _logger.debug(f"Removing '{directory}'") - rmdir(directory) - class StagingDirectory: """PathLike local representation of an :class:`.ExternalStorage`. @@ -167,7 +148,7 @@ def cleanup(self): _logger.warning("During staging cleanup, file " f"{file} was marked for deletion, but " "can not be found on disk.") - _delete_empty_dirs(self.staging_dir) + delete_empty_dirs(self.staging_dir) def register_path(self, staging_path: StagingPath): """ diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index d436887d6..eb7ff51b5 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -4,6 +4,8 @@ from contextlib import contextmanager import shutil +from gufe.utils import delete_empty_dirs + from typing import Type from .externalresource import ExternalStorage, FileStorage @@ -178,8 +180,15 @@ def __init__( prefix="" ) + def _make_label(self, dag_label, unit_label): + return f"{dag_label}/{unit_label}" + + @property + def _scratch_base(self): + return self.scratch_root / "scratch" + def _scratch_loc(self, unit_label): - return self.scratch_root / "scratch" / unit_label + return self._scratch_base / unit_label @contextmanager def running_dag(self, dag_label): @@ -193,11 +202,14 @@ def running_dag(self, dag_label): if not self.keep_staging: self.permanent_staging.cleanup() + delete_empty_dirs(self.scratch_root / self.staging) if not self.keep_shared: for file in self.shared_xfer: self.shared_root.delete(file.label) + if not self.keep_scratch: + delete_empty_dirs(self._scratch_base, delete_root=False) # TODO: remove empty dirs @contextmanager @@ -222,9 +234,6 @@ def running_unit(self, unit_label): self.shared_staging.cleanup() - - - class StorageManager: """Tool to manage the storage lifecycle during a DAG. @@ -276,23 +285,10 @@ def get_scratch(self, unit_label: str) -> Path: def get_permanent(self, unit_label) -> PermanentStaging: """Get the object for this unit's permanent staging directory""" return self.permanent_staging / unit_label - return PermanentStaging( - scratch=self.scratch_root, - external=self.permanent_root, - shared=self.shared_root, - prefix=unit_label, - staging=self.staging, - ) def get_shared(self, unit_label) -> SharedStaging: """Get the object for this unit's shared staging directory""" return self.shared_staging / unit_label - return SharedStaging( - scratch=self.scratch_root, - external=self.shared_root, - prefix=unit_label, - staging=self.staging, - ) def running_dag(self, dag_label: str): """Return a context manager that handles storage. diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 8b839a96b..872d8f3f3 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -197,10 +197,15 @@ def test_execute_DAG(demo_dag, tmp_path, keep): assert shared._data == {} # test the directories that we generated - n_scratch = 2 if keep_scratch else 0 - assert len(list((scratch / "scratch").iterdir())) == n_scratch + assert scratch.is_dir() + + if keep_scratch: + assert len(list((scratch / "scratch").iterdir())) == 2 + else: + assert 'scratch' not in list(scratch.iterdir()) if keep_staging: - ... + assert (tmp_path / ".staging" / u1_label / "shared.txt").exists() + assert (tmp_path / ".staging" / u1_label / "permanent.txt").exists() else: assert ".staging" not in list(scratch.iterdir()) diff --git a/gufe/utils.py b/gufe/utils.py index f9d3b0ffd..4382a801a 100644 --- a/gufe/utils.py +++ b/gufe/utils.py @@ -4,6 +4,12 @@ import io import warnings +from os import PathLike, rmdir +import pathlib + +import logging +_logger = logging.getLogger(__name__) + class ensure_filelike: """Context manager to convert pathlike or filelike to filelike. @@ -52,3 +58,25 @@ def __exit__(self, type, value, traceback): if self.do_close: self.context.close() + + +def delete_empty_dirs(root: PathLike, delete_root: bool = True): + """Delete all empty directories. + + Repeats so that directories that only contained empty directories also + get deleted. + """ + root = pathlib.Path(root) + + def find_empty_dirs(directory): + if not (paths := list(directory.iterdir())): + return [directory] + directories = [p for p in paths if p.is_dir()] + return sum([find_empty_dirs(d) for d in directories], []) + + while root.exists() and (empties := find_empty_dirs(root)): + if empties == [root] and not delete_root: + return + for directory in empties: + _logger.debug(f"Removing '{directory}'") + rmdir(directory) From 856d6cd9c2be018e6ed4277962458ba0407ed5ad Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 3 Nov 2023 14:38:48 -0500 Subject: [PATCH 11/17] that should be the tests for StorageDemoProtcol --- gufe/storage/stagingdirectory.py | 18 +- gufe/storage/storagemanager.py | 17 +- gufe/tests/storage/test_storage_demo.py | 360 ++++++++++++++++++------ 3 files changed, 295 insertions(+), 100 deletions(-) diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 69c7968be..29e36a558 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -55,7 +55,7 @@ class StagingDirectory: 2. When requested, it transfers any newly created files to the :class:`.ExternalStorage`. - 3. It can delete all of the files it manages + 3. It can delete all of the files it manages. Parameters ---------- @@ -85,11 +85,13 @@ def __init__( *, staging: PathLike = Path(".staging"), delete_staging: bool = True, + delete_empty_dirs: bool = True, ): self.external = external self.scratch = Path(scratch) self.prefix = Path(prefix) self.delete_staging = delete_staging + self.delete_empty_dirs = delete_empty_dirs self.staging = staging self.registry : set[StagingPath] = set() @@ -120,7 +122,7 @@ def transfer_single_file_to_external(self, held_file: StagingPath): _logger.info(f"Transfering {path} to external storage") self.external.store_path(held_file.label, path) return held_file - + return None # no transfer @@ -148,7 +150,9 @@ def cleanup(self): _logger.warning("During staging cleanup, file " f"{file} was marked for deletion, but " "can not be found on disk.") - delete_empty_dirs(self.staging_dir) + + if self.delete_empty_dirs: + delete_empty_dirs(self.staging_dir) def register_path(self, staging_path: StagingPath): """ @@ -229,10 +233,12 @@ def __init__( *, staging: PathLike = Path(".staging"), delete_staging: bool = True, + delete_empty_dirs: bool = True, read_only: bool = False, ): super().__init__(scratch, external, prefix, staging=staging, - delete_staging=delete_staging) + delete_staging=delete_staging, + delete_empty_dirs=delete_empty_dirs) self.read_only = read_only def _get_other_shared(self, prefix: str, @@ -303,9 +309,11 @@ def __init__( *, staging: PathLike = Path(".staging"), delete_staging: bool = True, + delete_empty_dirs: bool = True, ): super().__init__(scratch, external, prefix, staging=staging, - delete_staging=delete_staging) + delete_staging=delete_staging, + delete_empty_dirs=delete_empty_dirs) self.shared = shared def _delete_staging_safe(self): diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index eb7ff51b5..a07971463 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -135,7 +135,7 @@ def running_unit(self, unit_label: str): try: yield scratch, shared, permanent finally: - shared.tranfer_staging_to_external() + shared.transfer_staging_to_external() for file in permanent.registry: shared.transfer_single_file_to_expected(file) @@ -153,7 +153,7 @@ def __init__( keep_staging: bool = False, keep_shared: bool = False, staging: PathLike = Path(".staging"), - DAGContextClass: _DCMType = SingleProcDAGContextManager, + delete_empty_dirs: bool = True, ): self.scratch_root = Path(scratch_root) self.shared_root = shared_root @@ -162,7 +162,7 @@ def __init__( self.keep_staging = keep_staging self.keep_shared = keep_shared self.staging = staging - self.DAGContextClass = DAGContextClass + self.delete_empty_dirs = delete_empty_dirs self.shared_xfer = set() self.permanent_staging = PermanentStaging( @@ -170,6 +170,7 @@ def __init__( external=self.permanent_root, shared=self.shared_root, staging=self.staging, + delete_empty_dirs=delete_empty_dirs, prefix="" ) @@ -177,6 +178,7 @@ def __init__( scratch=self.scratch_root, external=self.shared_root, staging=self.staging, + delete_empty_dirs=delete_empty_dirs, prefix="" ) @@ -202,15 +204,13 @@ def running_dag(self, dag_label): if not self.keep_staging: self.permanent_staging.cleanup() - delete_empty_dirs(self.scratch_root / self.staging) if not self.keep_shared: for file in self.shared_xfer: self.shared_root.delete(file.label) - if not self.keep_scratch: + if self.delete_empty_dirs: delete_empty_dirs(self._scratch_base, delete_root=False) - # TODO: remove empty dirs @contextmanager def running_unit(self, unit_label): @@ -223,9 +223,8 @@ def running_unit(self, unit_label): finally: # import pdb; pdb.set_trace() # clean up after unit - self.shared_xfer.update(set( - self.shared_staging.transfer_staging_to_external() - )) + shared_xfers = self.shared_staging.transfer_staging_to_external() + self.shared_xfer.update(set(shared_xfers)) if not self.keep_scratch: shutil.rmtree(scratch) diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 872d8f3f3..d7d984798 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -3,7 +3,7 @@ import pathlib import gufe -from gufe.storage.externalresource import MemoryStorage +from gufe.storage.externalresource import MemoryStorage, FileStorage from gufe.storage.storagemanager import StorageManager, NewStorageManager from gufe.storage.stagingdirectory import StagingPath from gufe.protocols.protocoldag import new_execute_DAG @@ -123,89 +123,277 @@ def demo_dag(solvated_ligand, solvated_complex): return dag -def assert_dag_result(result, u1_label, keep_scratch): - # no matter how you set up storage, the DAGResult and UnitResults from - # this protocol should be the same - assert result.ok - assert len(result.protocol_unit_results) == 2 - res1, res2 = result.protocol_unit_results - assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} - assert isinstance(res1.outputs['scratch_file'], pathlib.Path) - assert isinstance(res1.outputs['share_file'], StagingPath) - assert isinstance(res1.outputs['perm_file'], StagingPath) - - - if keep_scratch: - scratch_res2 = "This is scratch -- can't be shared" - else: - scratch_res2 = "File not found" - - assert res2.outputs == { - 'share_file_contents': "I can be shared", - 'perm_file_contents': "I'm permanent (but I can be shared)", - 'scratch_file_contents': scratch_res2 - } - -def _parse_keep(keep): - return ( - 'scratch' in keep, - 'staging' in keep, - 'shared' in keep - ) - - - -@pytest.mark.parametrize('keep', [ - 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', - 'scratch,shared', 'staging,shared', 'scratch,staging,shared' -]) -def test_execute_DAG(demo_dag, tmp_path, keep): - keep_scratch, keep_staging, keep_shared = _parse_keep(keep) - dag = demo_dag - - shared = MemoryStorage() - permanent = MemoryStorage() - scratch = tmp_path - - storage_manager = NewStorageManager( - scratch_root=scratch, - shared_root=shared, - permanent_root=permanent, - keep_scratch=keep_scratch, - keep_shared=keep_shared, - keep_staging=keep_staging, - ) - - dag_label = "dag" # currently unused? - result = new_execute_DAG(dag, dag_label, storage_manager, - raise_error=True, n_retries=3) - - # test the ProtocolDAGResult that comes out - u1_label = f"{dag.protocol_units[0].key}_attempt_0" - assert_dag_result(result, u1_label, keep_scratch) - - # test the shared/permanent storage - assert permanent._data == { - f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", - } - if keep_shared: - assert shared._data == { - f'{u1_label}/shared.txt': b"I can be shared", - # f'{u1_label}/permanent.txt': b"I'm permanent (but I can be shared)", +class ExecutionStorageDemoTest: + """ + Template method pattern ABC for tests of StorageDemoProtocol execution. + + Using template method here because it ensures that all aspects get + tested for all implementations, even though individual aspects may + differ between different setups. + """ + def get_shared_and_permanent(self): + raise NotImplementedError() + + @staticmethod + def _parse_keep(keep): + return ( + 'scratch' in keep, + 'staging' in keep, + 'shared' in keep, + 'empties' in keep + ) + + def assert_dag_result(self, result, demo_dag, storage_manager): + """Test that the ProtocolDAGResult has the expected contents. + + This should be preserved across all execution methods. + """ + u1_label = self.u1_label(demo_dag) + keep_scratch = storage_manager.keep_scratch + + assert result.ok + assert len(result.protocol_unit_results) == 2 + res1, res2 = result.protocol_unit_results + assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} + assert isinstance(res1.outputs['scratch_file'], pathlib.Path) + assert isinstance(res1.outputs['share_file'], StagingPath) + assert isinstance(res1.outputs['perm_file'], StagingPath) + + if keep_scratch: + scratch_res2 = "This is scratch -- can't be shared" + else: + scratch_res2 = "File not found" + + assert res2.outputs == { + 'share_file_contents': "I can be shared", + 'perm_file_contents': "I'm permanent (but I can be shared)", + 'scratch_file_contents': scratch_res2 } - else: - assert shared._data == {} - - # test the directories that we generated - assert scratch.is_dir() - - if keep_scratch: - assert len(list((scratch / "scratch").iterdir())) == 2 - else: - assert 'scratch' not in list(scratch.iterdir()) - - if keep_staging: - assert (tmp_path / ".staging" / u1_label / "shared.txt").exists() - assert (tmp_path / ".staging" / u1_label / "permanent.txt").exists() - else: - assert ".staging" not in list(scratch.iterdir()) + + def assert_shared_and_permanent(self, storage_manager, dag): + """Check the final status of the shared and permanent containers. + + The can depend on the relation between the shared and permanent + external storage containers. For example, if they are the same + object, the final contents of permament will also include the final + contents of shared (and vice versa). + + Default behavior here is for the case of distinct backends. + """ + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + + assert list(permanent.iter_contents()) == [perm_file] + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + assert list(shared.iter_contents()) == [shared_file] + with shared.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + else: + assert list(shared.iter_contents()) == [] + + def assert_scratch(self, storage_manager): + """Check the final status of the scratch directory. + + This will change if the scratch is within the staging root directory + (for cases where we want to keep one of staging/scratch and not the + other; empty directories might get deleted in one case). + """ + scratch = storage_manager.scratch_root + keep_scratch = storage_manager.keep_scratch + del_empty_dirs = storage_manager.delete_empty_dirs + assert scratch.is_dir() + + if keep_scratch: + n_expected = 1 if del_empty_dirs else 2 + assert len(list((scratch / "scratch").iterdir())) == n_expected + else: + assert 'scratch' not in list(scratch.iterdir()) + + def assert_staging(self, storage_manager, dag): + """Check the final status of the staging directory. + + Behavior here will change if staging overlaps with a FileStorage for + either shared or permanent. + """ + keep_staging = storage_manager.keep_staging + u1_label = self.u1_label(dag) + scratch_root = storage_manager.scratch_root + u1_staging = scratch_root / ".staging" / u1_label + + if keep_staging: + assert (u1_staging / "shared.txt").exists() + assert (u1_staging / "permanent.txt").exists() + else: + assert ".staging" not in list(scratch_root.iterdir()) + + @staticmethod + def u1_label(dag): + """Unit 1 label""" + return f"{dag.protocol_units[0].key}_attempt_0" + + @staticmethod + def u2_label(dag_label, dag): + """Unit 2 label""" + return f"{dag.protocol_units[1].key}_attempt_0" + + def get_storage_manager(self, keep, tmp_path): + keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) + del_empty_dirs = not empties + shared, permanent = self.get_shared_and_permanent() + + storage_manager = NewStorageManager( + scratch_root=tmp_path, + shared_root=shared, + permanent_root=permanent, + keep_scratch=keep_scr, + keep_staging=keep_sta, + keep_shared=keep_sha, + delete_empty_dirs=del_empty_dirs, + ) + return storage_manager + + @pytest.mark.parametrize('keep', [ + 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', + 'scratch,shared', 'staging,shared', 'scratch,staging,shared', + 'scratch,empties', 'scratch,shared,empties', + ]) + def test_execute_dag(self, demo_dag, keep, tmp_path): + storage_manager = self.get_storage_manager(keep, tmp_path) + + dag_label = "dag" + result = new_execute_DAG(demo_dag, dag_label, storage_manager, + raise_error=True, n_retries=2) + + self.assert_dag_result(result, demo_dag, storage_manager) + self.assert_shared_and_permanent(storage_manager, demo_dag) + self.assert_scratch(storage_manager) + self.assert_staging(storage_manager, demo_dag) + + +class TestExecuteStorageDemoDiffBackends(ExecutionStorageDemoTest): + """ + Test execution when permanent and shared are different MemoryStorages. + + This is considered the standard base case; this should be easiest to + pass, as there should be no special case code that needs to be invoked. + """ + def get_shared_and_permanent(self): + return MemoryStorage(), MemoryStorage() + + +class TestExecuteStorageDemoSameBackend(ExecutionStorageDemoTest): + """ + Test execution when permanent and shared are the same MemoryStorage. + """ + def get_shared_and_permanent(self): + backend = MemoryStorage() + return backend, backend + + def assert_shared_and_permanent(self, storage_manager, dag): + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + + assert shared is permanent + # we'll test everything in permanent, because shared is identical + + if keep_shared: + expected = {perm_file, shared_file} + else: + expected = {perm_file} + + assert set(permanent.iter_contents()) == expected + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + with permanent.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + + +class TestExecuteStorageDemoStagingOverlap(TestExecuteStorageDemoSameBackend): + """ + Test execution when permanent and shared overlap with staging. + + This represents the approach we will probably actually use. In this + case, we use identical FileStorage for shared and permanent, and those + overlap with the staging directory. The result is that file locations + don't actually change. + """ + def get_shared_and_permanent(self): + ... # override the need for this; not the prettiest, but it works + + def get_storage_manager(self, keep, tmp_path): + keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) + del_empty_dirs = not empties + backend = FileStorage(tmp_path) + storage_manager = NewStorageManager( + scratch_root=tmp_path, + shared_root=backend, + permanent_root=backend, + keep_scratch=keep_scr, + keep_staging=keep_sta, + keep_shared=keep_sha, + delete_empty_dirs=del_empty_dirs, + staging="", + ) + return storage_manager + + def assert_shared_and_permanent(self, storage_manager, dag): + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + keep_scratch = storage_manager.keep_scratch + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + scratch_file = f"scratch/{u1_label}/scratch.txt" + + assert shared is permanent + # we'll test everything in permanent, because shared is identical + + expected = {perm_file} + + if keep_shared: + expected.add(shared_file) + + if keep_scratch: + expected.add(scratch_file) + + assert set(permanent.iter_contents()) == expected + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + with permanent.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + + if keep_scratch: + with permanent.load_stream(scratch_file) as f: + assert f.read() == b"This is scratch -- can't be shared" + + def assert_staging(self, storage_manager, dag): + # in this case, keep_staging is ignored in favor of the behavior of + # keep_shared + keep_shared = storage_manager.keep_shared + u1_label = self.u1_label(dag) + scratch_root = storage_manager.scratch_root + u1_staging = scratch_root / u1_label + + assert (u1_staging / "permanent.txt").exists() + + if keep_shared: + assert (u1_staging / "shared.txt").exists() From 19d40b0d5d97a880a87aef70ad068eca67d5432f Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 3 Nov 2023 15:30:43 -0500 Subject: [PATCH 12/17] got the tests all working again --- gufe/storage/storagemanager.py | 34 ++++++++++----------- gufe/tests/storage/test_stagingdirectory.py | 8 ++--- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index a07971463..9df808015 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -103,17 +103,17 @@ def running_unit(self, unit_label: str): finally: # TODO: should some of this be in an else clause instead? self.permanents.append(permanent) - shared.root.transfer_staging_to_external() + shared.transfer_staging_to_external() # everything in permanent must also be available in shared for file in permanent.registry: - shared.root.transfer_single_file_to_external(file) + shared.transfer_single_file_to_external(file) if not self.manager.keep_scratch: shutil.rmtree(scratch) if not self.manager.keep_staging: # TODO: for some reason this isn't cleaning up as expected? - shared.root.cleanup() + shared.cleanup() class PerUnitDAGContextManager(DAGContextManager): @@ -259,19 +259,6 @@ def __init__( self.staging = staging self.DAGContextClass = DAGContextClass - self.permanent_staging = PermanentStaging( - scratch=self.scratch_root, - external=self.permanent_root, - shared=self.shared_root, - staging=self.staging - ) - - self.shared_staging = SharedStaging( - scratch=self.scratch_root, - external=self.shared_root, - staging=self.staging, - ) - def _scratch_loc(self, unit_label): return self.scratch_root / "scratch" / unit_label @@ -283,11 +270,22 @@ def get_scratch(self, unit_label: str) -> Path: def get_permanent(self, unit_label) -> PermanentStaging: """Get the object for this unit's permanent staging directory""" - return self.permanent_staging / unit_label + return PermanentStaging( + scratch=self.scratch_root, + external=self.permanent_root, + shared=self.shared_root, + prefix=unit_label, + staging=self.staging, + ) def get_shared(self, unit_label) -> SharedStaging: """Get the object for this unit's shared staging directory""" - return self.shared_staging / unit_label + return SharedStaging( + scratch=self.scratch_root, + external=self.shared_root, + prefix=unit_label, + staging=self.staging, + ) def running_dag(self, dag_label: str): """Return a context manager that handles storage. diff --git a/gufe/tests/storage/test_stagingdirectory.py b/gufe/tests/storage/test_stagingdirectory.py index 7e3d68a6c..f94a1888f 100644 --- a/gufe/tests/storage/test_stagingdirectory.py +++ b/gufe/tests/storage/test_stagingdirectory.py @@ -7,8 +7,8 @@ from gufe.storage.externalresource import MemoryStorage, FileStorage from gufe.storage.stagingdirectory import ( - SharedStaging, PermanentStaging, _delete_empty_dirs, - _safe_to_delete_staging + SharedStaging, PermanentStaging, _safe_to_delete_staging, + delete_empty_dirs, # TODO: move to appropriate place ) @pytest.fixture @@ -99,7 +99,7 @@ def test_delete_empty_dirs(tmp_path): path.parent.mkdir(parents=True, exist_ok=True) path.touch() - _delete_empty_dirs(base) + delete_empty_dirs(base) for path in paths: assert path.exists() @@ -118,7 +118,7 @@ def test_delete_empty_dirs_delete_root(tmp_path, delete_root): for directory in dirs: directory.mkdir(parents=True, exist_ok=True) - _delete_empty_dirs(base, delete_root=delete_root) + delete_empty_dirs(base, delete_root=delete_root) for directory in dirs: assert not directory.exists() From 651bb19fb3683d898a2ae0652060f63ee4c043b4 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Mon, 6 Nov 2023 16:06:48 -0600 Subject: [PATCH 13/17] Get the old tests working again Now to fix the new ones --- gufe/protocols/protocoldag.py | 27 ++++++++++-------- gufe/storage/externalresource/filestorage.py | 5 ++-- gufe/storage/storagemanager.py | 23 +++++++++------ gufe/tests/storage/test_storagemanager.py | 30 ++++++++++---------- gufe/tests/test_protocol.py | 3 +- gufe/tests/test_protocoldag.py | 16 +++++------ gufe/tests/test_protocolunit.py | 24 ++++++++++++---- 7 files changed, 76 insertions(+), 52 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 4ce90ed62..deb7b3e3c 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -17,7 +17,7 @@ ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context ) -from ..storage.storagemanager import StorageManager +from ..storage.storagemanager import NewStorageManager from ..storage.externalresource.filestorage import FileStorage from ..storage.externalresource.base import ExternalStorage @@ -352,20 +352,18 @@ def _from_dict(cls, dct: dict): return cls(**dct) -class ReproduceOldBehaviorStorageManager(StorageManager): +class ReproduceOldBehaviorStorageManager(NewStorageManager): # Default behavior has scratch at {dag_label}/scratch/{unit_label} and # shared at {dag_label}/{unit_label}. This little class makes changes # that get us back to the original behavior of this class: scratch at # {dag_label}/scratch_{unit_label} and shared at # {dag_label}/shared_{unit_label}. - def _scratch_loc(self, unit_label): - return self.scratch_root / f"scratch_{unit_label}" + def _scratch_loc(self, dag_label, unit_label, attempt): + return self.scratch_root / f"scratch_{unit_label}_attempt_{attempt}" - def get_shared(self, unit_label): - return super().get_shared(f"shared_{unit_label}") + def make_label(self, dag_label, unit_label, attempt): + return f"{dag_label}/shared_{unit_label}_attempt_{attempt}" - def get_permanent(self, unit_label): - return super().get_permanent(f"shared_{unit_label}") def execute_DAG(protocoldag: ProtocolDAG, *, @@ -421,8 +419,9 @@ def execute_DAG(protocoldag: ProtocolDAG, *, permanent_root=shared, keep_scratch=keep_scratch, keep_shared=keep_shared, - keep_staging=False, - staging=Path(""), # use the actual directories as the staging + keep_staging=True, + delete_empty_dirs=False, + #staging=Path(""), # use the actual directories as the staging ) return new_execute_DAG(protocoldag, dag_label, storage_manager, raise_error, n_retries) @@ -454,8 +453,12 @@ def new_execute_DAG( # `ProtocolUnitResult` inputs = _pu_to_pur(unit.inputs, results) - label = f"{str(unit.key)}_attempt_{attempt}" - with dag_ctx.running_unit(label) as (scratch, shared, perm): + label = storage_manager.make_label(dag_label, unit.key, + attempt=attempt) + with dag_ctx.running_unit(dag_label, unit.key, attempt=attempt) as ( + scratch, shared, perm + ): + # TODO: context manager should return context context = Context(shared=shared, scratch=scratch, permanent=perm) diff --git a/gufe/storage/externalresource/filestorage.py b/gufe/storage/externalresource/filestorage.py index 164538e36..c40e5a8bc 100644 --- a/gufe/storage/externalresource/filestorage.py +++ b/gufe/storage/externalresource/filestorage.py @@ -30,8 +30,9 @@ def _store_bytes(self, location, byte_data): f.write(byte_data) def _store_path(self, location, path): - my_path = self._as_path(location) - if path.resolve() != my_path.resolve(): + my_path = self._as_path(location).resolve() + if path.resolve() != my_path: + my_path.parent.mkdir(parents=True, exist_ok=True) shutil.copyfile(path, my_path) def _iter_contents(self, prefix): diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index 9df808015..e574c1cfa 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -179,17 +179,23 @@ def __init__( external=self.shared_root, staging=self.staging, delete_empty_dirs=delete_empty_dirs, - prefix="" + prefix="" # TODO: remove prefix ) - def _make_label(self, dag_label, unit_label): + def make_label(self, dag_label, unit_label, **kwargs): + """ + + The specific executor may change this by making a very simple + adapter subclass and overriding this method, which can take + arbitrary additional kwargs that may tie it to a specific executor. + """ return f"{dag_label}/{unit_label}" @property def _scratch_base(self): return self.scratch_root / "scratch" - def _scratch_loc(self, unit_label): + def _scratch_loc(self, dag_label, unit_label, **kwargs): return self._scratch_base / unit_label @contextmanager @@ -213,11 +219,12 @@ def running_dag(self, dag_label): delete_empty_dirs(self._scratch_base, delete_root=False) @contextmanager - def running_unit(self, unit_label): - scratch = self._scratch_loc(unit_label) + def running_unit(self, dag_label, unit_label, **kwargs): + scratch = self._scratch_loc(dag_label, unit_label, **kwargs) + label = self.make_label(dag_label, unit_label, **kwargs) scratch.mkdir(parents=True, exist_ok=True) - shared = self.shared_staging / unit_label - permanent = self.permanent_staging / unit_label + shared = self.shared_staging / label + permanent = self.permanent_staging / label try: yield scratch, shared, permanent finally: @@ -259,7 +266,7 @@ def __init__( self.staging = staging self.DAGContextClass = DAGContextClass - def _scratch_loc(self, unit_label): + def _scratch_loc(self, unit_label, **kwargs): return self.scratch_root / "scratch" / unit_label def get_scratch(self, unit_label: str) -> Path: diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index fb584be14..ad1c09060 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -1,6 +1,6 @@ import pytest from gufe.storage.storagemanager import ( - StorageManager + StorageManager, NewStorageManager ) from gufe.storage.stagingdirectory import StagingDirectory from gufe.storage.externalresource import MemoryStorage, FileStorage @@ -8,7 +8,7 @@ @pytest.fixture def storage_manager_std(tmp_path): - return StorageManager( + return NewStorageManager( scratch_root=tmp_path / "working", shared_root=MemoryStorage(), permanent_root=MemoryStorage(), @@ -294,18 +294,18 @@ def _after_unit_existing_files(self, unit_label): return {"baz"} -class TestStorageManager: - def test_get_scratch(self, storage_manager_std): - scratch = storage_manager_std.get_scratch("dag_label/unit_label") - assert str(scratch).endswith("scratch/dag_label/unit_label") - assert isinstance(scratch, Path) +class TestStorageManager: pass + # def test_get_scratch(self, storage_manager_std): + # scratch = storage_manager_std.get_scratch("dag_label/unit_label") + # assert str(scratch).endswith("scratch/dag_label/unit_label") + # assert isinstance(scratch, Path) - def test_get_permanent(self, storage_manager_std): - perm = storage_manager_std.get_permanent("dag_label/unit_label") - assert perm.__fspath__().endswith(".staging/dag_label/unit_label") - assert isinstance(perm, StagingDirectory) + # def test_get_permanent(self, storage_manager_std): + # perm = storage_manager_std.get_permanent("dag_label/unit_label") + # assert perm.__fspath__().endswith(".staging/dag_label/unit_label") + # assert isinstance(perm, StagingDirectory) - def test_get_shared(self, storage_manager_std): - shared = storage_manager_std.get_shared("dag_label/unit_label") - assert shared.__fspath__().endswith(".staging/dag_label/unit_label") - assert isinstance(shared, StagingDirectory) + # def test_get_shared(self, storage_manager_std): + # shared = storage_manager_std.get_shared("dag_label/unit_label") + # assert shared.__fspath__().endswith(".staging/dag_label/unit_label") + # assert isinstance(shared, StagingDirectory) diff --git a/gufe/tests/test_protocol.py b/gufe/tests/test_protocol.py index f700fa456..6a468bfd7 100644 --- a/gufe/tests/test_protocol.py +++ b/gufe/tests/test_protocol.py @@ -696,7 +696,8 @@ def test_execute_DAG_retries(solvated_ligand, vacuum_ligand, tmpdir): # we did 3 retries, so 4 total failures assert len(r.protocol_unit_results) == 5 assert len(r.protocol_unit_failures) == 4 - assert len(list(shared.iterdir())) == 5 + assert len(list(shared.iterdir())) == 0 + # assert len(list(shared.iterdir())) == 5 def test_execute_DAG_bad_nretries(solvated_ligand, vacuum_ligand, tmpdir): diff --git a/gufe/tests/test_protocoldag.py b/gufe/tests/test_protocoldag.py index a4a08a7b1..d248ec309 100644 --- a/gufe/tests/test_protocoldag.py +++ b/gufe/tests/test_protocoldag.py @@ -14,9 +14,9 @@ class WriterUnit(gufe.ProtocolUnit): def _execute(ctx, **inputs): my_id = inputs['identity'] - with open(os.path.join(ctx.shared, f'unit_{my_id}_shared.txt'), 'w') as out: + with open(ctx.shared / f'unit_{my_id}_shared.txt', 'w') as out: out.write(f'unit {my_id} existed!\n') - with open(os.path.join(ctx.scratch, f'unit_{my_id}_scratch.txt'), 'w') as out: + with open(ctx.scratch / f'unit_{my_id}_scratch.txt', 'w') as out: out.write(f'unit {my_id} was here\n') return { @@ -94,12 +94,12 @@ def test_execute_dag(tmpdir, keep_shared, keep_scratch, writefile_dag): # will have produced 4 files in scratch and shared directory for pu in writefile_dag.protocol_units: identity = pu.inputs['identity'] - shared_file = os.path.join(shared, - f'shared_{str(pu.key)}_attempt_0', - f'unit_{identity}_shared.txt') - scratch_file = os.path.join(scratch, - f'scratch_{str(pu.key)}_attempt_0', - f'unit_{identity}_scratch.txt') + shared_file = (shared + / f'shared_{str(pu.key)}_attempt_0' + / f'unit_{identity}_shared.txt') + scratch_file = (scratch + / f'scratch_{str(pu.key)}_attempt_0' + / f'unit_{identity}_scratch.txt') if keep_shared: assert os.path.exists(shared_file) else: diff --git a/gufe/tests/test_protocolunit.py b/gufe/tests/test_protocolunit.py index 9896f856c..2daaa6877 100644 --- a/gufe/tests/test_protocolunit.py +++ b/gufe/tests/test_protocolunit.py @@ -57,8 +57,12 @@ def test_execute(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + u: ProtocolUnitFailure = unit.execute(context=ctx, an_input=3) assert u.exception[0] == "ValueError" @@ -70,8 +74,12 @@ def test_execute(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + # now try actually letting the error raise on execute with pytest.raises(ValueError, match="should always be 2"): unit.execute(context=ctx, raise_error=True, an_input=3) @@ -87,8 +95,12 @@ def test_execute_KeyboardInterrupt(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + with pytest.raises(KeyboardInterrupt): unit.execute(context=ctx, an_input=3) From 8a180920d8513a64d65db4df6f17976826deaf00 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Wed, 8 Nov 2023 12:34:15 -0600 Subject: [PATCH 14/17] work on cleaning up storage manager unit tests --- gufe/protocols/protocoldag.py | 1 - gufe/storage/storagemanager.py | 13 +++--- gufe/tests/storage/test_storage_demo.py | 9 +++-- gufe/tests/storage/test_storagemanager.py | 49 +++++++++++++---------- 4 files changed, 40 insertions(+), 32 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index deb7b3e3c..862a7b029 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -365,7 +365,6 @@ def make_label(self, dag_label, unit_label, attempt): return f"{dag_label}/shared_{unit_label}_attempt_{attempt}" - def execute_DAG(protocoldag: ProtocolDAG, *, shared_basedir: Optional[PathLike], scratch_basedir: PathLike, diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index e574c1cfa..f2916194f 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -23,7 +23,7 @@ def running_dag(cls, storage_manager: StorageManager, dag_label: str): raise NotImplementedError() @contextmanager - def running_unit(self, unit_label: str): + def running_unit(self, dag_label: str, unit_label: str, **kwargs): """Return a context manager for when unit is started. This context manager handles the unit scale of the lifecycle. @@ -84,7 +84,7 @@ def running_dag(cls, storage_manager: StorageManager, dag_label: str): # TODO: remove scratch root if empty @contextmanager - def running_unit(self, unit_label: str): + def running_unit(self, dag_label: str, unit_label: str, **kwargs): """Unit level of the storage lifecycle. This provides the staging directories used for scratch, shared, and @@ -182,21 +182,22 @@ def __init__( prefix="" # TODO: remove prefix ) - def make_label(self, dag_label, unit_label, **kwargs): + def make_label(self, dag_label, unit_label, attempt, **kwargs): """ The specific executor may change this by making a very simple adapter subclass and overriding this method, which can take arbitrary additional kwargs that may tie it to a specific executor. """ - return f"{dag_label}/{unit_label}" + return f"{dag_label}/{unit_label}_attempt_{attempt}" @property def _scratch_base(self): return self.scratch_root / "scratch" - def _scratch_loc(self, dag_label, unit_label, **kwargs): - return self._scratch_base / unit_label + def _scratch_loc(self, dag_label, unit_label, attempt, **kwargs): + label = self.make_label(dag_label, unit_label, attempt) + return self._scratch_base / label @contextmanager def running_dag(self, dag_label): diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index d7d984798..42eaaa479 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -213,7 +213,8 @@ def assert_scratch(self, storage_manager): if keep_scratch: n_expected = 1 if del_empty_dirs else 2 - assert len(list((scratch / "scratch").iterdir())) == n_expected + dag_dir = scratch / "scratch/dag" + assert len(list(dag_dir.iterdir())) == n_expected else: assert 'scratch' not in list(scratch.iterdir()) @@ -237,12 +238,12 @@ def assert_staging(self, storage_manager, dag): @staticmethod def u1_label(dag): """Unit 1 label""" - return f"{dag.protocol_units[0].key}_attempt_0" + return f"dag/{dag.protocol_units[0].key}_attempt_0" @staticmethod - def u2_label(dag_label, dag): + def u2_label(dag): """Unit 2 label""" - return f"{dag.protocol_units[1].key}_attempt_0" + return f"dag/{dag.protocol_units[1].key}_attempt_0" def get_storage_manager(self, keep, tmp_path): keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index ad1c09060..ea9404087 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -35,7 +35,7 @@ def run(self, scratch, shared, permanent): (scratch / "foo2.txt").touch() # TODO: this will change; the inputs should include a way to get # the previous shared unit label - with shared.other_shared("dag/unit1") as prev_shared: + with shared.root.other_shared("dag/unit1_attempt_0") as prev_shared: with open(prev_shared / "bar.txt", mode='r') as f: bar = f.read() @@ -48,6 +48,7 @@ def run(self, scratch, shared, permanent): return [Unit1(), Unit2()] + class LifecycleHarness: @pytest.fixture def storage_manager(self, tmp_path): @@ -58,10 +59,10 @@ def get_files_dict(storage_manager): root = storage_manager.scratch_root staging = storage_manager.staging return { - "foo": root / "scratch/dag/unit1/foo.txt", - "foo2": root / "scratch/dag/unit2/foo2.txt", - "bar": root / staging / "dag/unit1/bar.txt", - "baz": root / staging / "dag/unit1/baz.txt", + "foo": root / "scratch/dag/unit1_attempt_0/foo.txt", + "foo2": root / "scratch/dag/unit2_attempt_0/foo2.txt", + "bar": root / staging / "dag/unit1_attempt_0/bar.txt", + "baz": root / staging / "dag/unit1_attempt_0/baz.txt", } def test_lifecycle(self, storage_manager, dag_units, tmp_path): @@ -69,9 +70,12 @@ def test_lifecycle(self, storage_manager, dag_units, tmp_path): dag_label = "dag" with storage_manager.running_dag(dag_label) as dag_ctx: for unit in dag_units: - label = f"{dag_ctx.dag_label}/{unit.key}" - with dag_ctx.running_unit(label) as (scratch, shared, perm): + label = f"{dag_label}/{unit.key}" + with dag_ctx.running_unit(dag_label, unit.key, attempt=0) as ( + scratch, shared, perm + ): results.append(unit.run(scratch, shared, perm)) + import pdb; pdb.set_trace() self.in_unit_asserts(storage_manager, label) self.after_unit_asserts(storage_manager, label) self.after_dag_asserts(storage_manager) @@ -117,7 +121,8 @@ def in_unit_asserts(self, storage_manager, unit_label): permanent_root = storage_manager.permanent_root expected_in_shared = { "dag/unit1": set(), - "dag/unit2": {"dag/unit1/bar.txt", "dag/unit1/baz.txt"} + "dag/unit2": {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} }[unit_label] | self._in_staging_shared(unit_label, "in") assert set(shared_root.iter_contents()) == expected_in_shared @@ -134,7 +139,8 @@ def after_unit_asserts(self, storage_manager, unit_label): permanent_root = storage_manager.permanent_root shared_extras = self._in_staging_shared(unit_label, "after") permanent_extras = self._in_staging_permanent(unit_label, "after") - expected_in_shared = {"dag/unit1/bar.txt", "dag/unit1/baz.txt"} + expected_in_shared = {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} expected_in_shared |= shared_extras assert set(shared_root.iter_contents()) == expected_in_shared assert set(permanent_root.iter_contents()) == permanent_extras @@ -155,7 +161,8 @@ def after_dag_asserts(self, storage_manager): # expected_in_shared = {"dag/unit1/bar.txt", "dag/unit1/baz.txt"} # expected_in_shared |= shared_extras # assert set(shared_root.iter_contents()) == expected_in_shared - expected_in_permanent = {"dag/unit1/baz.txt"} | permanent_extras + expected_in_permanent = ({"dag/unit1_attempt_0/baz.txt"} + | permanent_extras) assert set(permanent_root.iter_contents()) == expected_in_permanent # manager-specific check for files @@ -188,7 +195,7 @@ def _after_dag_existing_files(self): class TestKeepScratchAndStagingStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): - return StorageManager( + return NewStorageManager( scratch_root=tmp_path / "working", shared_root=MemoryStorage(), permanent_root=MemoryStorage(), @@ -220,7 +227,7 @@ class TestStagingOverlapsSharedStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): root = tmp_path / "working" - return StorageManager( + return NewStorageManager( scratch_root=root, shared_root=FileStorage(root), permanent_root=MemoryStorage(), @@ -245,10 +252,10 @@ def _after_dag_existing_files(self): return {"bar", "baz"} def _in_staging_shared(self, unit_label, in_after): - bar = "dag/unit1/bar.txt" - baz = "dag/unit1/baz.txt" - foo = "scratch/dag/unit1/foo.txt" - foo2 = "scratch/dag/unit2/foo2.txt" + bar = "dag/unit1_attempt_0/bar.txt" + baz = "dag/unit1_attempt_0/baz.txt" + foo = "scratch/dag/unit1_attempt_0/foo.txt" + foo2 = "scratch/dag/unit2_attempt_0/foo2.txt" return { ("dag/unit1", "in"): {bar, baz, foo}, ("dag/unit1", "after"): {bar, baz}, @@ -261,7 +268,7 @@ class TestStagingOverlapsPermanentStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): root = tmp_path / "working" - return StorageManager( + return NewStorageManager( scratch_root=root, permanent_root=FileStorage(root), shared_root=MemoryStorage(), @@ -278,10 +285,10 @@ def _after_dag_existing_files(self): return {"baz"} def _in_staging_permanent(self, unit_label, in_after): - bar = "dag/unit1/bar.txt" - baz = "dag/unit1/baz.txt" - foo = "scratch/dag/unit1/foo.txt" - foo2 = "scratch/dag/unit2/foo2.txt" + bar = "dag/unit1_attempt_0/bar.txt" + baz = "dag/unit1_attempt_0/baz.txt" + foo = "scratch/dag/unit1_attempt_0/foo.txt" + foo2 = "scratch/dag/unit2_attempt_0/foo2.txt" return { ("dag/unit1", "in"): {bar, baz, foo}, ("dag/unit1", "after"): {baz}, From f7cf84ff5955f1cca0f4f3bbdfd2a1f2ca94600a Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Tue, 21 Nov 2023 14:17:33 -0600 Subject: [PATCH 15/17] All storage tests pass! --- gufe/storage/externalresource/filestorage.py | 6 ++++++ gufe/storage/externalresource/memorystorage.py | 3 +++ gufe/storage/storagemanager.py | 16 ++++++++++++++++ gufe/tests/storage/test_storage_demo.py | 4 +++- gufe/tests/storage/test_storagemanager.py | 8 +++----- 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/gufe/storage/externalresource/filestorage.py b/gufe/storage/externalresource/filestorage.py index c40e5a8bc..5330f88bf 100644 --- a/gufe/storage/externalresource/filestorage.py +++ b/gufe/storage/externalresource/filestorage.py @@ -20,6 +20,12 @@ def __init__(self, root_dir: Union[pathlib.Path, str]): def _exists(self, location): return self._as_path(location).exists() + def __eq__(self, other): + return ( + isinstance(other, FileStorage) + and self.root_dir == other.root_dir + ) + def _store_bytes(self, location, byte_data): path = self._as_path(location) directory = path.parent diff --git a/gufe/storage/externalresource/memorystorage.py b/gufe/storage/externalresource/memorystorage.py index da0931bd0..40c050873 100644 --- a/gufe/storage/externalresource/memorystorage.py +++ b/gufe/storage/externalresource/memorystorage.py @@ -26,6 +26,9 @@ def _delete(self, location): f"Unable to delete '{location}': key does not exist" ) + def __eq__(self, other): + return self is other + def _store_bytes(self, location, byte_data): self._data[location] = byte_data return location, self.get_metadata(location) diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index f2916194f..90e624a16 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -163,7 +163,11 @@ def __init__( self.keep_shared = keep_shared self.staging = staging self.delete_empty_dirs = delete_empty_dirs + + # these are used to track what files can be deleted from shared if + # keep_shared is False self.shared_xfer = set() + self.permanent_xfer = set() self.permanent_staging = PermanentStaging( scratch=self.scratch_root, @@ -216,6 +220,10 @@ def running_dag(self, dag_label): for file in self.shared_xfer: self.shared_root.delete(file.label) + for file in self.permanent_xfer: + if self.shared_root != self.permanent_root: + self.shared_root.delete(file.label) + if self.delete_empty_dirs: delete_empty_dirs(self._scratch_base, delete_root=False) @@ -231,9 +239,17 @@ def running_unit(self, dag_label, unit_label, **kwargs): finally: # import pdb; pdb.set_trace() # clean up after unit + + # track the files that were in shared so that we can delete them + # at the end of the DAG if requires shared_xfers = self.shared_staging.transfer_staging_to_external() self.shared_xfer.update(set(shared_xfers)) + # everything in permanent should also be in shared + for file in self.permanent_staging.registry: + self.shared_staging.transfer_single_file_to_external(file) + self.permanent_xfer.add(file) + if not self.keep_scratch: shutil.rmtree(scratch) diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 42eaaa479..9dd711e83 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -193,9 +193,11 @@ def assert_shared_and_permanent(self, storage_manager, dag): assert f.read() == b"I'm permanent (but I can be shared)" if keep_shared: - assert list(shared.iter_contents()) == [shared_file] + assert list(shared.iter_contents()) == [shared_file, perm_file] with shared.load_stream(shared_file) as f: assert f.read() == b"I can be shared" + with shared.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" else: assert list(shared.iter_contents()) == [] diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index ea9404087..05ae31304 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -75,7 +75,7 @@ def test_lifecycle(self, storage_manager, dag_units, tmp_path): scratch, shared, perm ): results.append(unit.run(scratch, shared, perm)) - import pdb; pdb.set_trace() + # import pdb; pdb.set_trace() self.in_unit_asserts(storage_manager, label) self.after_unit_asserts(storage_manager, label) self.after_dag_asserts(storage_manager) @@ -246,10 +246,8 @@ def _after_unit_existing_files(self, unit_label): return {"bar", "baz"} def _after_dag_existing_files(self): - # NOTE: currently we don't delete bar at the end of a cycle, but we - # don't guarantee that we would not. So it exists, but changing that - # isn't API-breaking. - return {"bar", "baz"} + # these get deleted because we don't keep shared here + return set() def _in_staging_shared(self, unit_label, in_after): bar = "dag/unit1_attempt_0/bar.txt" From 6552cd1658d5abec7af9c52eb7c82bee57e022ed Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Tue, 21 Nov 2023 14:57:43 -0600 Subject: [PATCH 16/17] strip out old version --- gufe/protocols/protocoldag.py | 4 +- gufe/storage/storagemanager.py | 204 +--------------------- gufe/tests/storage/test_storage_demo.py | 6 +- gufe/tests/storage/test_storagemanager.py | 12 +- 4 files changed, 11 insertions(+), 215 deletions(-) diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 862a7b029..7f044457f 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -17,7 +17,7 @@ ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context ) -from ..storage.storagemanager import NewStorageManager +from ..storage.storagemanager import StorageManager from ..storage.externalresource.filestorage import FileStorage from ..storage.externalresource.base import ExternalStorage @@ -352,7 +352,7 @@ def _from_dict(cls, dct: dict): return cls(**dct) -class ReproduceOldBehaviorStorageManager(NewStorageManager): +class ReproduceOldBehaviorStorageManager(StorageManager): # Default behavior has scratch at {dag_label}/scratch/{unit_label} and # shared at {dag_label}/{unit_label}. This little class makes changes # that get us back to the original behavior of this class: scratch at diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index 90e624a16..9061a8748 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -12,137 +12,7 @@ from .stagingdirectory import SharedStaging, PermanentStaging -class DAGContextManager: - @classmethod - @contextmanager - def running_dag(cls, storage_manager: StorageManager, dag_label: str): - """Return a context manager for when a DAG is started. - - This context manager handles the DAG scale of the lifecycle. - """ - raise NotImplementedError() - - @contextmanager - def running_unit(self, dag_label: str, unit_label: str, **kwargs): - """Return a context manager for when unit is started. - - This context manager handles the unit scale of the lifecycle. - """ - raise NotImplementedError() - -_DCMType = Type[DAGContextManager] # to shorten some lines - -class SingleProcDAGContextManager(DAGContextManager): - """Context manager to handle details of storage lifecycle. - - Making this a separate class ensures that ``running_unit`` is always - called within the context of a given DAG. This is usually not created - directly; instead, it is created (and used) with its ``running_dag`` - classmethod, typically from within a ``StorageManager``. - """ - def __init__(self, storage_manager: StorageManager, dag_label: str): - self.manager = storage_manager - self.dag_label = dag_label - self.permanents: list[PermanentStaging] = [] - - def register_dag_result(self, result): - # 1. create alchemiscale real permanent based on result - # 2. register all paths on self.permanents with new permanent - new_permanents = [] - for perm in self.permanents: - new_perm = ... - for file in perm.registry: - new_perm.register(file.path) - - # 3. replace old permanents with new permanent - self.permanents = new_permanents - - @classmethod # NB: classmethod must be on top - @contextmanager - def running_dag(cls, storage_manager: StorageManager, dag_label: str): - """DAG level of the storage lifecycle - - When the DAG is completed, transfer everything to the permanent - storage, and delete the staging area for permanent (if we are - supposed to). - - This is not usually called by users; instead it is called from - within the ``StorageManager``. - """ - dag_manager = cls(storage_manager, dag_label) - try: - yield dag_manager - finally: - for permanent in dag_manager.permanents: - permanent.transfer_staging_to_external() - - if not dag_manager.manager.keep_staging: - for d in dag_manager.permanents: - # import pdb; pdb.set_trace() - d.cleanup() - - # TODO: remove scratch root if empty - - @contextmanager - def running_unit(self, dag_label: str, unit_label: str, **kwargs): - """Unit level of the storage lifecycle. - - This provides the staging directories used for scratch, shared, and - permanent. At the end of the unit, it transfers anything from shared - to the real shared external storage, cleans up the scratch - directory and the shared staging directory. - - Note that the unit label here is the *entire* label; that is, it - would also include information identifying the DAG. - """ - scratch = self.manager.get_scratch(unit_label) - shared = self.manager.get_shared(unit_label) - permanent = self.manager.get_permanent(unit_label) - try: - yield scratch, shared, permanent - finally: - # TODO: should some of this be in an else clause instead? - self.permanents.append(permanent) - shared.transfer_staging_to_external() - # everything in permanent must also be available in shared - for file in permanent.registry: - shared.transfer_single_file_to_external(file) - - if not self.manager.keep_scratch: - shutil.rmtree(scratch) - - if not self.manager.keep_staging: - # TODO: for some reason this isn't cleaning up as expected? - shared.cleanup() - - -class PerUnitDAGContextManager(DAGContextManager): - """Variant to use when doing only a single process per unit""" - def __init__(self, storage_manager: StorageManager, dag_label: str): - self.manager = storage_manager - self.dag_label = dag_label - - @classmethod - @contextmanager - def running_dag(cls, storage_manager, dag_label): - yield cls(storage_manager, dag_label) - - @contextmanager - def running_unit(self, unit_label: str): - scratch = self.manager.get_scratch(unit_label) - shared = self.manager.get_shared(unit_label) - permanent = self.manager.get_permanent(unit_label) - try: - yield scratch, shared, permanent - finally: - shared.transfer_staging_to_external() - for file in permanent.registry: - shared.transfer_single_file_to_expected(file) - - permanent.transfer_staging_to_external() - - -class NewStorageManager: +class StorageManager: def __init__( self, scratch_root: PathLike, @@ -255,75 +125,3 @@ def running_unit(self, dag_label, unit_label, **kwargs): if not self.keep_staging: self.shared_staging.cleanup() - - -class StorageManager: - """Tool to manage the storage lifecycle during a DAG. - - This object primarily contains the logic for getting the staging - directories. A separate class, in the ``DAGContextClass`` variable, - handles the logic for the context managers. - """ - def __init__( - self, - scratch_root: PathLike, - shared_root: ExternalStorage, - permanent_root: ExternalStorage, - *, - keep_scratch: bool = False, - keep_staging: bool = False, - staging: PathLike = Path(".staging"), - DAGContextClass: _DCMType = SingleProcDAGContextManager, - ): - self.scratch_root = Path(scratch_root) - self.shared_root = shared_root - self.permanent_root = permanent_root - self.keep_scratch = keep_scratch - self.keep_staging = keep_staging - self.staging = staging - self.DAGContextClass = DAGContextClass - - def _scratch_loc(self, unit_label, **kwargs): - return self.scratch_root / "scratch" / unit_label - - def get_scratch(self, unit_label: str) -> Path: - """Get the path for this unit's scratch directory""" - scratch = self._scratch_loc(unit_label) - scratch.mkdir(parents=True, exist_ok=True) - return scratch - - def get_permanent(self, unit_label) -> PermanentStaging: - """Get the object for this unit's permanent staging directory""" - return PermanentStaging( - scratch=self.scratch_root, - external=self.permanent_root, - shared=self.shared_root, - prefix=unit_label, - staging=self.staging, - ) - - def get_shared(self, unit_label) -> SharedStaging: - """Get the object for this unit's shared staging directory""" - return SharedStaging( - scratch=self.scratch_root, - external=self.shared_root, - prefix=unit_label, - staging=self.staging, - ) - - def running_dag(self, dag_label: str): - """Return a context manager that handles storage. - - For simple use cases, this is the only method a user needs to call. - Usage is something like: - - .. code:: - - with manager.running_dag(dag_label) as dag_ctx: - for unit in dag_ordered_units: - label = f"{dag_ctx.dag_label}/{unit.key}" - with dag_ctx.running_unit(label) as dirs: - scratch, shared, permanent = dirs - # run the unit - """ - return self.DAGContextClass.running_dag(self, dag_label) diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index 9dd711e83..efff04b1d 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -4,7 +4,7 @@ import gufe from gufe.storage.externalresource import MemoryStorage, FileStorage -from gufe.storage.storagemanager import StorageManager, NewStorageManager +from gufe.storage.storagemanager import StorageManager from gufe.storage.stagingdirectory import StagingPath from gufe.protocols.protocoldag import new_execute_DAG @@ -252,7 +252,7 @@ def get_storage_manager(self, keep, tmp_path): del_empty_dirs = not empties shared, permanent = self.get_shared_and_permanent() - storage_manager = NewStorageManager( + storage_manager = StorageManager( scratch_root=tmp_path, shared_root=shared, permanent_root=permanent, @@ -342,7 +342,7 @@ def get_storage_manager(self, keep, tmp_path): keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) del_empty_dirs = not empties backend = FileStorage(tmp_path) - storage_manager = NewStorageManager( + storage_manager = StorageManager( scratch_root=tmp_path, shared_root=backend, permanent_root=backend, diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index 05ae31304..1cb5bdd29 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -1,14 +1,12 @@ import pytest -from gufe.storage.storagemanager import ( - StorageManager, NewStorageManager -) +from gufe.storage.storagemanager import StorageManager from gufe.storage.stagingdirectory import StagingDirectory from gufe.storage.externalresource import MemoryStorage, FileStorage from pathlib import Path @pytest.fixture def storage_manager_std(tmp_path): - return NewStorageManager( + return StorageManager( scratch_root=tmp_path / "working", shared_root=MemoryStorage(), permanent_root=MemoryStorage(), @@ -195,7 +193,7 @@ def _after_dag_existing_files(self): class TestKeepScratchAndStagingStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): - return NewStorageManager( + return StorageManager( scratch_root=tmp_path / "working", shared_root=MemoryStorage(), permanent_root=MemoryStorage(), @@ -227,7 +225,7 @@ class TestStagingOverlapsSharedStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): root = tmp_path / "working" - return NewStorageManager( + return StorageManager( scratch_root=root, shared_root=FileStorage(root), permanent_root=MemoryStorage(), @@ -266,7 +264,7 @@ class TestStagingOverlapsPermanentStorageManager(LifecycleHarness): @pytest.fixture def storage_manager(self, tmp_path): root = tmp_path / "working" - return NewStorageManager( + return StorageManager( scratch_root=root, permanent_root=FileStorage(root), shared_root=MemoryStorage(), From 527c1c3bace62164ee9fb493853dc6a10152e366 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 1 Dec 2023 10:26:18 -0600 Subject: [PATCH 17/17] Add everything to this branch; pick file out --- gufe/storage/stagingdirectory.py | 23 +- gufe/storage/stagingserialization.py | 31 +-- gufe/storage/unitstoragemanager.py | 64 +++++ gufe/tests/storage/__init__.py | 0 .../storage/test_stagingserialization.py | 247 ++++++++++++++++++ gufe/tests/storage/test_storage_demo.py | 38 --- gufe/tests/storage/test_storagemanager.py | 17 -- gufe/tests/storage/test_unitstoragemanager.py | 122 +++++++++ 8 files changed, 471 insertions(+), 71 deletions(-) create mode 100644 gufe/storage/unitstoragemanager.py create mode 100644 gufe/tests/storage/__init__.py create mode 100644 gufe/tests/storage/test_stagingserialization.py create mode 100644 gufe/tests/storage/test_unitstoragemanager.py diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index 29e36a558..356a5d34f 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -144,6 +144,7 @@ def cleanup(self): path = Path(file.fspath) if path.exists(): _logger.debug(f"Removing file {file}") + # TODO: handle special case of directory? path.unlink() self.registry.remove(file) else: @@ -175,6 +176,10 @@ def register_path(self, staging_path: StagingPath): """ label_exists = self.external.exists(staging_path.label) fspath = Path(staging_path.fspath) + + # TODO: what if the staging path is a directory? not sure that we + # have a way to know that; but not sure that adding it to the + # registry is right either if not fspath.parent.exists(): fspath.parent.mkdir(parents=True, exist_ok=True) @@ -353,15 +358,31 @@ def __init__(self, root: StagingDirectory, self.root = root self.path = Path(path) + def register(self): + """Register this path with its StagingDirectory. + + If a file associated with this path exists in an external storage, + it will be downloaded to the staging area as part of registration. + """ + self.root.register_path(self) + def __truediv__(self, path: Union[PathLike, str]): return StagingPath(self.root, self.path / path) + def __eq__(self, other): + return (isinstance(other, StagingPath) + and self.root == other.root + and self.path == other.path) + + def __hash__(self): + return hash((self.root, self.path)) + @property def fspath(self): return str(self.root.staging_dir / self.path) def __fspath__(self): - self.root.register_path(self) + self.register() return self.fspath @property diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py index 4b479396f..b50817fe6 100644 --- a/gufe/storage/stagingserialization.py +++ b/gufe/storage/stagingserialization.py @@ -54,7 +54,6 @@ def __init__(self, manager): self.manager = manager self.codec = JSONCodec( cls=StagingPath, - is_my_dict=self.is_my_dict, to_dict=self.to_dict, from_dict=self.from_dict, ) @@ -64,30 +63,32 @@ def refresh_handler(self): codecs = JSON_HANDLER.codecs + [self.codec] self.json_handler = JSONSerializerDeserializer(codecs) + @property + def encoder(self): + return self.json_handler.encoder + + @property + def decoder(self): + return self.json_handler.decoder + def to_dict(self, path): # scratch, shared, permanent may form nested with progressively # smaller contexts, so the last of those it is in is where it should - # be labelled + # be labelled. TODO: opportunity for performance improvement if + # needed loc = None - if path in self.manager.scratch_root: + if path.label in self.manager.scratch_root.iterdir(): loc = "scratch" - if path in self.manager.shared_root: + if path.label in self.manager.shared_root.iter_contents(): loc = "shared" - if path in self.manager.permanent_root: + if path.label in self.manager.permanent_root.iter_contents(): loc = "permanent" return { ':container:': loc, - ':unit_label:': path.root.prefix, - ':path:': path.path, + ':label:': path.label, } def from_dict(self, dct): - loader = getattr(self.manager, f"get_{dct[':container:']}") - staging_dir = loader(dct[':unit_label:']) - return staging_dir / dct[':path:'] - - def is_my_dict(self, dct): - return set(dct) == {':container:', ':unit_label:', ':path:'} - - + staging = getattr(self.manager, f"{dct[':container:']}_staging") + return staging / dct[':label:'] diff --git a/gufe/storage/unitstoragemanager.py b/gufe/storage/unitstoragemanager.py new file mode 100644 index 000000000..95d8c1c99 --- /dev/null +++ b/gufe/storage/unitstoragemanager.py @@ -0,0 +1,64 @@ +from gufe.storage.storagemanager import StorageManager +from gufe.storage.stagingserialization import StagingPathSerialization +from gufe.storage.externalresource import ExternalStorage +from contextlib import contextmanager +from pathlib import Path +from os import PathLike + +from gufe.protocols.protocoldag import Context + +class PerUnitStorageManager(StorageManager): + """Variant to use when doing only a single process per unit""" + def __init__( + self, + scratch_root: PathLike, + shared_root: ExternalStorage, + permanent_root: ExternalStorage, + *, + keep_scratch: bool = False, + keep_staging: bool = False, + keep_shared: bool = False, + staging: PathLike = Path(".staging"), + delete_empty_dirs: bool = True, + ): + super().__init__( + scratch_root=scratch_root, + shared_root=shared_root, + permanent_root=permanent_root, + keep_scratch=keep_scratch, + keep_staging=keep_staging, + keep_shared=keep_shared, + delete_empty_dirs=delete_empty_dirs, + ) + # TODO: move this to the base class + self.serialization = StagingPathSerialization(self) + + @property + def json_encoder(self): + self.serialization.refresh_handler() + return self.serialization.encoder + + @property + def json_decoder(self): + self.serialization.refresh_handler() + return self.serialization.decoder + + @contextmanager + def running_dag(self, dag_label): + yield self + + @contextmanager + def running_unit(self, dag_label, unit_label, **kwargs): + scratch = self._scratch_loc(dag_label, unit_label, **kwargs) + label = self.make_label(dag_label, unit_label, **kwargs) + scratch.mkdir(parents=True, exist_ok=True) + shared = self.shared_staging / label + permanent = self.permanent_staging / label + try: + yield scratch, shared, permanent + finally: + self.shared_staging.transfer_staging_to_external() + for file in self.permanent_staging.registry: + self.shared_staging.transfer_single_file_to_external(file) + + self.permanent_staging.transfer_staging_to_external() diff --git a/gufe/tests/storage/__init__.py b/gufe/tests/storage/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py new file mode 100644 index 000000000..a0d1a16df --- /dev/null +++ b/gufe/tests/storage/test_stagingserialization.py @@ -0,0 +1,247 @@ +import pytest +from gufe.storage.stagingserialization import StagingPathSerialization + +from gufe.storage.stagingdirectory import StagingPath +from gufe.storage.storagemanager import StorageManager +from gufe.storage.externalresource import MemoryStorage, FileStorage + +import json +import pathlib +import shutil + +@pytest.fixture +def storage_manager(tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + +@pytest.fixture +def shared_path(storage_manager): + label = storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.shared_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.shared_staging.transfer_staging_to_external() + return path + +@pytest.fixture +def permanent_path(storage_manager): + label = storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.permanent_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.permanent_staging.transfer_staging_to_external() + return path + +@pytest.fixture +def scratch_path(storage_manager): + scratch_dir = storage_manager._scratch_loc("dag", "unit", attempt=0) + path = scratch_dir / "file.txt" + return path + +@pytest.fixture +def serialization_handler(storage_manager): + return StagingPathSerialization(storage_manager) + +class TestStagingPathSerialization: + @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent']) + def test_round_trip(self, serialization_handler, pathtype, request): + # NB: scratch is a pathlib.Path, not a StagingPath. It is tested + # here to ensure round-trips as part of the overall user story for + # this, but it doesn't invoke the machinery of the + # StagingPathSerialization object + path = request.getfixturevalue(f"{pathtype}_path") + as_json = json.dumps( + path, + cls=serialization_handler.json_handler.encoder + ) + reloaded = json.loads( + as_json, + cls=serialization_handler.json_handler.decoder + ) + + assert path == reloaded + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_to_dict(self, serialization_handler, pathtype, request): + path = request.getfixturevalue(f"{pathtype}_path") + dct = serialization_handler.to_dict(path) + assert dct == { + ':container:': pathtype, + ':label:': "dag/unit_attempt_0/file.txt", + } + + # tests for specific user stories + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_reload_file_contents(self, pathtype, request): + # USER STORY: I am loading my results object, and I will want to use + # the associated files. This should be transparent, regardless of + # where the storage is located (e.g., not local storage). (This is + # actually a test of the staging tools, but we include it here for + # completeness of the user stories.) + path = request.getfixturevalue(f"{pathtype}_path") + + # remove the file (remains in the MemoryStorage) + p = pathlib.Path(path.fspath) + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the file (NB: nothing special done here; download is + # transparent to user) + with open(path, mode='r') as f: + contents = f.read() + + assert p.exists() + assert contents == "contents here" + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_load_results_object_file_not_downloaded(self, + serialization_handler, + pathtype, request): + # USER STORY: I am loading my results object, but I do not need the + # large stored files. I do not want to download them when they + # aren't needed. + path = request.getfixturevalue(f"{pathtype}_path") + # serialize the path object + json_str = json.dumps(path, cls=serialization_handler.encoder) + + # delete the path from the directory + p = pathlib.Path(path) + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the serialized form of the object + reloaded = json.loads(json_str, cls=serialization_handler.decoder) + + # check that the deserialized version has the path, but that the + # path does not exist on the filesystem + assert isinstance(reloaded, StagingPath) + assert reloaded.label == path.label + assert reloaded.path == path.path + assert not p.exists() + # NOTE: as soon as you call `__fspath__`, the file will download + + @pytest.mark.parametrize('move', ['relative', 'absolute']) + def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): + # USER STORY: My permanent storage was a directory on my file + # system, but I have moved that directory (with use cases of (a) I + # moved the absolute path; (b) it is at a different relative path + # with respect to my pwd). + monkeypatch.chdir(tmp_path) + old_manager = StorageManager( + scratch_root="old/scratch", + shared_root=FileStorage("old/shared"), + permanent_root=FileStorage("old/permanent") + ) + old_handler = StagingPathSerialization(old_manager) + old_path = old_manager.permanent_staging / "dag/unit/result.txt" + with open(old_path, mode='w') as f: + f.write("contents here") + + old_manager.permanent_staging.transfer_staging_to_external() + perm_p = pathlib.Path(tmp_path / "old/permanent/dag/unit/result.txt") + assert perm_p.exists() + + # serialize the path object + json_str = json.dumps(old_path, cls=old_handler.encoder) + + # move the storage subdirectory; create a new, associated storage + # manager/serialization handler + if move == "relative": + # change to within t + monkeypatch.chdir(tmp_path / "old") + new_manager = StorageManager( + scratch_root="scratch", + shared_root=FileStorage("shared"), + permanent_root=FileStorage("permanent") + ) + expected_path = tmp_path / "old/permanent/dag/unit/result.txt" + elif move == "absolute": + shutil.move(tmp_path / "old", tmp_path / "new") + new_manager = StorageManager( + scratch_root="new/scratch", + shared_root=FileStorage("new/shared"), + permanent_root=FileStorage("new/permanent") + ) + expected_path = tmp_path / "new/permanent/dag/unit/result.txt" + else: # -no-cov- + raise RuntimeWarning(f"Bad test parameter '{move}': should be " + "'relative' or 'absolute'") + + new_handler = StagingPathSerialization(new_manager) + + # deserialize the path using the new serialization handler + reloaded = json.loads(json_str, cls=new_handler.decoder) + + # ensure that the path exists and that the data can be reloaded + assert isinstance(reloaded, StagingPath) + assert reloaded.label == old_path.label + assert pathlib.Path(expected_path).exists() + + with open(reloaded, mode='r') as f: + contents = f.read() + + assert contents == "contents here" + + def test_two_different_permanent_storages(self, tmp_path): + # I'm working with files from two different permanent storages. I + # need to be able to load from both in the same Python process. + # (NOTE: this user story is primarily to prevent us from changing to + # a solution based on global/class vars to set context.) + manager1 = StorageManager( + scratch_root=tmp_path / "working1", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + manager2 = StorageManager( + scratch_root=tmp_path / "working2", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + handler1 = StagingPathSerialization(manager1) + handler2 = StagingPathSerialization(manager2) + + path1 = manager1.permanent_staging / "file1.txt" + with open(path1, mode='w') as f: + f.write("contents 1") + manager1.permanent_staging.transfer_staging_to_external() + + path2 = manager2.permanent_staging / "file2.txt" + with open(path2, mode='w') as f: + f.write("contents 2") + manager2.permanent_staging.transfer_staging_to_external() + + # serialize the paths + json_str1 = json.dumps(path1, cls=handler1.encoder) + json_str2 = json.dumps(path2, cls=handler2.encoder) + + # delete all staged files + assert pathlib.Path(path1.fspath).exists() + manager1.permanent_staging.cleanup() + assert not pathlib.Path(path1.fspath).exists() + + assert pathlib.Path(path2.fspath).exists() + manager2.permanent_staging.cleanup() + assert not pathlib.Path(path2.fspath).exists() + + # reload and check contents of both permanent files + reloaded1 = json.loads(json_str1, cls=handler1.decoder) + reloaded2 = json.loads(json_str2, cls=handler2.decoder) + + assert isinstance(reloaded1, StagingPath) + assert reloaded1.label == path1.label + assert not pathlib.Path(reloaded1.fspath).exists() + with open(reloaded1, mode='r') as f: + assert f.read() == "contents 1" + + assert isinstance(reloaded2, StagingPath) + assert reloaded2.label == path2.label + assert not pathlib.Path(reloaded2.fspath).exists() + with open(reloaded2, mode='r') as f: + assert f.read() == "contents 2" diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py index efff04b1d..490117842 100644 --- a/gufe/tests/storage/test_storage_demo.py +++ b/gufe/tests/storage/test_storage_demo.py @@ -73,44 +73,6 @@ def _create(self, stateA, stateB, mapping, extends): def _gather(self, protocol_dag_results): return {} - -# TODO: execute_unit should actually be moved somewhere else; this is likely -# to be the starting point for a real approach to do that -def execute_unit(dag_label, protocolunit, storage_manager, inputs): - label = f"{str(unit.key)}" - with storage_manager(running_dag(dag_label)) as dag_ctx: - with dag_ctx.running_unit(label) as (scratch, shared, perm): - context = Context(shared=shared, - scratch=scratch, - permanent=perm) - - unit_result = protocolunit.execute(context, - raise_error=False, - **inputs) - - return unit_result - - -def execute_per_unit(protocoldag, storage_manager, dag_directory): - # fake like we're executing each unit in a different process - all_unit_filenames = [] - dag_label = protocoldag.key # TODO: we can change this - for num, unit in enumerate(protocoldag.protocol_units): - unit_result = execute_unit(dag_label, unit, storage_manager) - fname = dag_directory / f"result_{num}.json" - # serialize the unit result - with open(fname, mode='w') as f: - f.write(json.dumps(unit_result.to_dict(), - cls=storage_manager.json_encoder)) - - all_unit_filenames.append(fname) - - # now let's force the unit_result to get cleared from memory - del unit_result - assert gc.is_finalized(unit_result) - - ... # TODO: make ProtocolDAGResult - @pytest.fixture def demo_dag(solvated_ligand, solvated_complex): transformation = gufe.Transformation( diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index 1cb5bdd29..80af74c1e 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -295,20 +295,3 @@ def _in_staging_permanent(self, unit_label, in_after): def _after_unit_existing_files(self, unit_label): # same for both; all files come from unit 1 return {"baz"} - - -class TestStorageManager: pass - # def test_get_scratch(self, storage_manager_std): - # scratch = storage_manager_std.get_scratch("dag_label/unit_label") - # assert str(scratch).endswith("scratch/dag_label/unit_label") - # assert isinstance(scratch, Path) - - # def test_get_permanent(self, storage_manager_std): - # perm = storage_manager_std.get_permanent("dag_label/unit_label") - # assert perm.__fspath__().endswith(".staging/dag_label/unit_label") - # assert isinstance(perm, StagingDirectory) - - # def test_get_shared(self, storage_manager_std): - # shared = storage_manager_std.get_shared("dag_label/unit_label") - # assert shared.__fspath__().endswith(".staging/dag_label/unit_label") - # assert isinstance(shared, StagingDirectory) diff --git a/gufe/tests/storage/test_unitstoragemanager.py b/gufe/tests/storage/test_unitstoragemanager.py new file mode 100644 index 000000000..82621a6c2 --- /dev/null +++ b/gufe/tests/storage/test_unitstoragemanager.py @@ -0,0 +1,122 @@ +import pytest +from gufe.tests.storage.test_storagemanager import dag_units, LifecycleHarness +from gufe.tests.storage.test_storage_demo import demo_dag +from gufe.storage.unitstoragemanager import PerUnitStorageManager +from gufe.tokenization import from_dict + +from gufe.storage.externalresource import MemoryStorage, FileStorage +from gufe.protocols.protocoldag import Context, ProtocolDAGResult + +import json + +# TODO: execute_unit should actually be moved somewhere else; this is likely +# to be the starting point for a real approach to do that +def execute_unit(dag_label, protocolunit, storage_manager, attempt, inputs): + with storage_manager.running_dag(dag_label) as dag_ctx: + with dag_ctx.running_unit( + dag_label, + protocolunit.key, + attempt=attempt + ) as (scratch, shared, perm): + context = Context(shared=shared, + scratch=scratch, + permanent=perm) + + unit_result = protocolunit.execute(context=context, + raise_error=False, + **inputs) + + return unit_result + +# the next functions will probably become conveniences on the warehouse +def _result_filenames_for_unit(unit, results_dir): + yield from ( + f for f in results_dir.iterdir() + if f.name.startswith(unit.key) + ) + +def load_results_for_unit(unit, results_dir, storage_manager): + for filename in _result_filenames_for_unit(unit, results_dir): + with open(filename, mode='r') as f: + res = from_dict(json.load(f, cls=storage_manager.json_decoder)) + + yield res + + +def get_inputs(unit, results_dir, storage_manager): + inputs = {} + for inp_name, inp_unit in unit.inputs.items(): + for res in load_results_for_unit(inp_unit, results_dir, + storage_manager): + if res.ok: + # there should be only 1 unit result that is ok, although + # we're not being safe about that in this little demo + inputs[inp_name] = res + break + + return inputs + + +def get_attempt_number(unit, results_dir): + return len(list(_result_filenames_for_unit(unit, results_dir))) + + +def save_unit_result(result, storage_manager, results_dir, attempt): + fname = results_dir / f"{result.source_key}_attempt_{attempt}.json" + dct = result.to_dict() # real approach should be more efficient + with open(fname, mode='w') as f: + f.write(json.dumps(dct, cls=storage_manager.json_encoder)) + +# now we have a test-only method, which will fake independent processes +# (although the actual executor will have some similar things) +def execute_per_unit(protocoldag, storage_manager, results_dir): + # fake like we're executing each unit in a different process + dag_label = "dag" # is this needed? check with other version + for num, unit in enumerate(protocoldag.protocol_units): + # when you run a unit, get its info + inputs = get_inputs(unit, results_dir, storage_manager) + attempt = get_attempt_number(unit, results_dir) + unit_result = execute_unit(dag_label, unit, storage_manager, + attempt, inputs) + save_unit_result(unit_result, storage_manager, results_dir, attempt) + + # now let's force the unit_result to get cleared from memory + del unit_result + + # reload all the serialized units (this would be a task generated after + # the normal tasks to create a DAG result) -- results_dir is specific to + # this DAG + unit_results = [] + for fname in results_dir.iterdir(): + with open(fname, mode='r') as f: + unit_results.append( + from_dict(json.load(f, cls=storage_manager.json_decoder)) + ) + + dag_result = ProtocolDAGResult( + protocol_units=protocoldag.protocol_units, + protocol_unit_results=unit_results, + transformation_key=protocoldag.transformation_key, + # NOTE: this function doesn't yet allow extends, etc. + ) + return dag_result + + +def test_execute_per_unit(tmp_path, demo_dag): + results = tmp_path / "result_objs" + scratch = tmp_path / "working" + shared = MemoryStorage() + permanent = MemoryStorage() + results.mkdir(parents=True, exist_ok=True) + manager = PerUnitStorageManager( + scratch_root=scratch, + shared_root=shared, + permanent_root=permanent, + ) + dag_result = execute_per_unit( + demo_dag, + manager, + results + ) + assert dag_result.ok() + # TODO: further asserts to make sure everything behaved as expected