Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions bluepyemodel/emodel_pipeline/emodel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
recipes_path=None,
use_ipyparallel=None,
use_multiprocessing=None,
use_mpi=None,
data_access_point="local",
nexus_endpoint="staging",
forge_path=None,
Expand Down Expand Up @@ -121,22 +122,26 @@ def __init__(
the e-model building pipeline be based on ipyparallel.
use_multiprocessing (bool): should the parallelization map used for the different steps
of the e-model building pipeline be based on multiprocessing.
use_mpi (bool): should the parallelization map used for the different steps
of the e-model building pipeline be based on mpi4py.
data_access_point (str): name of the access_point used to access the data,
can be "nexus" or "local".
access_token (str): access token used to connect to Nexus.
"""

# pylint: disable=too-many-arguments

if use_ipyparallel and use_multiprocessing:
if sum(bool(x) for x in (use_ipyparallel, use_multiprocessing, use_mpi)) > 1:
raise ValueError(
"use_ipyparallel and use_multiprocessing cannot be both True at the same time. "
"Please choose one."
"use_ipyparallel, use_multiprocessing, and use_mpi cannot be more than one True "
"at the same time. Please choose one."
)
if use_ipyparallel:
self.mapper = get_mapper(backend="ipyparallel")
elif use_multiprocessing:
self.mapper = get_mapper(backend="multiprocessing")
elif use_mpi:
self.mapper = get_mapper(backend="mpi")
else:
self.mapper = map

Expand Down
52 changes: 52 additions & 0 deletions bluepyemodel/tools/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,55 @@ class NestedPool(pool.Pool): # pylint: disable=abstract-method
Process = NoDaemonProcess


def mpi_map_function():
    """Build a map-like callable that spreads work over MPI ranks via mpi4py.

    The communicator used is ``MPI.COMM_WORLD``; its rank and size are read
    once, when this factory is called.

    Returns:
        callable: ``mapper(func, it)``. On rank 0 it returns a flat list with
        ``func`` applied to every element of ``it``, in the original order.
        On every other rank it takes part in the computation and returns None.
    """
    from mpi4py import MPI  # pylint: disable=no-name-in-module

    world = MPI.COMM_WORLD
    my_rank = world.Get_rank()
    n_ranks = world.Get_size()

    def mapper(func, it):
        """Apply ``func`` to the elements of ``it``, split across the ranks."""
        t_start = datetime.datetime.now()

        # The iterable is materialised on every rank, but only the copy held
        # by rank 0 is actually partitioned and distributed.
        tasks = list(it)

        per_rank = None
        if my_rank == 0:
            # Contiguous, near-equal slices: the first `extra` ranks get one
            # more element than the others.
            base, extra = divmod(len(tasks), n_ranks)
            per_rank = []
            begin = 0
            for idx in range(n_ranks):
                end = begin + base + (1 if idx < extra else 0)
                per_rank.append(tasks[begin:end])
                begin = end

        # Hand each rank its slice (rank 0 is the sender).
        my_tasks = world.scatter(per_rank, root=0)

        # Evaluate the local slice.
        my_results = []
        for task in my_tasks:
            my_results.append(func(task))

        # Collect the per-rank result lists on rank 0, ordered by rank.
        gathered = world.gather(my_results, root=0)

        logger.debug("Took %s", datetime.datetime.now() - t_start)

        # Non-root ranks have nothing to return.
        if my_rank != 0:
            return None

        # Concatenate the per-rank lists into a single flat list.
        flat = []
        for part in gathered:
            flat.extend(part)
        return flat

    return mapper


def get_mapper(backend, ipyparallel_profile=None):
"""Get a mapper for parallel computations."""
if backend == "ipyparallel":
Expand All @@ -95,5 +144,8 @@ def get_mapper(backend, ipyparallel_profile=None):
nested_pool = NestedPool()
return nested_pool.map

if backend == "mpi":
return mpi_map_function()

# For any other backend, default to the built-in map function
return map
2 changes: 2 additions & 0 deletions examples/L5PC/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def get_parser():
parser.add_argument("--githash", type=str, required=False, default=None)
parser.add_argument("--use_ipyparallel", action="store_true", default=False)
parser.add_argument("--use_multiprocessing", action="store_true", default=False)
parser.add_argument("--use_mpi", action="store_true", default=False)
parser.add_argument("-v", "--verbose", action="count", dest="verbosity", default=0)

return parser
Expand Down Expand Up @@ -152,6 +153,7 @@ def main():
iteration_tag=args.githash,
use_ipyparallel=args.use_ipyparallel,
use_multiprocessing=args.use_multiprocessing,
use_mpi=args.use_mpi,
)

if args.step == "extract":
Expand Down
2 changes: 2 additions & 0 deletions examples/nexus/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def get_parser():
parser.add_argument("--iteration_tag", type=str, default="test")
parser.add_argument("--use_ipyparallel", action="store_true", default=True)
parser.add_argument("--use_multiprocessing", action="store_true", default=False)
parser.add_argument("--use_mpi", action="store_true", default=False)
parser.add_argument("-v", "--verbose", action="count", dest="verbosity", default=1)
parser.add_argument("--seed", type=int, required=False, default=1)

Expand Down Expand Up @@ -170,6 +171,7 @@ def configure_model(pipeline, legacy_conf_json_file, morphology_name, morphology
nexus_endpoint=nexus_endpoint,
use_ipyparallel=args.use_ipyparallel,
use_multiprocessing=args.use_multiprocessing,
use_mpi=args.use_mpi,
mtype=args.mtype,
etype=args.etype,
data_access_point="nexus",
Expand Down
1 change: 1 addition & 0 deletions examples/nexus/run_pipeline.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
" nexus_endpoint=nexus_endpoint,\n",
" use_ipyparallel=True,\n",
" use_multiprocessing=False,\n",
" use_mpi=False,\n",
" data_access_point=\"nexus\",\n",
")"
]
Expand Down
Loading