class StagingPathSerialization:
    """Manage JSON serialization of :class:`.StagingPath` objects.

    This class is only created internally. Developers of executors will
    interface with this indirectly through the :class:`.StorageManager`; the
    expectation is that the only thing they will need is access to the
    ``encoder`` and ``decoder`` properties.

    Serialization of a :class:`.StagingPath` needs to strip the specific
    storage context (path to external files storage) because we should be
    able to change that out-of-process (e.g., move the directory containing
    results) and still be able to deserialize correctly. This class is
    responsible for abstracting/injecting the storage context when a
    :class:`.StagingPath` is serialized/deserialized.

    Parameters
    ----------
    manager
        The storage manager that provides the context. This class reads its
        ``shared_root`` and ``permanent_root`` attributes (each must provide
        ``iter_contents()``) when serializing, and its ``shared_staging`` /
        ``permanent_staging`` attributes when deserializing.

    Attributes
    ----------
    manager
        The object passed to the constructor.
    codec
        The :class:`.JSONCodec` for :class:`.StagingPath` bound to this
        instance's ``to_dict`` / ``from_dict``.
    json_handler
        The :class:`.JSONSerializerDeserializer` most recently built by
        :meth:`refresh_handler`.
    """
    # TODO: this long comment should probably go somewhere where it will
    # show up in docs as well? Maybe just bump it into the class docstring?
    #
    # Serializing staging paths
    # -------------------------
    #
    # 1. I am loading my results object, and I will want to use the
    #    associated files. This should be transparent, regardless of where
    #    the permanent storage is located.
    # 2. I am loading my results object, but I do not need the large stored
    #    files. I do not want to download them when they aren't needed.
    # 3. My permanent storage was a directory on my file system, but I have
    #    moved that directory (with use cases of (a) I moved the absolute
    #    path; (b) it is at a different relative path with respect to my
    #    pwd).
    # 4. I'm working with files from two different permanent storages. I
    #    need to be able to load from both in the same Python process.
    # 5. I have generated data in one backend, and I transferred it to
    #    another backend. It needs to be readable from the other backend.
    #    (Use case: data is in long-term cloud storage that requires
    #    credentials, but I want to share some part of that data with
    #    someone else by transferring it to a disk.)
    # 6. I am interfacing with a package that adds serialization types to
    #    the gufe JSON_HANDLER via an external JSONCodec. Maybe, in the
    #    worst case, the external codec gets added *after* I've created my
    #    serialization object. I need to be able to serialize those custom
    #    types.
    #
    # Outputs from a protocol may contain a :class:`.StagingPath`. Note that
    # a :class:`.StagingPath` is inherently not a complete description of
    # how to obtain the associated data: in almost every situation, there is
    # some additional context required. This can include the credentials to
    # an external server, or a base path that the file can be found relative
    # to (which may have changed if the user moves it). Because of this, we
    # need to inject that context into the deserialization.
    #
    # This object injects the relevant context, provided by the
    # :class:`.StorageManager`. It creates a JSONSerializerDeserializer
    # based on the one being used by gufe in this process, using all the
    # installed codecs plus an additional codec specific to this context.
    #
    # User stories 1 and 2 are handled by the nature of the
    # :class:`.StagingPath` object. The external file downloads as part of
    # the ``__fspath__`` method. This means that when using the ``open``
    # builtin, you will automatically download the file to a local staging
    # directory. However, the reference to the file can exist in the results
    # object without downloading the file.
    #
    # User stories 3--6 are handled by this
    # :class:`.StagingPathSerialization` class. Story 3 is handled by
    # allowing the appropriate context (in the form of a
    # :class:`.StorageManager`) to be injected into the deserialization
    # process. Story 4 can be handled by using more than one
    # :class:`.StagingPathSerialization` context (associated with different
    # :class:`.StorageManager` objects). Story 5 is handled by injecting
    # the appropriate context (and, in principle, is a variant of story 3).
    # Story 6 is handled by doing a just-in-time generation of the
    # JSONSerializerDeserializer that we use for this class.
    def __init__(self, manager):
        self.manager = manager
        # The codec is bound to *this* instance, so the storage context used
        # on (de)serialization is always the manager given here.
        self.codec = JSONCodec(
            cls=StagingPath,
            to_dict=self.to_dict,
            from_dict=self.from_dict,
        )
        self.refresh_handler()

    def refresh_handler(self):
        """Rebuild ``json_handler`` from the currently registered codecs.

        The new handler uses every codec currently on the gufe
        ``JSON_HANDLER`` plus this instance's :class:`.StagingPath` codec.
        """
        codecs = JSON_HANDLER.codecs + [self.codec]
        self.json_handler = JSONSerializerDeserializer(codecs)

    @property
    def encoder(self):
        """
        JSONEncoder class to use when serializing a :class:`.StagingPath`.

        The handler is refreshed on every access so that codecs registered
        after this object was created are included.
        """
        self.refresh_handler()
        return self.json_handler.encoder

    @property
    def decoder(self):
        """
        JSONDecoder class to use when deserializing a :class:`.StagingPath`.

        The handler is refreshed on every access so that codecs registered
        after this object was created are included.
        """
        self.refresh_handler()
        return self.json_handler.decoder

    def to_dict(self, path: "StagingPath") -> dict:
        """
        Dict representation of a StagingPath, abstracting specific context.

        This provides a JSON-serializable representation of a StagingPath
        where the specific context of the StagingPath (the specific storage
        backend where it is located) is replaced by a generic representation
        of 'shared' or 'permanent', allowing a new specific context to be
        injected on deserialization.

        Parameters
        ----------
        path
            The staging path to serialize; only its ``label`` is read.

        Returns
        -------
        dict
            ``{':container:': <'shared' or 'permanent'>, ':label:': label}``

        Raises
        ------
        RuntimeError
            If the label is found in neither the shared nor the permanent
            storage of the manager.
        """
        # Scratch is deliberately not checked: ``scratch_root.iterdir()``
        # yields ``pathlib.Path`` objects, which never compare equal to the
        # string label, so that check could never match -- it could only
        # raise when the scratch directory did not exist.
        #
        # Shared and permanent may be nested, with progressively smaller
        # contexts, so the *last* container holding the label is the one we
        # report. TODO: opportunity for performance improvement if needed
        # (each ``iter_contents()`` call walks the whole storage).
        loc = None
        for container in ("shared", "permanent"):
            root = getattr(self.manager, f"{container}_root")
            if path.label in root.iter_contents():
                loc = container

        if loc is None:
            raise RuntimeError(
                f"Unable to serialize {path}: it does not appear to be "
                "associated with storage managed by the context manager "
                f"{self.manager}."
            )

        return {
            ':container:': loc,
            ':label:': path.label,
        }

    def from_dict(self, dct: dict) -> "StagingPath":
        """Recreate a StagingPath from its dict representation.

        This undoes the process from :meth:`to_dict`. It injects the
        storage context in ``self.manager`` into the deserialized
        :class:`.StagingPath` instance.

        Parameters
        ----------
        dct
            Dict with the ``':container:'`` and ``':label:'`` keys produced
            by :meth:`to_dict`.

        Returns
        -------
        StagingPath
            The result of ``<container>_staging / label`` on the manager.
        """
        # The container name selects ``shared_staging``/``permanent_staging``
        # on whichever manager this instance was built with.
        staging = getattr(self.manager, f"{dct[':container:']}_staging")
        return staging / dct[':label:']
@pytest.fixture
def storage_manager(tmp_path):
    """StorageManager with scratch under ``tmp_path`` and in-memory stores."""
    return StorageManager(
        scratch_root=tmp_path / "working",
        shared_root=MemoryStorage(),
        permanent_root=MemoryStorage(),
    )


@pytest.fixture
def shared_path(storage_manager):
    """Path in shared staging, written and then transferred to external."""
    label = storage_manager.make_label("dag", "unit", attempt=0)
    path = storage_manager.shared_staging / label / "file.txt"
    with open(path, mode='w') as f:
        f.write("contents here")

    storage_manager.shared_staging.transfer_staging_to_external()
    return path


@pytest.fixture
def permanent_path(storage_manager):
    """Path in permanent staging, written and then transferred to external."""
    label = storage_manager.make_label("dag", "unit", attempt=0)
    path = storage_manager.permanent_staging / label / "file.txt"
    with open(path, mode='w') as f:
        f.write("contents here")

    storage_manager.permanent_staging.transfer_staging_to_external()
    return path


@pytest.fixture
def scratch_path(storage_manager):
    """Path to ``file.txt`` in the scratch location (the file is not created)."""
    scratch_dir = storage_manager._scratch_loc("dag", "unit", attempt=0)
    path = scratch_dir / "file.txt"
    return path


@pytest.fixture
def serialization_handler(storage_manager):
    """StagingPathSerialization built directly on the fixture manager."""
    return StagingPathSerialization(storage_manager)


class NewType:
    """Data-free type used to exercise registration of a new codec."""
    # used in new codec test (putting as a nested class required fancier
    # serialization approaches, easier to put it at module level)
    # class where any instance is equivalent (carries no data)
    def __eq__(self, other):
        return isinstance(other, self.__class__)


class TestStagingPathSerialization:
    """Round-trip and user-story tests for StagingPath serialization."""

    @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent'])
    def test_round_trip(self, serialization_handler, pathtype, request):
        # NB: scratch is a pathlib.Path, not a StagingPath. It is tested
        # here to ensure it round-trips as part of the overall user story,
        # but it doesn't invoke the machinery of the
        # StagingPathSerialization object
        path = request.getfixturevalue(f"{pathtype}_path")
        as_json = json.dumps(
            path,
            cls=serialization_handler.json_handler.encoder
        )
        reloaded = json.loads(
            as_json,
            cls=serialization_handler.json_handler.decoder
        )

        assert path == reloaded

    @pytest.mark.parametrize('pathtype', ['shared', 'permanent'])
    def test_to_dict(self, serialization_handler, pathtype, request):
        # the container name must match the staging area the path came from
        path = request.getfixturevalue(f"{pathtype}_path")
        dct = serialization_handler.to_dict(path)
        assert dct == {
            ':container:': pathtype,
            ':label:': "dag/unit_attempt_0/file.txt",
        }

    # tests for specific user stories
    @pytest.mark.parametrize('pathtype', ['shared', 'permanent'])
    def test_reload_file_contents(self, pathtype, request):
        # USER STORY: I am loading my results object, and I will want to use
        # the associated files. This should be transparent, regardless of
        # where the storage is located (e.g., not local storage). (This is
        # actually a test of the staging tools, but we include it here for
        # completeness of the user stories.)
        path = request.getfixturevalue(f"{pathtype}_path")

        # remove the file (remains in the MemoryStorage)
        p = path.as_path()
        assert p.exists()
        p.unlink()
        assert not p.exists()

        # reload the file (NB: nothing special done here; download is
        # transparent to user)
        with open(path, mode='r') as f:
            contents = f.read()

        assert p.exists()
        assert contents == "contents here"

    @pytest.mark.parametrize('pathtype', ['shared', 'permanent'])
    def test_load_results_object_file_not_downloaded(self,
                                                     serialization_handler,
                                                     pathtype, request):
        # USER STORY: I am loading my results object, but I do not need the
        # large stored files. I do not want to download them when they
        # aren't needed.
        path = request.getfixturevalue(f"{pathtype}_path")
        # serialize the path object
        json_str = json.dumps(path, cls=serialization_handler.encoder)

        # delete the path from the directory
        p = pathlib.Path(path)
        assert p.exists()
        p.unlink()
        assert not p.exists()

        # reload the serialized form of the object
        reloaded = json.loads(json_str, cls=serialization_handler.decoder)

        # check that the deserialized version has the path, but that the
        # path does not exist on the filesystem
        assert isinstance(reloaded, StagingPath)
        assert reloaded.label == path.label
        assert reloaded.path == path.path
        assert not p.exists()
        # NOTE: as soon as you call `__fspath__`, the file will download

    @pytest.mark.parametrize('move', ['relative', 'absolute'])
    def test_permanent_storage_moved(self, move, tmp_path, monkeypatch):
        # USER STORY: My permanent storage was a directory on my file
        # system, but I have moved that directory (with use cases of (a) I
        # moved the absolute path; (b) it is at a different relative path
        # with respect to my pwd).
        monkeypatch.chdir(tmp_path)
        old_manager = StorageManager(
            scratch_root="old/scratch",
            shared_root=FileStorage("old/shared"),
            permanent_root=FileStorage("old/permanent")
        )
        old_path = old_manager.permanent_staging / "dag/unit/result.txt"
        with open(old_path, mode='w') as f:
            f.write("contents here")

        old_manager.permanent_staging.transfer_staging_to_external()
        perm_p = pathlib.Path(tmp_path / "old/permanent/dag/unit/result.txt")
        assert perm_p.exists()

        # serialize the path object
        json_str = json.dumps(old_path, cls=old_manager.json_encoder)

        # move the storage subdirectory; create a new, associated storage
        # manager/serialization handler
        if move == "relative":
            # change the working directory to within the old storage
            # directory, so the same files sit at a different relative path
            monkeypatch.chdir(tmp_path / "old")
            new_manager = StorageManager(
                scratch_root="scratch",
                shared_root=FileStorage("shared"),
                permanent_root=FileStorage("permanent")
            )
            expected_path = tmp_path / "old/permanent/dag/unit/result.txt"
        elif move == "absolute":
            # physically move the storage directory to a new location
            shutil.move(tmp_path / "old", tmp_path / "new")
            new_manager = StorageManager(
                scratch_root="new/scratch",
                shared_root=FileStorage("new/shared"),
                permanent_root=FileStorage("new/permanent")
            )
            expected_path = tmp_path / "new/permanent/dag/unit/result.txt"
        else:  # -no-cov-
            raise RuntimeWarning(f"Bad test parameter '{move}': should be "
                                 "'relative' or 'absolute'")

        # deserialize the path using the new serialization handler
        reloaded = json.loads(json_str, cls=new_manager.json_decoder)

        # ensure that the path exists and that the data can be reloaded
        assert isinstance(reloaded, StagingPath)
        assert reloaded.label == old_path.label
        assert pathlib.Path(expected_path).exists()

        with open(reloaded, mode='r') as f:
            contents = f.read()

        assert contents == "contents here"

    def test_two_different_permanent_storages(self, tmp_path):
        # USER STORY: I'm working with files from two different permanent
        # storages. I need to be able to load from both in the same Python
        # process. (NOTE: this user story is primarily to prevent us from
        # changing to a solution based on global/class vars to set context.)
        manager1 = StorageManager(
            scratch_root=tmp_path / "working1",
            shared_root=MemoryStorage(),
            permanent_root=MemoryStorage(),
        )
        manager2 = StorageManager(
            scratch_root=tmp_path / "working2",
            shared_root=MemoryStorage(),
            permanent_root=MemoryStorage(),
        )

        path1 = manager1.permanent_staging / "file1.txt"
        with open(path1, mode='w') as f:
            f.write("contents 1")
        manager1.permanent_staging.transfer_staging_to_external()

        path2 = manager2.permanent_staging / "file2.txt"
        with open(path2, mode='w') as f:
            f.write("contents 2")
        manager2.permanent_staging.transfer_staging_to_external()

        # serialize the paths
        json_str1 = json.dumps(path1, cls=manager1.json_encoder)
        json_str2 = json.dumps(path2, cls=manager2.json_encoder)

        # delete all staged files
        assert path1.as_path().exists()
        manager1.permanent_staging.cleanup()
        assert not path1.as_path().exists()

        assert path2.as_path().exists()
        manager2.permanent_staging.cleanup()
        assert not path2.as_path().exists()

        # reload and check contents of both permanent files
        reloaded1 = json.loads(json_str1, cls=manager1.json_decoder)
        reloaded2 = json.loads(json_str2, cls=manager2.json_decoder)

        assert isinstance(reloaded1, StagingPath)
        assert reloaded1.label == path1.label
        assert not reloaded1.as_path().exists()
        with open(reloaded1, mode='r') as f:
            assert f.read() == "contents 1"

        assert isinstance(reloaded2, StagingPath)
        assert reloaded2.label == path2.label
        assert not reloaded2.as_path().exists()
        with open(reloaded2, mode='r') as f:
            assert f.read() == "contents 2"

    def test_change_storage_backend(self, tmp_path):
        # USER STORY: I have generated data in one backend, and I transferred
        # it to another backend. It needs to be readable from the other
        # backend. (Use case: data is in long-term cloud storage that
        # requires credentials, but I want to share some part of that data
        # with someone else by transferring it to a disk.)
        cloud_manager = StorageManager(
            scratch_root=tmp_path / "cloud",
            shared_root=MemoryStorage(),
            permanent_root=MemoryStorage(),
        )

        local_manager = StorageManager(
            scratch_root=tmp_path / "local_scratch",
            shared_root=MemoryStorage(),
            permanent_root=FileStorage(tmp_path / "local_perm"),
        )

        # TODO: maybe add some more safety asserts in here? that each step
        # goes as expected, to better diagnose potential failures?
        # load data into the cloud storage
        with cloud_manager.running_dag("dag") as dag_ctx:
            with dag_ctx.running_unit("dag", "unit", attempt=0) as ctx:
                cloud_path = ctx.permanent / "data.txt"
                with open(cloud_path, mode='w') as f:
                    f.write("will store on cloud")

        # serialize the cloud_path (assume it is saved somewhere)
        serialized = json.dumps(cloud_path, cls=cloud_manager.json_encoder)

        # transfer from cloud storage to the local_manager
        for label in cloud_manager.permanent_root.iter_contents("dag/unit"):
            with cloud_manager.permanent_root.load_stream(label) as f:
                local_manager.permanent_root.store_bytes(label, f.read())

        # ensure that we can reload objects from the local manager
        local_path = json.loads(serialized, cls=local_manager.json_decoder)

        # same serialized form, but deserialized into a different context
        assert local_path != cloud_path

        with open(local_path, mode='r') as f:
            contents = f.read()

        assert contents == "will store on cloud"

    def test_requires_new_codec(self, tmp_path):
        # USER STORY: I am interfacing with a package that adds
        # serialization types to the gufe JSON_HANDLER via an external
        # JSONCodec. Maybe, in the worst case, the external codec gets added
        # *after* I've created my serialization object. I need to be able to
        # serialize those custom types. (NOTE: A better solution here is to
        # have JSONCodecs also include some codec identifier in their
        # `:is_custom:` field. That would allow us to dynamically add any
        # missing codec, and only to do so when deserialization is needed.
        # This is a change to the custom JSON stuff which hasn't been made
        # yet. This might also allow faster deserialization by having
        # :is_custom: map to something that can be used in a dispatch table.)
        manager = StorageManager(
            scratch_root=tmp_path / "working",
            shared_root=MemoryStorage(),
            permanent_root=MemoryStorage(),
        )

        # add a new custom codec for serialization
        new_type_codec = JSONCodec(
            cls=NewType,
            to_dict=lambda obj: {},
            from_dict=lambda dct: NewType(),
        )

        # Create a dict to serialize; this represents the output dict that
        # might come from a unit_result object. NB: including the file stuff
        # here is actually extraneous (unless implementation changes
        # significantly).
        with manager.running_dag("dag") as dag_ctx:
            with manager.running_unit("dag", "unit", attempt=0) as context:
                file = context.permanent / "dag/unit/file.txt"
                with open(file, mode='w') as f:
                    f.write("contents")

        output_dict = {
            'new_type_result': NewType(),
            'file_result': file,
        }

        # before codec registration, error as not JSON serializable
        with pytest.raises(TypeError, match="not JSON serializable"):
            _ = json.dumps(output_dict, cls=manager.json_encoder)

        # register codec and it works
        # NOTE(review): the codec is added to the module-level JSON_HANDLER
        # and never removed in this test, so it presumably stays registered
        # for any tests that run afterwards -- confirm whether cleanup is
        # needed.
        from gufe.tokenization import JSON_HANDLER
        JSON_HANDLER.add_codec(new_type_codec)
        dumped = json.dumps(output_dict, cls=manager.json_encoder)

        reloaded = json.loads(dumped, cls=manager.json_decoder)

        assert reloaded == output_dict