diff --git a/bluepyemodel/emodel_pipeline/emodel_pipeline.py b/bluepyemodel/emodel_pipeline/emodel_pipeline.py index 4538065..7eaad80 100644 --- a/bluepyemodel/emodel_pipeline/emodel_pipeline.py +++ b/bluepyemodel/emodel_pipeline/emodel_pipeline.py @@ -58,6 +58,7 @@ def __init__( recipes_path=None, use_ipyparallel=None, use_multiprocessing=None, + use_mpi=None, data_access_point="local", nexus_endpoint="staging", forge_path=None, @@ -121,6 +122,8 @@ def __init__( the e-model building pipeline be based on ipyparallel. use_multiprocessing (bool): should the parallelization map used for the different steps of the e-model building pipeline be based on multiprocessing. + use_mpi (bool): should the parallelization map used for the different steps + of the e-model building pipeline be based on mpi4py. data_access_point (str): name of the access_point used to access the data, can be "nexus" or "local". access_token (str): access token used to connect to Nexus. @@ -128,15 +131,17 @@ def __init__( # pylint: disable=too-many-arguments - if use_ipyparallel and use_multiprocessing: + if sum(bool(x) for x in (use_ipyparallel, use_multiprocessing, use_mpi)) > 1: raise ValueError( - "use_ipyparallel and use_multiprocessing cannot be both True at the same time. " - "Please choose one." + "use_ipyparallel, use_multiprocessing, and use_mpi cannot be more than one True " + "at the same time. Please choose one." 
def _split_evenly(items, n_chunks):
    """Split ``items`` into ``n_chunks`` contiguous slices of near-equal size.

    The first ``len(items) % n_chunks`` slices receive one extra element, so
    slice sizes differ by at most one and concatenating the slices in order
    gives back ``items``. Slices may be empty when there are fewer items than
    chunks.
    """
    base, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for index in range(n_chunks):
        stop = start + base + (1 if index < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


def mpi_map_function(broadcast_results=False):
    """Get the map function linked to MPI via mpi4py.

    The returned mapper is a collective operation on ``MPI.COMM_WORLD``: it
    calls ``scatter`` and ``gather``, so every rank has to call it for the
    call to complete.

    Args:
        broadcast_results (bool): if True, the flattened results are broadcast
            from rank 0 so that the mapper returns the same list on every
            rank. If False (default), only rank 0 returns the results and all
            other ranks return None.
            NOTE(review): callers that run the same script on every rank and
            iterate over the mapped values presumably need True - confirm
            against the pipeline steps that consume ``self.mapper``.

    Returns:
        A map-like function ``mapper(func, it)`` that distributes the items
        over the ranks and returns a list (not a lazy iterator) ordered like
        the input.
    """
    from mpi4py import MPI  # pylint: disable=no-name-in-module

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    def mapper(func, it):
        start_time = datetime.datetime.now()

        # The iterable is materialised on every rank, but only the chunks
        # built on rank 0 are the ones actually distributed by scatter.
        items = list(it)
        chunks = _split_evenly(items, size) if rank == 0 else None

        # Scatter work to all processes (rank i receives chunks[i])
        local_chunk = comm.scatter(chunks, root=0)

        # Each process applies the function to its local chunk
        local_results = [func(item) for item in local_chunk]

        # Gather the per-rank result lists back on root, in rank order
        gathered = comm.gather(local_results, root=0)

        results = None
        if rank == 0:
            # Chunks are contiguous and gathered in rank order, so flattening
            # restores the input order.
            results = [item for sublist in gathered for item in sublist]

        if broadcast_results:
            results = comm.bcast(results, root=0)

        logger.debug("Took %s", datetime.datetime.now() - start_time)

        return results

    return mapper
mapper for parallel computations.""" if backend == "ipyparallel": @@ -95,5 +144,8 @@ def get_mapper(backend, ipyparallel_profile=None): nested_pool = NestedPool() return nested_pool.map + if backend == "mpi": + return mpi_map_function() + # For any other backend, default to the built-in map function return map diff --git a/examples/L5PC/pipeline.py b/examples/L5PC/pipeline.py index 5f9bd9e..7c48744 100644 --- a/examples/L5PC/pipeline.py +++ b/examples/L5PC/pipeline.py @@ -121,6 +121,7 @@ def get_parser(): parser.add_argument("--githash", type=str, required=False, default=None) parser.add_argument("--use_ipyparallel", action="store_true", default=False) parser.add_argument("--use_multiprocessing", action="store_true", default=False) + parser.add_argument("--use_mpi", action="store_true", default=False) parser.add_argument("-v", "--verbose", action="count", dest="verbosity", default=0) return parser @@ -152,6 +153,7 @@ def main(): iteration_tag=args.githash, use_ipyparallel=args.use_ipyparallel, use_multiprocessing=args.use_multiprocessing, + use_mpi=args.use_mpi, ) if args.step == "extract": diff --git a/examples/nexus/pipeline.py b/examples/nexus/pipeline.py index 7f5063e..f8f7a49 100644 --- a/examples/nexus/pipeline.py +++ b/examples/nexus/pipeline.py @@ -54,6 +54,7 @@ def get_parser(): parser.add_argument("--iteration_tag", type=str, default="test") parser.add_argument("--use_ipyparallel", action="store_true", default=True) parser.add_argument("--use_multiprocessing", action="store_true", default=False) + parser.add_argument("--use_mpi", action="store_true", default=False) parser.add_argument("-v", "--verbose", action="count", dest="verbosity", default=1) parser.add_argument("--seed", type=int, required=False, default=1) @@ -170,6 +171,7 @@ def configure_model(pipeline, legacy_conf_json_file, morphology_name, morphology nexus_endpoint=nexus_endpoint, use_ipyparallel=args.use_ipyparallel, use_multiprocessing=args.use_multiprocessing, + use_mpi=args.use_mpi, 
# Sentinel distinguishing "field absent" from "field present but None".
_MISSING = object()


def _lookup(obj, name):
    """Read ``name`` from a mapping key or an object attribute.

    Returns ``_MISSING`` when the key/attribute does not exist, so that both
    dict-shaped and attribute-shaped annotations can be handled uniformly.
    """
    if isinstance(obj, dict):
        return obj.get(name, _MISSING)
    return getattr(obj, name, _MISSING)


def connect_forge(bucket, endpoint, access_token, forge_path=None):
    """Creation of a forge session

    Args:
        bucket (str): passed to KnowledgeGraphForge as ``bucket``.
        endpoint (str): passed to KnowledgeGraphForge as ``endpoint``.
        access_token (str): passed to KnowledgeGraphForge as ``token``.
        forge_path (str): forge configuration to use. When empty or None, the
            nexus-forge ``prod-forge-nexus.yml`` example configuration hosted
            on GitHub is used.

    Returns:
        The KnowledgeGraphForge instance.
    """
    if not forge_path:
        forge_path = (
            "https://raw.githubusercontent.com/BlueBrain/nexus-forge/"
            "master/examples/notebooks/use-cases/prod-forge-nexus.yml"
        )
    return KnowledgeGraphForge(forge_path, bucket=bucket, endpoint=endpoint, token=access_token)


def get_morph_mtype(annotation):
    """Return the label stored in ``annotation.hasBody.label``.

    ``annotation`` and its ``hasBody`` may each be either an object exposing
    attributes or a dict.

    Raises:
        ValueError: if ``hasBody`` or its ``label`` is missing.
    """
    body = _lookup(annotation, "hasBody")
    if body is _MISSING:
        raise ValueError("Morphology resource has no hasBody in annotation.")

    label = _lookup(body, "label")
    if label is _MISSING:
        raise ValueError("Morphology resource has no label in annotation.hasBody.")

    return label


def get_morph_metadata(access_point, morph_id):
    """Retrieve the mtype and the brain region label of a morphology resource.

    Args:
        access_point: object whose ``access_point.retrieve(morph_id)`` returns
            the morphology resource (or None when it cannot be found).
        morph_id (str): id of the morphology resource.

    Returns:
        tuple: ``(morph_mtype, morph_brain_region)``. When several annotations
        carry an MType annotation type, the last one wins.

    Raises:
        TypeError: if the resource cannot be retrieved or has no mtype annotation.
        AttributeError: if ``brainLocation.brainRegion.label`` or ``annotation``
            is missing on the resource.
        ValueError: if an mtype annotation has no ``hasBody.label``.
    """
    resource = access_point.access_point.retrieve(morph_id)
    if resource is None:
        raise TypeError(f"Could not find the morphology resource with id {morph_id}")

    if not hasattr(resource, "brainLocation"):
        raise AttributeError("Morphology resource has no brainLocation.")
    if not hasattr(resource.brainLocation, "brainRegion"):
        raise AttributeError("Morphology resource has no brainRegion in brainLocation.")
    if not hasattr(resource.brainLocation.brainRegion, "label"):
        raise AttributeError("Morphology resource has no label in brainLocation.brainRegion")
    morph_brain_region = resource.brainLocation.brainRegion.label

    if not hasattr(resource, "annotation"):
        raise AttributeError("Morphology resource has no annotation.")

    # A resource may carry one annotation or a list of them. Normalise to a list
    # so that a single annotation (object or dict) is inspected as well.
    annotations = resource.annotation
    if not isinstance(annotations, (list, tuple)):
        annotations = [annotations]

    morph_mtype = None
    for annotation in annotations:
        annotation_type = _lookup(annotation, "type")
        if annotation_type is _MISSING:
            continue
        # ``type`` may be a string or a list of strings; ``in`` covers both.
        if "MTypeAnnotation" in annotation_type or "nsg:MTypeAnnotation" in annotation_type:
            morph_mtype = get_morph_mtype(annotation)

    if morph_mtype is None:
        raise TypeError("Could not find mtype in morphology resource")

    return morph_mtype, morph_brain_region


def get_new_emodel_metadata(
    access_point,
    morph_id,
    morph_name,
    update_emodel_name,
    use_brain_region_from_morphology,
    use_mtype_in_githash,
):
    """Build emodel metadata combining the emodel with a given morphology.

    Works on a deep copy of ``access_point.emodel_metadata``; the access point's
    own metadata is left untouched.

    Args:
        access_point: access point providing ``emodel_metadata`` and the
            retrieval of the morphology resource.
        morph_id (str): id of the morphology resource.
        morph_name (str): morphology name, appended to the iteration when
            ``use_mtype_in_githash`` is True.
        update_emodel_name (bool): if True, set the emodel name to
            ``"{etype}_{mtype}"`` using the morphology mtype.
        use_brain_region_from_morphology (bool): if True, take the brain region
            (and its notation) from the morphology.
        use_mtype_in_githash (bool): if True, append ``-{morph_name}`` to the
            iteration.

    Returns:
        The updated copy of the emodel metadata.
    """
    new_emodel_metadata = copy.deepcopy(access_point.emodel_metadata)
    new_mtype, new_br = get_morph_metadata(access_point, morph_id)
    new_emodel_metadata.mtype = new_mtype

    if update_emodel_name:
        new_emodel_metadata.emodel = f"{new_emodel_metadata.etype}_{new_mtype}"

    if use_brain_region_from_morphology:
        new_emodel_metadata.brain_region = new_br
        new_emodel_metadata.allen_notation = get_brain_region_notation(
            new_br,
            access_point.access_point.access_token,
            access_point.forge_ontology_path,
        )

    if use_mtype_in_githash:
        new_emodel_metadata.iteration = f"{new_emodel_metadata.iteration}-{morph_name}"

    return new_emodel_metadata


def plot(access_point, seed, cell_evaluator, figures_dir, mapper):
    """Plot figures and return total fitness (sum of scores), holding and threshold currents"""
    # compute scores
    # we need to do this outside of main plotting function with custom function
    # so that we do not take old emodel scores in scores figure
    # FIXME(review): plot_scores is neither defined nor imported in this file,
    # so this call raises NameError as written.
    emodel_score = plot_scores(access_point, cell_evaluator, mapper, figures_dir, seed)

    emodels = plot_models(
        access_point=access_point,
        mapper=mapper,
        seeds=[seed],
        figures_dir=figures_dir,
        plot_distributions=True,
        plot_scores=False,  # scores figure done outside of this
        plot_traces=True,
        plot_thumbnail=True,
        plot_currentscape=access_point.pipeline_settings.plot_currentscape,
        plot_bAP_EPSP=access_point.pipeline_settings.plot_bAP_EPSP,
        plot_dendritic_ISI_CV=True,  # for detailed cADpyr cells. will be skipped otherwise
        plot_dendritic_rheobase=True,  # for detailed cADpyr cells. will be skipped otherwise
        only_validated=False,
        save_recordings=False,
        load_from_local=False,
        cell_evaluator=cell_evaluator,  # <-- feed the modified evaluator here
    )
    emodel_holding = emodels[0].responses.get("bpo_holding_current", None)
    emodel_threshold = emodels[0].responses.get("bpo_threshold_current", None)

    return emodel_score, emodel_holding, emodel_threshold


def store_memodel(access_point, memodel, description=None):
    """Store an MEModel on Nexus"""
    # Local import: ``time`` is not part of this file's import block.
    import time

    access_point.store_object(
        memodel,
        seed=memodel.seed,
        description=description,
        is_analysis_suitable=True,
    )
    # wait for the object to be uploaded and fetchable
    time.sleep(access_point.sleep_time)


if __name__ == "__main__":
    project = "mmb-point-neuron-framework-model"  # replace with a valid Nexus project name
    organisation = "bbp"  # replace with the organisation name
    endpoint = "https://openbluebrain.com/api/nexus/v1"  # replace with the Nexus endpoint url
    forge_path = "./forge.yml"  # this file has to be present
    forge_ontology_path = "./forge_ontology_path.yml"  # this file also

    # ids of the EModel and morphology resources to combine
    emodel_id = ""  # replace with the id of the EModel resource
    morph_id = ""  # replace with the id of the morphology resource

    mapper = map

    # MEModel metadata-related config
    update_emodel_name = True
    use_brain_region_from_morphology = True
    use_mtype_in_githash = True  # to distinguish from other MEModel

    # Fail before prompting for a token when the placeholders were not filled in.
    if not emodel_id or not morph_id:
        raise SystemExit("Please set emodel_id and morph_id before running this script.")

    # create forge and retrieve the resources
    access_token = getpass.getpass()
    forge = connect_forge(
        bucket=f"{organisation}/{project}",
        endpoint=endpoint,
        access_token=access_token,
        forge_path=forge_path,
    )

    emodel_r = forge.retrieve(emodel_id)
    morph_r = forge.retrieve(morph_id)
    if emodel_r is None or morph_r is None:
        raise ValueError(
            f"Could not retrieve the EModel ({emodel_id}) or the morphology ({morph_id})."
        )

    # get metadata from EModel resource (absent fields default to None)
    emodel = getattr(emodel_r, "eModel", None)
    etype = getattr(emodel_r, "eType", None)
    ttype = getattr(emodel_r, "tType", None)
    mtype = getattr(emodel_r, "mType", None)
    subject = getattr(emodel_r, "subject", None)
    species = getattr(getattr(subject, "species", None), "label", None)
    brain_location = getattr(emodel_r, "brainLocation", None)
    brain_region = getattr(getattr(brain_location, "brainRegion", None), "label", None)
    iteration_tag = getattr(emodel_r, "iteration", None)
    # read the same attribute that is tested for (was: hasattr "synapseClass"
    # followed by a read of ".synapse_class")
    synapse_class = getattr(emodel_r, "synapseClass", None)
    seed = int(getattr(emodel_r, "seed", 0))

    # get morph metadata
    morph_name = getattr(morph_r, "name", None)
    # assumes swc is always present and we do not care about small differences between format
    morph_format = "swc"

    # additional metadata we will need when saving me-model resource
    subject_ontology = subject
    brain_location_ontology = getattr(morph_r, "brainLocation", None)

    # feed nexus access point with appropriate data
    access_point = NexusAccessPoint(
        emodel=emodel,
        etype=etype,
        ttype=ttype,
        mtype=mtype,
        species=species,
        brain_region=brain_region,
        iteration_tag=iteration_tag,
        synapse_class=synapse_class,
        project=project,
        organisation=organisation,
        endpoint=endpoint,
        forge_path=forge_path,  # this file has to be present
        forge_ontology_path=forge_ontology_path,  # this file also
        access_token=access_token,
    )

    # update settings for better threshold precision
    access_point.pipeline_settings.current_precision = 2e-3

    # get cell evaluator with 'new' morphology
    # FIXME(review): get_cell_evaluator is neither defined nor imported in this
    # file, so this call raises NameError as written.
    cell_evaluator = get_cell_evaluator(access_point, morph_name, morph_format, morph_id)

    # get new emodel metadata (mtype, emodel, brain region, iteration/githash)
    # to correspond to combined metadata of emodel and morphology
    new_emodel_metadata = get_new_emodel_metadata(
        access_point,
        morph_id,
        morph_name,
        update_emodel_name,
        use_brain_region_from_morphology,
        use_mtype_in_githash,
    )

    # create MEModel
    # NOTE(review): the script stops here; plot() and store_memodel() are never
    # called, so nothing is plotted or registered - confirm whether intended.
    memodel = MEModel(
        seed=seed,
        emodel_metadata=new_emodel_metadata,
        emodel_id=emodel_id,
        morphology_id=morph_id,
        validated=False,
    )
def _double(x):
    """Helper function for multiprocessing test (must be picklable)."""
    return x * 2


def _mpi_mapper(rank, size, scatter, gather):
    """Build the production MPI mapper on top of a fake ``mpi4py`` package.

    ``scatter`` and ``gather`` are plain functions used as the side effects of
    the fake communicator, so each test decides what the "other ranks" send.

    Returns:
        tuple: ``(mapper, comm)`` where ``comm`` is the fake COMM_WORLD, kept
        so that tests can assert on the collective calls that were made.
    """
    comm = MagicMock()
    comm.Get_rank.return_value = rank
    comm.Get_size.return_value = size
    comm.scatter.side_effect = scatter
    comm.gather.side_effect = gather

    fake_mpi = MagicMock()
    fake_mpi.COMM_WORLD = comm
    fake_package = MagicMock(MPI=fake_mpi)

    from bluepyemodel.tools.multiprocessing import mpi_map_function

    # mpi4py is imported inside mpi_map_function, so the fake only has to be
    # in place while the mapper is being built.
    with patch.dict("sys.modules", {"mpi4py": fake_package, "mpi4py.MPI": fake_mpi}):
        mapper = mpi_map_function()

    return mapper, comm


def test_get_mapper_default():
    """Test that unknown backend falls back to built-in map."""
    assert get_mapper("unknown_backend") is map


def test_get_mapper_multiprocessing():
    """Test that multiprocessing backend returns a working pool map."""
    mapper = get_mapper("multiprocessing")
    try:
        assert callable(mapper)
        assert mapper is not map
        # Needs a top-level function, not a lambda, for pickling
        assert list(mapper(_double, [1, 2, 3])) == [2, 4, 6]
    finally:
        # The mapper is the bound ``map`` of a pool; shut the pool down so the
        # worker processes do not outlive the test.
        pool = getattr(mapper, "__self__", None)
        if pool is not None:
            pool.terminate()
            pool.join()


def test_get_mapper_ipyparallel_no_profile():
    """Test ipyparallel without profile falls back to map."""
    with patch.dict("os.environ", {}, clear=True):
        mapper = get_mapper("ipyparallel")
    assert mapper is map


def test_get_mapper_mpi():
    """Test that mpi_map_function builds a working mapper on a single rank."""

    def scatter(chunks, root=0):
        return chunks[0]

    def gather(local_results, root=0):
        return [local_results]

    mapper, _ = _mpi_mapper(rank=0, size=1, scatter=scatter, gather=gather)

    assert callable(mapper)
    assert mapper(_double, [1, 2, 3]) == [2, 4, 6]


@pytest.mark.parametrize(
    ("items", "root_chunk", "remote_results", "expected"),
    [
        ([1, 2, 3, 4], [1, 2], [6, 8], [2, 4, 6, 8]),
        # uneven split: the first rank gets the extra item
        ([1, 2, 3, 4, 5], [1, 2, 3], [8, 10], [2, 4, 6, 8, 10]),
        # fewer items than ranks: the second rank gets an empty chunk
        ([1], [1], [], [2]),
        ([], [], [], []),
    ],
)
def test_mpi_map_function_root(items, root_chunk, remote_results, expected):
    """Test the production mapper on rank 0 (root): chunking and flattened results."""
    scattered = []

    def scatter(chunks, root=0):
        scattered.append(chunks)
        return chunks[0]  # root works on the first chunk

    def gather(local_results, root=0):
        # rank 1 is simulated through ``remote_results``
        return [local_results, remote_results]

    mapper, _ = _mpi_mapper(rank=0, size=2, scatter=scatter, gather=gather)

    # pass an iterator to check that one-shot iterables are supported
    assert mapper(_double, iter(items)) == expected

    assert len(scattered) == 1
    chunks = scattered[0]
    assert len(chunks) == 2
    assert chunks[0] == root_chunk
    # chunks are contiguous and cover every item exactly once
    assert [item for chunk in chunks for item in chunk] == items


def test_mpi_map_function_non_root():
    """Test the production mapper on a non-root rank: computes its chunk, returns None."""

    def scatter(chunks, root=0):
        return [3, 4]  # rank 1 receives the second chunk

    def gather(local_results, root=0):
        return None  # non-root ranks do not receive the gathered results

    mapper, comm = _mpi_mapper(rank=1, size=2, scatter=scatter, gather=gather)

    assert mapper(_double, [1, 2, 3, 4]) is None
    # non-root ranks contribute no chunks but do send back their results
    comm.scatter.assert_called_once_with(None, root=0)
    comm.gather.assert_called_once_with([6, 8], root=0)


def test_get_mapper_mpi_backend():
    """Test get_mapper with 'mpi' backend returns what mpi_map_function builds."""

    def fake_mapper(func, it):
        return list(map(func, it))

    with patch("bluepyemodel.tools.multiprocessing.mpi_map_function") as mock_mpi_func:
        mock_mpi_func.return_value = fake_mapper
        mapper = get_mapper("mpi")

    mock_mpi_func.assert_called_once_with()
    assert mapper is fake_mapper
    assert mapper(lambda x: x + 1, [1, 2, 3]) == [2, 3, 4]