From a659afa6d054a4cd81df9857772fc4e68dbef440 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 1 Dec 2023 13:13:07 -0600 Subject: [PATCH 1/9] staging serialization --- gufe/storage/stagingserialization.py | 94 ++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 gufe/storage/stagingserialization.py diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py new file mode 100644 index 000000000..b50817fe6 --- /dev/null +++ b/gufe/storage/stagingserialization.py @@ -0,0 +1,94 @@ +from gufe.tokenization import JSON_HANDLER +from gufe.custom_json import JSONCodec, JSONSerializerDeserializer +from .stagingdirectory import StagingPath + +class StagingPathSerialization: + # TODO: where should this go? I think maybe on the storage manager + + # Serializing staging paths + # ------------------------- + # + # Some important user stories to consider: + # + # 1. I am loading my results object, and I will want to use the + # associated files. This should be transparent, regardless of where + # the permanent storage is located. + # 2. I am loading my results object, but I do not need the large stored + # files. I do not want to download them when they aren't needed. + # 3. My permanent storage was a directory on my file system, but I have + # moved that directory (with use cases of (a) I moved the absolute + # path; (b) it is at a different relative path with respect to my + # pwd. + # 4. I'm working with files from two different permanent storages. I + # need to be able to load from both in the same Python process. + # + # Outputs from a protocol may contain a :class:`.StagingPath`. Note that + # a :class:`.StagingPath` is inherently not a complete description of + # how to obtain the associated data: in almost every situation, there is + # some additional context required. This can include the credentials to + # an external server, or a base path that the file can be found relative + # to (which may have changed if the user moves it.) Because of this, we + # need to inject that context in to the deserialization. + # + # This object injects the relevant context, provided by the + # :class:`.StorageManager`. It creates a JSONSerializerDeserializer + # based on the one being used by gufe in this process, using all the + # installed codecs plus an additional codec specific to this context. + # + # User stories 1 and 2 are handled by the nature of the + # :class:`.StagingPath` object. The external file downloads as part of + # the ``__fspath__`` method. This means that when using the ``open`` + # builtin, you will automatically download the file to a local staging + # directory. However, the reference to the file can exist in the results + # object without downloading the file. + # + # User stories 3 and 4 are handled by this + # :class:`.StagingPathSerialization` class. Story 3 is handled by + # allowing the appropriate context (in the form of a + # :class:`.StorageManager`) to be injected into the deserialization + # process. Story 4 can be handled by using more than one + # :class:`.StagingPathSerialization` context (associated with different + # :class:`.StorageManager` objects. + + def __init__(self, manager): + self.manager = manager + self.codec = JSONCodec( + cls=StagingPath, + to_dict=self.to_dict, + from_dict=self.from_dict, + ) + self.refresh_handler() + + def refresh_handler(self): + codecs = JSON_HANDLER.codecs + [self.codec] + self.json_handler = JSONSerializerDeserializer(codecs) + + @property + def encoder(self): + return self.json_handler.encoder + + @property + def decoder(self): + return self.json_handler.decoder + + def to_dict(self, path): + # scratch, shared, permanent may form nested with progressively + # smaller contexts, so the last of those it is in is where it should + # be labelled. TODO: opportunity for performance improvement if + # needed + loc = None + if path.label in self.manager.scratch_root.iterdir(): + loc = "scratch" + if path.label in self.manager.shared_root.iter_contents(): + loc = "shared" + if path.label in self.manager.permanent_root.iter_contents(): + loc = "permanent" + + return { + ':container:': loc, + ':label:': path.label, + } + + def from_dict(self, dct): + staging = getattr(self.manager, f"{dct[':container:']}_staging") + return staging / dct[':label:'] From 32b1fe4ce68a1776bc854f39a94f378da336ee7a Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 1 Dec 2023 13:13:33 -0600 Subject: [PATCH 2/9] tests for staging serialization --- .../storage/test_stagingserialization.py | 247 ++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 gufe/tests/storage/test_stagingserialization.py diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py new file mode 100644 index 000000000..a0d1a16df --- /dev/null +++ b/gufe/tests/storage/test_stagingserialization.py @@ -0,0 +1,247 @@ +import pytest +from gufe.storage.stagingserialization import StagingPathSerialization + +from gufe.storage.stagingdirectory import StagingPath +from gufe.storage.storagemanager import StorageManager +from gufe.storage.externalresource import MemoryStorage, FileStorage + +import json +import pathlib +import shutil + +@pytest.fixture +def storage_manager(tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + +@pytest.fixture +def shared_path(storage_manager): + label = storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.shared_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.shared_staging.transfer_staging_to_external() + return path + +@pytest.fixture +def permanent_path(storage_manager): + label = storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.permanent_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.permanent_staging.transfer_staging_to_external() + return path + +@pytest.fixture +def scratch_path(storage_manager): + scratch_dir = storage_manager._scratch_loc("dag", "unit", attempt=0) + path = scratch_dir / "file.txt" + return path + +@pytest.fixture +def serialization_handler(storage_manager): + return StagingPathSerialization(storage_manager) + +class TestStagingPathSerialization: + @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent']) + def test_round_trip(self, serialization_handler, pathtype, request): + # NB: scratch is a pathlib.Path, not a StagingPath. It is tested + # here to ensure round-trips as part of the overall user story for + # this, but it doesn't invoke the machinery of the + # StagingPathSerialization object + path = request.getfixturevalue(f"{pathtype}_path") + as_json = json.dumps( + path, + cls=serialization_handler.json_handler.encoder + ) + reloaded = json.loads( + as_json, + cls=serialization_handler.json_handler.decoder + ) + + assert path == reloaded + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_to_dict(self, serialization_handler, pathtype, request): + path = request.getfixturevalue(f"{pathtype}_path") + dct = serialization_handler.to_dict(path) + assert dct == { + ':container:': pathtype, + ':label:': "dag/unit_attempt_0/file.txt", + } + + # tests for specific user stories + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_reload_file_contents(self, pathtype, request): + # USER STORY: I am loading my results object, and I will want to use + # the associated files. This should be transparent, regardless of + # where the storage is located (e.g., not local storage). (This is + # actually a test of the staging tools, but we include it here for + # completeness of the user stories.) + path = request.getfixturevalue(f"{pathtype}_path") + + # remove the file (remains in the MemoryStorage) + p = pathlib.Path(path.fspath) + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the file (NB: nothing special done here; download is + # transparent to user) + with open(path, mode='r') as f: + contents = f.read() + + assert p.exists() + assert contents == "contents here" + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_load_results_object_file_not_downloaded(self, + serialization_handler, + pathtype, request): + # USER STORY: I am loading my results object, but I do not need the + # large stored files. I do not want to download them when they + # aren't needed. + path = request.getfixturevalue(f"{pathtype}_path") + # serialize the path object + json_str = json.dumps(path, cls=serialization_handler.encoder) + + # delete the path from the directory + p = pathlib.Path(path) + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the serialized form of the object + reloaded = json.loads(json_str, cls=serialization_handler.decoder) + + # check that the deserialized version has the path, but that the + # path does not exist on the filesystem + assert isinstance(reloaded, StagingPath) + assert reloaded.label == path.label + assert reloaded.path == path.path + assert not p.exists() + # NOTE: as soon as you call `__fspath__`, the file will download + + @pytest.mark.parametrize('move', ['relative', 'absolute']) + def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): + # USER STORY: My permanent storage was a directory on my file + # system, but I have moved that directory (with use cases of (a) I + # moved the absolute path; (b) it is at a different relative path + # with respect to my pwd). + monkeypatch.chdir(tmp_path) + old_manager = StorageManager( + scratch_root="old/scratch", + shared_root=FileStorage("old/shared"), + permanent_root=FileStorage("old/permanent") + ) + old_handler = StagingPathSerialization(old_manager) + old_path = old_manager.permanent_staging / "dag/unit/result.txt" + with open(old_path, mode='w') as f: + f.write("contents here") + + old_manager.permanent_staging.transfer_staging_to_external() + perm_p = pathlib.Path(tmp_path / "old/permanent/dag/unit/result.txt") + assert perm_p.exists() + + # serialize the path object + json_str = json.dumps(old_path, cls=old_handler.encoder) + + # move the storage subdirectory; create a new, associated storage + # manager/serialization handler + if move == "relative": + # change to within t + monkeypatch.chdir(tmp_path / "old") + new_manager = StorageManager( + scratch_root="scratch", + shared_root=FileStorage("shared"), + permanent_root=FileStorage("permanent") + ) + expected_path = tmp_path / "old/permanent/dag/unit/result.txt" + elif move == "absolute": + shutil.move(tmp_path / "old", tmp_path / "new") + new_manager = StorageManager( + scratch_root="new/scratch", + shared_root=FileStorage("new/shared"), + permanent_root=FileStorage("new/permanent") + ) + expected_path = tmp_path / "new/permanent/dag/unit/result.txt" + else: # -no-cov- + raise RuntimeWarning(f"Bad test parameter '{move}': should be " + "'relative' or 'absolute'") + + new_handler = StagingPathSerialization(new_manager) + + # deserialize the path using the new serialization handler + reloaded = json.loads(json_str, cls=new_handler.decoder) + + # ensure that the path exists and that the data can be reloaded + assert isinstance(reloaded, StagingPath) + assert reloaded.label == old_path.label + assert pathlib.Path(expected_path).exists() + + with open(reloaded, mode='r') as f: + contents = f.read() + + assert contents == "contents here" + + def test_two_different_permanent_storages(self, tmp_path): + # I'm working with files from two different permanent storages. I + # need to be able to load from both in the same Python process. + # (NOTE: this user story is primarily to prevent us from changing to + # a solution based on global/class vars to set context.) + manager1 = StorageManager( + scratch_root=tmp_path / "working1", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + manager2 = StorageManager( + scratch_root=tmp_path / "working2", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + handler1 = StagingPathSerialization(manager1) + handler2 = StagingPathSerialization(manager2) + + path1 = manager1.permanent_staging / "file1.txt" + with open(path1, mode='w') as f: + f.write("contents 1") + manager1.permanent_staging.transfer_staging_to_external() + + path2 = manager2.permanent_staging / "file2.txt" + with open(path2, mode='w') as f: + f.write("contents 2") + manager2.permanent_staging.transfer_staging_to_external() + + # serialize the paths + json_str1 = json.dumps(path1, cls=handler1.encoder) + json_str2 = json.dumps(path2, cls=handler2.encoder) + + # delete all staged files + assert pathlib.Path(path1.fspath).exists() + manager1.permanent_staging.cleanup() + assert not pathlib.Path(path1.fspath).exists() + + assert pathlib.Path(path2.fspath).exists() + manager2.permanent_staging.cleanup() + assert not pathlib.Path(path2.fspath).exists() + + # reload and check contents of both permanent files + reloaded1 = json.loads(json_str1, cls=handler1.decoder) + reloaded2 = json.loads(json_str2, cls=handler2.decoder) + + assert isinstance(reloaded1, StagingPath) + assert reloaded1.label == path1.label + assert not pathlib.Path(reloaded1.fspath).exists() + with open(reloaded1, mode='r') as f: + assert f.read() == "contents 1" + + assert isinstance(reloaded2, StagingPath) + assert reloaded2.label == path2.label + assert not pathlib.Path(reloaded2.fspath).exists() + with open(reloaded2, mode='r') as f: + assert f.read() == "contents 2" From e39a79026c35b0b9a8643e4f8c1001bd81b74cc3 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 1 Dec 2023 14:19:27 -0600 Subject: [PATCH 3/9] pep8 --- gufe/storage/stagingserialization.py | 1 + gufe/tests/storage/test_stagingserialization.py | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py index b50817fe6..72a6fc481 100644 --- a/gufe/storage/stagingserialization.py +++ b/gufe/storage/stagingserialization.py @@ -2,6 +2,7 @@ from gufe.custom_json import JSONCodec, JSONSerializerDeserializer from .stagingdirectory import StagingPath + class StagingPathSerialization: # TODO: where should this go? I think maybe on the storage manager diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index a0d1a16df..47cc3ed10 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -9,6 +9,7 @@ import pathlib import shutil + @pytest.fixture def storage_manager(tmp_path): return StorageManager( @@ -17,6 +18,7 @@ def storage_manager(tmp_path): permanent_root=MemoryStorage(), ) + @pytest.fixture def shared_path(storage_manager): label = storage_manager.make_label("dag", "unit", attempt=0) @@ -27,6 +29,7 @@ def shared_path(storage_manager): storage_manager.shared_staging.transfer_staging_to_external() return path + @pytest.fixture def permanent_path(storage_manager): label = storage_manager.make_label("dag", "unit", attempt=0) @@ -37,16 +40,19 @@ def permanent_path(storage_manager): storage_manager.permanent_staging.transfer_staging_to_external() return path + @pytest.fixture def scratch_path(storage_manager): scratch_dir = storage_manager._scratch_loc("dag", "unit", attempt=0) path = scratch_dir / "file.txt" return path + @pytest.fixture def serialization_handler(storage_manager): return StagingPathSerialization(storage_manager) + class TestStagingPathSerialization: @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent']) def test_round_trip(self, serialization_handler, pathtype, request): From c4126116865bea878e937211a88cd6694ba7b81c Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Thu, 14 Dec 2023 15:37:42 -0600 Subject: [PATCH 4/9] update for changes in preceding PRs --- gufe/storage/stagingserialization.py | 2 +- gufe/tests/storage/test_stagingserialization.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py index 72a6fc481..99deef601 100644 --- a/gufe/storage/stagingserialization.py +++ b/gufe/storage/stagingserialization.py @@ -1,6 +1,6 @@ from gufe.tokenization import JSON_HANDLER from gufe.custom_json import JSONCodec, JSONSerializerDeserializer -from .stagingdirectory import StagingPath +from .stagingregistry import StagingPath class StagingPathSerialization: diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index 47cc3ed10..e5df650b7 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -1,7 +1,7 @@ import pytest from gufe.storage.stagingserialization import StagingPathSerialization -from gufe.storage.stagingdirectory import StagingPath +from gufe.storage.stagingregistry import StagingPath from gufe.storage.storagemanager import StorageManager from gufe.storage.externalresource import MemoryStorage, FileStorage @@ -92,7 +92,7 @@ def test_reload_file_contents(self, pathtype, request): path = request.getfixturevalue(f"{pathtype}_path") # remove the file (remains in the MemoryStorage) - p = pathlib.Path(path.fspath) + p = path.as_path() assert p.exists() p.unlink() assert not p.exists() @@ -228,13 +228,13 @@ def test_two_different_permanent_storages(self, tmp_path): json_str2 = json.dumps(path2, cls=handler2.encoder) # delete all staged files - assert pathlib.Path(path1.fspath).exists() + assert path1.as_path().exists() manager1.permanent_staging.cleanup() - assert not pathlib.Path(path1.fspath).exists() + assert not path1.as_path().exists() - assert pathlib.Path(path2.fspath).exists() + assert path2.as_path().exists() manager2.permanent_staging.cleanup() - assert not pathlib.Path(path2.fspath).exists() + assert not path2.as_path().exists() # reload and check contents of both permanent files reloaded1 = json.loads(json_str1, cls=handler1.decoder) @@ -242,12 +242,12 @@ def test_two_different_permanent_storages(self, tmp_path): assert isinstance(reloaded1, StagingPath) assert reloaded1.label == path1.label - assert not pathlib.Path(reloaded1.fspath).exists() + assert not reloaded1.as_path().exists() with open(reloaded1, mode='r') as f: assert f.read() == "contents 1" assert isinstance(reloaded2, StagingPath) assert reloaded2.label == path2.label - assert not pathlib.Path(reloaded2.fspath).exists() + assert not reloaded2.as_path().exists() with open(reloaded2, mode='r') as f: assert f.read() == "contents 2" From bf49c345ae858e9f87fd90f0b7d4a3899d53f8e6 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Thu, 14 Dec 2023 16:51:45 -0600 Subject: [PATCH 5/9] First version of the change storage backend story --- .../storage/test_stagingserialization.py | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index e5df650b7..bd0181df8 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -196,10 +196,10 @@ def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): assert contents == "contents here" def test_two_different_permanent_storages(self, tmp_path): - # I'm working with files from two different permanent storages. I - # need to be able to load from both in the same Python process. - # (NOTE: this user story is primarily to prevent us from changing to - # a solution based on global/class vars to set context.) + # USER STORY: I'm working with files from two different permanent + # storages. I need to be able to load from both in the same Python + # process. (NOTE: this user story is primarily to prevent us from + # changing to a solution based on global/class vars to set context.) manager1 = StorageManager( scratch_root=tmp_path / "working1", shared_root=MemoryStorage(), @@ -251,3 +251,50 @@ def test_two_different_permanent_storages(self, tmp_path): assert not reloaded2.as_path().exists() with open(reloaded2, mode='r') as f: assert f.read() == "contents 2" + + def test_change_storage_backend(self, tmp_path): + # USER STORY: I have generated data in one backend, and I tranferred + # it to another backend. It needs to be readable from the other + # backend. (Use case: data is in long-term cloud storage that + # requires credentials, but I want to share some part of that data + # with someone else by transferring it to a disk.) + cloud_manager = StorageManager( + scratch_root=tmp_path / "cloud", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + cloud_serialization = StagingPathSerialization(cloud_manager) + + local_manager = StorageManager( + scratch_root=tmp_path / "local_scratch", + shared_root=MemoryStorage(), + permanent_root=FileStorage(tmp_path / "local_perm"), + ) + local_serialization = StagingPathSerialization(local_manager) + + # TODO: maybe add some more safety asserts in here? that each step + # goes as expected, to better diagnose potential failures? + # load data into the cloud storage + with cloud_manager.running_dag("dag") as dag_ctx: + with dag_ctx.running_unit("dag", "unit", attempt=0) as ctx: + cloud_path = ctx.permanent / "data.txt" + with open(cloud_path, mode='w') as f: + f.write("will store on cloud") + + # serialize the cloud_path (assume it is saved somewhere) + serialized = json.dumps(cloud_path, cls=cloud_serialization.encoder) + + # transfer from cloud storage to the local_manager + for label in cloud_manager.permanent_root.iter_contents("dag/unit"): + with cloud_manager.permanent_root.load_stream(label) as f: + local_manager.permanent_root.store_bytes(label, f.read()) + + # ensure that we can reload objects from the local manager + local_path = json.loads(serialized, cls=local_serialization.decoder) + + assert local_path != cloud_path + + with open(local_path, mode='r') as f: + contents = f.read() + + assert contents == "will store on cloud" From 10a40eb6aac37e8b9d4069cee5054b423e4175e5 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 15 Dec 2023 15:26:28 -0600 Subject: [PATCH 6/9] docstrings; sketch out yet ANOTHER user story --- gufe/storage/stagingserialization.py | 70 ++++++++++++++++--- .../storage/test_stagingserialization.py | 14 ++++ 2 files changed, 76 insertions(+), 8 deletions(-) diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py index 99deef601..39ead2b9f 100644 --- a/gufe/storage/stagingserialization.py +++ b/gufe/storage/stagingserialization.py @@ -4,13 +4,21 @@ class StagingPathSerialization: - # TODO: where should this go? I think maybe on the storage manager + """Class for managing serialization of a :class:`.StagingPath`. + Serialization of a :class:`.StagingPath` needs to strip the specific + storage context (path to external files storage) because we should able + to change that out-of-process (e.g., move the directory containing + results) and still be able to deserialize correctly. This class is + responsible for abstracting/injecting the storage context for a + :class:`.StagingPath` is serialized/deserialized. + """ + # TODO: this long comment should probably go somewhere where it will + # show up in docs as well? Maybe just bump it into the class docstring? + # # Serializing staging paths # ------------------------- # - # Some important user stories to consider: - # # 1. I am loading my results object, and I will want to use the # associated files. This should be transparent, regardless of where # the permanent storage is located. @@ -22,6 +30,16 @@ class StagingPathSerialization: # pwd. # 4. I'm working with files from two different permanent storages. I # need to be able to load from both in the same Python process. + # 5. I have generated data in one backend, and I tranferred it to + # another backend. It needs to be readable from the other backend. + # (Use case: data is in long-term cloud storage that requires + # credentials, but I want to share some part of that data with + # someone else by transferring it to a disk.) + # 6. I am interfacing with a package that adds serialization types to + # the gufe JSON_HANDLER via an external JSONCodec. Maybe, in the + # worst case, the external codec gets added *after* I've created my + # serialization object. I need to be able to serialize those custom + # types. # # Outputs from a protocol may contain a :class:`.StagingPath`. Note that # a :class:`.StagingPath` is inherently not a complete description of @@ -43,14 +61,16 @@ class StagingPathSerialization: # directory. However, the reference to the file can exist in the results # object without downloading the file. # - # User stories 3 and 4 are handled by this + # User stories 3--6 are handled by this # :class:`.StagingPathSerialization` class. Story 3 is handled by # allowing the appropriate context (in the form of a # :class:`.StorageManager`) to be injected into the deserialization # process. Story 4 can be handled by using more than one # :class:`.StagingPathSerialization` context (associated with different - # :class:`.StorageManager` objects. - + # :class:`.StorageManager` objects. Story 5 is handled by injecting + # the appropriate context (and, in principle, is a variant of story 3.) + # Story 6 is handled by doing a just-in-time generation of the + # JSONSerializerDeserializer that we use for this class. def __init__(self, manager): self.manager = manager self.codec = JSONCodec( @@ -61,35 +81,69 @@ def __init__(self, manager): self.refresh_handler() def refresh_handler(self): + """Ensure that the current handler includes all registered codecs""" codecs = JSON_HANDLER.codecs + [self.codec] self.json_handler = JSONSerializerDeserializer(codecs) @property def encoder(self): + """ + JSONEncoder class to use when serializing a :class:`.StagingPath` + """ + self.refresh_handler() return self.json_handler.encoder @property def decoder(self): + """ + JSONdecoder class to use when deserializing a :class:`.StagingPath` + """ + self.refresh_handler() return self.json_handler.decoder - def to_dict(self, path): + def to_dict(self, path: StagingPath): + """ + Dict representation of a StagingPath, abstracting specific context. + + This provides a JSON-serializable representation of a StagingPath + where the specific context of the StagingPath (the specific storage + backend where it is located) is replaced by a generic representation + of 'scratch', 'shared', or 'permanent', allowing a new specific + context to be injected on deserialization. + """ # scratch, shared, permanent may form nested with progressively # smaller contexts, so the last of those it is in is where it should # be labelled. TODO: opportunity for performance improvement if # needed loc = None if path.label in self.manager.scratch_root.iterdir(): + # TODO: does this happen? we should only trigger this function + # on a StagingPath, and anything in scratch will only be + # pathlib.Path, right? loc = "scratch" if path.label in self.manager.shared_root.iter_contents(): loc = "shared" if path.label in self.manager.permanent_root.iter_contents(): loc = "permanent" + if loc is None: + raise RuntimeError( + f"Unable to serialize {path}: it does not appear to be " + "associated with storage managed by the context manager " + f"{self.manager}." + ) + return { ':container:': loc, ':label:': path.label, } - def from_dict(self, dct): + def from_dict(self, dct: dict) -> StagingPath: + """Recreate a StagingPath from its dict represnetation. + + This undoes the process from :method:`.to_dict`. It injects the + storage context in ``self.storage_manager`` into the deserialized + :class:`.StagingPath` instance. + """ staging = getattr(self.manager, f"{dct[':container:']}_staging") return staging / dct[':label:'] diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index bd0181df8..d41256beb 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -298,3 +298,17 @@ def test_change_storage_backend(self, tmp_path): contents = f.read() assert contents == "will store on cloud" + + def test_requires_new_codec(self, tmp_path): + # USER STORY: I am interfacing with a package that adds + # serialization types to the gufe JSON_HANDLER via an external + # JSONCodec. Maybe, in the worst case, the external codec gets added + # *after* I've created my serialization object. I need to be able to + # serialize those custom types. (NOTE: A better solution here is to + # have JSONCodecs also include some codec identifier in their + # `:is_custom:` field. That would allow us to dynamically add any + # missing codec, and only to do so when deserialization is needed. + # This is a change to the custom JSON stuff which hasn't been made + # yet. This might also allow faster deserialization by having + # :is_custom: map to something that can be used in a dispatch table.) + # TODO: implement test based on this user story From 4b2510cb7cf3ed4010192e93ac28e03cc5453ef6 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 15 Dec 2023 15:50:47 -0600 Subject: [PATCH 7/9] minor docs update --- gufe/storage/stagingserialization.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py index 39ead2b9f..c4db2df72 100644 --- a/gufe/storage/stagingserialization.py +++ b/gufe/storage/stagingserialization.py @@ -6,6 +6,11 @@ class StagingPathSerialization: """Class for managing serialization of a :class:`.StagingPath`. + This class is only created internally. Developers of executors will + interface with this indirectly through the :class:`.StorageManager`; the + expectation is that the only thing they will need is access to the + ``encoder`` and ``decoder`` properties. + Serialization of a :class:`.StagingPath` needs to strip the specific storage context (path to external files storage) because we should able to change that out-of-process (e.g., move the directory containing @@ -139,7 +144,7 @@ def to_dict(self, path: StagingPath): } def from_dict(self, dct: dict) -> StagingPath: - """Recreate a StagingPath from its dict represnetation. + """Recreate a StagingPath from its dict representation. This undoes the process from :method:`.to_dict`. It injects the storage context in ``self.storage_manager`` into the deserialized From 3390bb0273a5efaa3b8c80f4dfff606816e32198 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Thu, 21 Dec 2023 17:44:27 -0500 Subject: [PATCH 8/9] test new codec added in staging serialization Adds a user story test where a type supported by a custom JSON codec is added to the result dict, and that codec isn't registered as of serialization object creation. --- .../storage/test_stagingserialization.py | 54 ++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index d41256beb..55c61ff0b 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -5,6 +5,9 @@ from gufe.storage.storagemanager import StorageManager from gufe.storage.externalresource import MemoryStorage, FileStorage +from gufe.tokenization import GufeTokenizable, from_dict +from gufe.custom_json import JSONCodec + import json import pathlib import shutil @@ -53,6 +56,15 @@ def serialization_handler(storage_manager): return StagingPathSerialization(storage_manager) +class NewType: + # used in new codec test (putting as a nested class required fancier + # serialization approaches, easier to put it at module level) + # class where any instance is equivalent (carries no data) + def __eq__(self, other): + return isinstance(other, self.__class__) + + + class TestStagingPathSerialization: @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent']) def test_round_trip(self, serialization_handler, pathtype, request): @@ -311,4 +323,44 @@ def test_requires_new_codec(self, tmp_path): # This is a change to the custom JSON stuff which hasn't been made # yet. This might also allow faster deserialization by having # :is_custom: map to something that can be used in a dispatch table.) - # TODO: implement test based on this user story + manager = StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + serialization = StagingPathSerialization(manager) + + # add a new custom codec for serialization + new_type_codec = JSONCodec( + cls=NewType, + to_dict=lambda obj: {}, + from_dict=lambda dct: NewType(), + ) + + # Create a dict to serialize; this represents the output dict that + # might come from a unit_result object. NB: including the file stuff + # here is actually extraneous (unless implementation changes + # significantly). + with manager.running_dag("dag") as dag_ctx: + with manager.running_unit("dag", "unit", attempt=0) as context: + file = context.permanent / "dag/unit/file.txt" + with open(file, mode='w') as f: + f.write("contents") + + output_dict = { + 'new_type_result': NewType(), + 'file_result': file, + } + + # before codec registration, error as not JSON serializable + with pytest.raises(TypeError, match="not JSON serializable"): + _ = json.dumps(output_dict, cls=serialization.encoder) + + # register codec and it works + from gufe.tokenization import JSON_HANDLER + JSON_HANDLER.add_codec(new_type_codec) + dumped = json.dumps(output_dict, cls=serialization.encoder) + + reloaded = json.loads(dumped, cls=serialization.decoder) + + assert reloaded == output_dict From b22445bb3e043d13849a9eba0c3ae32e25799093 Mon Sep 17 00:00:00 2001 From: "David W.H. Swenson" Date: Fri, 22 Dec 2023 11:57:49 -0500 Subject: [PATCH 9/9] Attach StorageSerialization to StorageManager --- gufe/storage/storagemanager.py | 11 +++++++ .../storage/test_stagingserialization.py | 30 +++++++------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index 08f83e6ab..ef95286ab 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -10,6 +10,7 @@ from .externalresource import ExternalStorage, FileStorage from .stagingregistry import SharedStaging, PermanentStaging +from .stagingserialization import StagingPathSerialization from .stagingregistry import StagingPath # typing from gufe.protocols.protocolunit import Context @@ -57,6 +58,16 @@ def __init__( keep_empty_dirs=keep_empty_dirs, ) + self._serialization = StagingPathSerialization(self) + + @property + def json_encoder(self): + return self._serialization.encoder + + @property + def json_decoder(self): + return self._serialization.decoder + def make_label(self, dag_label, unit_label, attempt, **kwargs): """ diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py index 55c61ff0b..f36de19ab 100644 --- a/gufe/tests/storage/test_stagingserialization.py +++ b/gufe/tests/storage/test_stagingserialization.py @@ -157,7 +157,6 @@ def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): shared_root=FileStorage("old/shared"), permanent_root=FileStorage("old/permanent") ) - old_handler = StagingPathSerialization(old_manager) old_path = old_manager.permanent_staging / "dag/unit/result.txt" with open(old_path, mode='w') as f: f.write("contents here") @@ -167,7 +166,7 @@ def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): assert perm_p.exists() # serialize the path object - json_str = json.dumps(old_path, cls=old_handler.encoder) + json_str = json.dumps(old_path, cls=old_manager.json_encoder) # move the storage subdirectory; create a new, associated storage # manager/serialization handler @@ -192,10 +191,8 @@ def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): raise RuntimeWarning(f"Bad test parameter '{move}': should be " "'relative' or 'absolute'") - new_handler = StagingPathSerialization(new_manager) - # deserialize the path using the new serialization handler - reloaded = json.loads(json_str, cls=new_handler.decoder) + reloaded = json.loads(json_str, cls=new_manager.json_decoder) # ensure that the path exists and that the data can be reloaded assert isinstance(reloaded, StagingPath) @@ -222,8 +219,6 @@ def test_two_different_permanent_storages(self, tmp_path): shared_root=MemoryStorage(), permanent_root=MemoryStorage(), ) - handler1 = StagingPathSerialization(manager1) - handler2 = StagingPathSerialization(manager2) path1 = manager1.permanent_staging / "file1.txt" with open(path1, mode='w') as f: @@ -236,8 +231,8 @@ def test_two_different_permanent_storages(self, tmp_path): manager2.permanent_staging.transfer_staging_to_external() # serialize the paths - json_str1 = json.dumps(path1, cls=handler1.encoder) - json_str2 = json.dumps(path2, cls=handler2.encoder) + json_str1 = json.dumps(path1, cls=manager1.json_encoder) + json_str2 = json.dumps(path2, cls=manager2.json_encoder) # delete all staged files assert path1.as_path().exists() @@ -249,8 +244,8 @@ def test_two_different_permanent_storages(self, tmp_path): assert not path2.as_path().exists() # reload and check contents of both permanent files - reloaded1 = json.loads(json_str1, cls=handler1.decoder) - reloaded2 = json.loads(json_str2, cls=handler2.decoder) + reloaded1 = json.loads(json_str1, cls=manager1.json_decoder) + reloaded2 = json.loads(json_str2, cls=manager2.json_decoder) assert isinstance(reloaded1, StagingPath) assert reloaded1.label == path1.label @@ -275,14 +270,12 @@ def test_change_storage_backend(self, tmp_path): shared_root=MemoryStorage(), permanent_root=MemoryStorage(), ) - cloud_serialization = StagingPathSerialization(cloud_manager) local_manager = StorageManager( scratch_root=tmp_path / "local_scratch", shared_root=MemoryStorage(), permanent_root=FileStorage(tmp_path / "local_perm"), ) - local_serialization = StagingPathSerialization(local_manager) # TODO: maybe add some more safety asserts in here? that each step # goes as expected, to better diagnose potential failures? @@ -294,7 +287,7 @@ def test_change_storage_backend(self, tmp_path): f.write("will store on cloud") # serialize the cloud_path (assume it is saved somewhere) - serialized = json.dumps(cloud_path, cls=cloud_serialization.encoder) + serialized = json.dumps(cloud_path, cls=cloud_manager.json_encoder) # transfer from cloud storage to the local_manager for label in cloud_manager.permanent_root.iter_contents("dag/unit"): @@ -302,7 +295,7 @@ def test_change_storage_backend(self, tmp_path): local_manager.permanent_root.store_bytes(label, f.read()) # ensure that we can reload objects from the local manager - local_path = json.loads(serialized, cls=local_serialization.decoder) + local_path = json.loads(serialized, cls=local_manager.json_decoder) assert local_path != cloud_path @@ -328,7 +321,6 @@ def test_requires_new_codec(self, tmp_path): shared_root=MemoryStorage(), permanent_root=MemoryStorage(), ) - serialization = StagingPathSerialization(manager) # add a new custom codec for serialization new_type_codec = JSONCodec( @@ -354,13 +346,13 @@ def test_requires_new_codec(self, tmp_path): # before codec registration, error as not JSON serializable with pytest.raises(TypeError, match="not JSON serializable"): - _ = json.dumps(output_dict, cls=serialization.encoder) + _ = json.dumps(output_dict, cls=manager.json_encoder) # register codec and it works from gufe.tokenization import JSON_HANDLER JSON_HANDLER.add_codec(new_type_codec) - dumped = json.dumps(output_dict, cls=serialization.encoder) + dumped = json.dumps(output_dict, cls=manager.json_encoder) - reloaded = json.loads(dumped, cls=serialization.decoder) + reloaded = json.loads(dumped, cls=manager.json_decoder) assert reloaded == output_dict