Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 47 additions & 16 deletions src/commander4/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,47 @@ def _should_send_compsep_result(compsep_my_band_id: str,

# TODO: Communication in this script should be switched from picked (lowercase) to buffered
# (uppercase) whereever possible (e.g. where arrays are communicated).
def _realize_and_distribute_sky(sky_model, experiment_data: DetGroupTOD,
band_comm) -> NDArray[np.floating]:
"""Realize the band sky model on the master and hand each rank only the pixels it needs.

The band master realizes the full-sky ``(npols, npix)`` detector map and keeps it (it doubles
as the full-sky map written to the chain). Each rank then receives, through the band's
``PixelDomain``, just the sky values at the pixels its own scans observe. In the default
(non-sparse) map mode the domain scatter is a plain broadcast, so every rank ends up with the
full-sky map exactly as before. Only the master needs the ``SkyModel`` object, so it is never
broadcast, avoiding a full-sky realization and a copy of the model on every other rank
(the dominant TOD-side memory cost at high nside).

Downstream, ``TODView`` detects whether its sky map is full-sky or domain-restricted by its
column count and indexes the pointing accordingly, so consumers are agnostic to the mode.
"""
is_band_master = band_comm.Get_rank() == 0
pols = experiment_data.pols
ncomp = {"I": 1, "QU": 2, "IQU": 3}[pols]
if is_band_master:
full_map = sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside, pols,
fwhm=np.deg2rad(experiment_data.fwhm / 60.0))
else:
full_map = None
domain = experiment_data.pixel_domain
if domain is None:
# Domain not built yet (unexpected call order): fall back to the historical full-sky bcast.
return band_comm.bcast(full_map, root=0)
local_map = domain.scatter_from_full(full_map, ncomp, dtype=np.float32)
# Master keeps the full-sky map (chain output + its own pointing); workers keep only their
# local slice. In full mode scatter_from_full already broadcast the full map to everyone.
return full_map if (is_band_master and domain.mode == "sparse") else local_map


def receive_compsep(mpi_info: Bunch, experiment_data: DetGroupTOD, todproc_my_band_id: str,
senders: dict[str, int]) -> NDArray[np.floating]:
"""Receive the CompSep sky model, realize the local band map, and broadcast it within TOD.
"""Receive the CompSep sky model and distribute the realized band map within TOD.

The band master receives the ``SkyModel`` from the CompSep side and realizes it at the band
frequency/resolution; the result is distributed to all band ranks (see
``_realize_and_distribute_sky`` -- full-sky in non-sparse mode, per-rank local pixels in sparse
mode).

Args:
mpi_info (Bunch): The data structure containing all MPI relevant data.
Expand All @@ -63,40 +101,33 @@ def receive_compsep(mpi_info: Bunch, experiment_data: DetGroupTOD, todproc_my_ba
world rank of the sender task (on the CompSep side), keyed by execution-view band ID.

Returns:
NDArray: The sky model realized at the local band frequency and resolution, broadcast to
all processes on the band communicator.
NDArray: The realized sky map for this rank (full-sky on the master / in non-sparse mode,
otherwise restricted to the rank's locally-observed pixels).
"""
world_comm = mpi_info.world.comm
band_comm = mpi_info.band.comm
is_band_master = mpi_info.band.is_master
if is_band_master:
if mpi_info.band.is_master:
source_band_id = _get_compsep_sender_id_for_tod_band(todproc_my_band_id, senders)
sky_model = world_comm.recv(source=senders[source_band_id])
else:
sky_model = None
# Currently all TOD MPI ranks need a copy of the relevant detector map,
# which is very wasteful - a reason for doing OpenMP for mapmaking.
sky_model = band_comm.bcast(sky_model, root=0)
detector_map_arr = sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside,
experiment_data.pols, fwhm=np.deg2rad(experiment_data.fwhm/60.0))
return detector_map_arr
return _realize_and_distribute_sky(sky_model, experiment_data, band_comm)


def get_local_initial_sky(mpi_info: Bunch, experiment_data: DetGroupTOD,
params: Bunch) -> NDArray[np.floating]:
"""Build the initial sky model locally and realize it at this TOD band.

Used when there are no CompSep ranks: the band master builds the SkyModel from the component
parameters and init files, broadcasts it within the band communicator, and every rank realizes
it at the band frequency/resolution. Mirrors `receive_compsep`, minus the cross-world receive.
parameters and init files and realizes it; the realized map is distributed to all band ranks
(full-sky in non-sparse mode, per-rank local pixels in sparse mode). Mirrors `receive_compsep`,
minus the cross-world receive.
"""
if mpi_info.band.is_master:
sky_model = build_initial_sky_model(params)
else:
sky_model = None
sky_model = mpi_info.band.comm.bcast(sky_model, root=0)
return sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside, experiment_data.pols,
fwhm=np.deg2rad(experiment_data.fwhm/60.0))
return _realize_and_distribute_sky(sky_model, experiment_data, mpi_info.band.comm)


def send_tod(mpi_info: Bunch, tod_map_dict: dict[DetectorMap], todproc_my_band_id: str,
Expand Down
4 changes: 3 additions & 1 deletion src/commander4/data_models/detector_TOD.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ def __init__(
# The Huffman decoder expects uint8 arrays; for bytes and HDF5-backed
# numpy.void payloads the internal storage is rewritten as a zero-copy
# uint8 view once at construction.
self.processing_mask_map = processing_mask_map
self.pointing = pointing
# The full-sky processing mask (npix bytes, shared across detectors) is only needed to build
# the packed per-sample mask below. It is deliberately not retained on the detector: holding
# it would keep a ~npix-byte array alive on every rank for the whole run for nothing.
processing_mask = processing_mask_map[self.get_pix()]
self._processing_mask = np.packbits(processing_mask)
self.det_response = det_response
Expand Down
19 changes: 19 additions & 0 deletions src/commander4/data_models/detector_group_TOD.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,25 @@ def __init__(self, scans: list[ScanTOD], experiment_name: str, band_name: str, n
self.scan_idx_stop: int = 0 # Index of my last scan.
self.nscans_allranks: int = 0 # Total number of scans across all ranks (on this band).
self.noise_model = noise_model
self._pixel_domain = None # Cached PixelDomain (built lazily; pointing is static).

def get_pixel_domain(self, scan_view, comm, sparse: bool):
"""Return the band's map-distribution :class:`PixelDomain`, building and caching it once.

The domain depends only on the (static) pointing, so it is built once per run and reused
across Gibbs iterations. ``sparse=False`` gives the historical full-sky-per-rank layout;
``sparse=True`` restricts each rank's map buffers to its locally-observed pixels.
"""
from commander4.utils.pixel_domain import PixelDomain
mode = "sparse" if sparse else "full"
if self._pixel_domain is None or self._pixel_domain.mode != mode:
self._pixel_domain = PixelDomain.from_view(scan_view, comm, mode, self.nside)
return self._pixel_domain

@property
def pixel_domain(self):
"""The cached map-distribution :class:`PixelDomain`, or ``None`` if not built yet."""
return self._pixel_domain

def iter_detector_scans(self, accept: NDArray | None = None):
"""Iterate over present detector-scans, yielding ``(iscan, det)`` pairs.
Expand Down
17 changes: 15 additions & 2 deletions src/commander4/data_models/tod_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@ def _require_compsep_output(self, compsep_output: NDArray | None) -> NDArray:
raise ValueError("A component-separation sky map must be provided for sky subtraction.")
return sky_model

def _sky_map_pix(self, sky_model: NDArray) -> NDArray[np.integer]:
"""Pixel indices into a sky map that may be full-sky or restricted to this rank's domain.

The realized sky model is full-sky ``(ncomp, npix)`` on the band master and in non-sparse
map mode, but only ``(ncomp, n_local)`` on workers in sparse mode (see
``communication._realize_and_distribute_sky``). Distinguish the two by the map's column
count and return either global HEALPix indices or compact local-buffer indices.
"""
if sky_model.shape[-1] == 12 * self.experiment_data.nside**2:
return self.pix
return self.experiment_data.pixel_domain.to_local(self.pix)

def _block_average(self, tod: NDArray[np.floating], factor: int) -> NDArray[np.floating]:
"""Average a full-rate array over the same contiguous blocks as the downsampled TOD."""
ntod_down = self._materialize_downsampled(factor).tod.shape[0]
Expand All @@ -327,13 +339,14 @@ def get_static_sky_tod(
"""
factor = self._downsample_factor_or_default(downsample_factor)
sky_model = self._require_compsep_output(compsep_output)
sky_pix = self._sky_map_pix(sky_model)
if compsep_output is None:
if self._static_sky is None:
# Reuse the full-resolution sky TOD when both pointing and sky model match.
self._static_sky = get_static_sky_TOD(sky_model, self.pix, psi=self.psi)
self._static_sky = get_static_sky_TOD(sky_model, sky_pix, psi=self.psi)
sky_tod = self._static_sky
else:
sky_tod = get_static_sky_TOD(sky_model, self.pix, psi=self.psi)
sky_tod = get_static_sky_TOD(sky_model, sky_pix, psi=self.psi)
return sky_tod if factor == 1 else self._block_average(sky_tod, factor)

def get_orbital_dipole_tod(self, downsample_factor: int | None = None) -> NDArray[np.floating]:
Expand Down
2 changes: 1 addition & 1 deletion src/commander4/experiments/planck/tod_reader_planck.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def tod_reader(band_comm: MPI.Comm, my_experiment: str, my_band: Bunch, all_det_
continue
if not np.isfinite(detector.tod).all():
continue
if detector.full_mask.mean() < 0.5:
if detector.good_data_mask.mean() < 0.75:
continue
detector_list.append(detector)
ntod_sum_original += ntod
Expand Down
Loading