From 425e0f6071a2d04cda22cfb77093c2bd8add711c Mon Sep 17 00:00:00 2001 From: jgslunde Date: Tue, 16 Jun 2026 18:26:56 +0200 Subject: [PATCH 1/4] 1. Added sparse map representation where each rank holds only pixels it hits before master Reduce, and 2. Fixed some problems in the CG mapmaker. --- .../data_models/detector_group_TOD.py | 14 + src/commander4/tod_processing.py | 120 ++++---- src/commander4/utils/CG_mapmaker.py | 281 ++++++++---------- src/commander4/utils/mapmaker.py | 105 +++---- 4 files changed, 237 insertions(+), 283 deletions(-) diff --git a/src/commander4/data_models/detector_group_TOD.py b/src/commander4/data_models/detector_group_TOD.py index 0d5930b..2dcdd61 100644 --- a/src/commander4/data_models/detector_group_TOD.py +++ b/src/commander4/data_models/detector_group_TOD.py @@ -40,6 +40,20 @@ def __init__(self, scans: list[ScanTOD], experiment_name: str, band_name: str, n self.scan_idx_stop: int = 0 # Index of my last scan. self.nscans_allranks: int = 0 # Total number of scans across all ranks (on this band). self.noise_model = noise_model + self._pixel_domain = None # Cached PixelDomain (built lazily; pointing is static). + + def get_pixel_domain(self, scan_view, comm, sparse: bool): + """Return the band's map-distribution :class:`PixelDomain`, building and caching it once. + + The domain depends only on the (static) pointing, so it is built once per run and reused + across Gibbs iterations. ``sparse=False`` gives the historical full-sky-per-rank layout; + ``sparse=True`` restricts each rank's map buffers to its locally-observed pixels. + """ + from commander4.utils.pixel_domain import PixelDomain + mode = "sparse" if sparse else "full" + if self._pixel_domain is None or self._pixel_domain.mode != mode: + self._pixel_domain = PixelDomain.from_view(scan_view, comm, mode, self.nside) + return self._pixel_domain def iter_detector_scans(self, accept: NDArray | None = None): """Iterate over present detector-scans, yielding ``(iscan, det)`` pairs. diff --git a/src/commander4/tod_processing.py b/src/commander4/tod_processing.py index a718f62..9a267af 100644 --- a/src/commander4/tod_processing.py +++ b/src/commander4/tod_processing.py @@ -19,6 +19,7 @@ from commander4.utils.CG_mapmaker import CGMapmakerI, CGMapmakerIQU from commander4.solvers.preconditioners import InvNPreconditionerI, InvNPreconditionerIQU from commander4.noise_sampling.sample_ncorr import sample_correlated_noise, log_corr_noise_stats +from commander4.noise_sampling.noise_sampling import fill_all_masked from commander4.utils.math_operations import forward_rfft, backward_rfft from commander4.utils.execution_ids import get_execution_band_ids from commander4.noise_sampling.sigma0 import calc_sigma0_robust @@ -130,12 +131,19 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output # as we have to de-compress pix and psi twice. pols = experiment_data.pols scan_view = TODView(experiment_data, tod_samples, compsep_output=compsep_output) + # Optional per-experiment sparse map storage: each rank holds only its locally-observed pixels + # rather than a full sky map. The band master still ends up with full-sky maps. + exp_cfg = params.experiments[experiment_data.experiment_name] + sparse_maps = bool(exp_cfg["sparse_maps"]) if "sparse_maps" in exp_cfg else False + domain = experiment_data.get_pixel_domain(scan_view, band_comm, sparse_maps) if pols == "IQU": - mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside) + mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) for view in scan_view.iter_focused(accepted_only=True): - good_data_mask = view.good_data_mask - pix = view.pix[good_data_mask] - psi = view.psi[good_data_mask] + # Full-length pointing (no good_data_mask compaction): the CG operator gap-fills flagged + # samples rather than removing them, so every sample carries weight (gain/sigma0)^2 and + # the inverse-variance / preconditioner must count them all to match the A operator. + pix = view.pix + psi = view.psi sigma0 = view.sigma0 gain = view.get_gain() inv_var = (gain/sigma0)**2 @@ -143,22 +151,25 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output mapmaker_invvar.gather_map() mapmaker_invvar.normalize_map() if ismaster: - # Unobserved/singular pixels get rms=inf from the per-pixel inversion, so rms**2 is inf - # there. The diagonal preconditioner multiplies this into the residual; since the RHS is - # 0 at those pixels, 0*inf=nan would poison the global dot product (nan residual from - # iter 0). Zero them out (matching the I-only path's without_nan below): an unobserved - # pixel gets zero preconditioner weight, which is correct. - precond = InvNPreconditionerIQU(utils.without_nan(mapmaker_invvar.final_rms_map**2)) + # Jacobi preconditioner M = 1/diag(A), where A is the accumulated inverse-noise matrix + # (final_cov_map holds its 6 unique elements; [0,3,5] are A_II, A_QQ, A_UU). This matches + # the robust I-only path below. The previous choice -- diag(A^-1) via rms**2 -- blows up + # at near-singular pixels (poor per-pixel polarization-angle coverage, where the 3x3 + # inverse is inflated by a vanishing determinant), which wrecks the conditioning of the + # preconditioned operator and makes PCG diverge. 1/diag(A) stays bounded by the actual + # per-component inverse variance. without_nan zeros unobserved pixels (1/0 -> inf -> 0). + A_diag = mapmaker_invvar.final_cov_map[(0, 3, 5), :] + precond = InvNPreconditionerIQU(utils.without_nan(1.0 / A_diag)) else: precond = called_on_non_master cg_mapmaker = CGMapmakerIQU(experiment_data, tod_samples, band_comm, - preconditioner=precond, nthreads=params.general.nthreads_tod, - CG_maxiter=params.general.CG_mapmaker.maxiter) + preconditioner=precond, nthreads=params.general.nthreads_tod, + CG_maxiter=params.general.CG_mapmaker.maxiter, pixel_domain=domain) elif pols == "I": - mapmaker_invvar = WeightsMapmaker(band_comm, experiment_data.nside) + mapmaker_invvar = WeightsMapmaker(band_comm, experiment_data.nside, pixel_domain=domain) for view in scan_view.iter_focused(accepted_only=True): - good_data_mask = view.good_data_mask - pix = view.pix[good_data_mask] + # Full-length pointing (see IQU branch above): gap-filled samples are counted too. + pix = view.pix sigma0 = view.sigma0 gain = view.get_gain() inv_var = (gain/sigma0)**2 @@ -169,17 +180,17 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output else: precond = called_on_non_master cg_mapmaker = CGMapmakerI(experiment_data, tod_samples, band_comm, - preconditioner=precond, nthreads=params.general.nthreads_tod, - CG_maxiter=params.general.CG_mapmaker.maxiter) + preconditioner=precond, nthreads=params.general.nthreads_tod, + CG_maxiter=params.general.CG_mapmaker.maxiter, pixel_domain=domain) else: raise ValueError(f"specified polarizations {pols} is notsupported yet.") BinMapmaker = MapmakerIQU if pols == "IQU" else Mapmaker #general bin mapmaker class object. # mapmaker = BinMapmaker(band_comm, experiment_data.nside) - mapmaker_orbdipole = BinMapmaker(band_comm, experiment_data.nside) + mapmaker_orbdipole = BinMapmaker(band_comm, experiment_data.nside, pixel_domain=domain) if ncorr_cfg.do_ncorr: - mapmaker_ncorr = BinMapmaker(band_comm, experiment_data.nside) + mapmaker_ncorr = BinMapmaker(band_comm, experiment_data.nside, pixel_domain=domain) sampled_params = [] residuals = [] niters = [] @@ -191,8 +202,6 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output for view in scan_view.iter_focused(accepted_only=True): pix, psi = view.pix, view.psi good_data_mask = view.good_data_mask - pix_masked = pix[good_data_mask] - psi_masked = psi[good_data_mask] sigma0 = view.sigma0 gain = view.get_gain() inv_var = (gain/sigma0)**2 @@ -246,14 +255,20 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output _record_tod_diagnostics(tod_samples, view.iscan, view.idet, view, n_corr_est if ncorr_cfg.do_ncorr else None) - d_sky_masked = d_sky[good_data_mask] + # Gap-fill flagged samples instead of compacting them away. The CG operator applies a + # Fourier transform (apply_T), which requires a continuous, full-length TOD: removing masked + # samples corrupts the FFT, and a single non-finite sample (or an empty compacted scan) + # otherwise poisons/crashes the whole solve. fill_all_masked (linear interpolation + white + # noise) is the same gap-filling used in correlated-noise sampling; the filled samples are + # noisy realizations carrying weight 1/sigma0^2, consistent with the full-length A operator. + fill_all_masked(d_sky, good_data_mask, sigma0) cg_mapmaker.accum_to_RHS( scan_tod=view.detector, sigma0=sigma0, - pix=pix_masked, - psi=psi_masked, - scan_tod_arr=d_sky_masked/gain + pix=pix, + psi=psi, + scan_tod_arr=d_sky/gain ) ### PRINT NOISE SAMPLING STATS ### @@ -279,46 +294,6 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output cg_mapmaker.finalize_RHS() cg_mapmaker.solve() map_signal = cg_mapmaker.solved_map - - ########### temporary debug plots - # if ismaster: - # if pols == "IQU": - # plt.figure(figsize=(8.5*3, 5.4)) - # npol = 3 - # for i in range(npol): - # limup = np.nanpercentile(cg_mapmaker.RHS_map[i,:], 99) - # limdown = np.nanpercentile(cg_mapmaker.RHS_map[i,:], 1) - # hp.mollview(cg_mapmaker.RHS_map[i,:], cmap='RdBu_r', title='RHS', - # sub=(1,npol,i+1), min=limdown, max=limup) - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/RHS.png") - # plt.close() - - # plt.figure(figsize=(8.5*3, 5.4)) - # for i in range(npol): - # limup = np.nanpercentile(precond(cg_mapmaker.RHS_map)[i,:], 99) - # limdown = np.nanpercentile(precond(cg_mapmaker.RHS_map)[i,:], 1) - # hp.mollview(precond(cg_mapmaker.RHS_map)[i,:], cmap='RdBu_r', title='M RHS', - # sub=(1,npol,i+1), min=limdown, max=limup) - # plt.savefig("/mn/stornext/u3/leoab/cmdr4_plots/M_RHS.png") - # plt.close() - # else: - # plt.figure() - # limup = np.nanpercentile(cg_mapmaker.RHS_map[0,:], 99) - # limdown = np.nanpercentile(cg_mapmaker.RHS_map[0,:], 1) - # hp.mollview(cg_mapmaker.RHS_map[0,:], cmap='RdBu_r', title='RHS', - # min=limdown, max=limup) - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/RHS.png") - # plt.close() - - # plt.figure() - # limup = np.nanpercentile(precond(cg_mapmaker.RHS_map)[0,:], 99) - # limdown = np.nanpercentile(precond(cg_mapmaker.RHS_map)[0,:], 1) - # hp.mollview(precond(cg_mapmaker.RHS_map)[0,:], cmap='RdBu_r', title='M RHS', - # min=limdown, max=limup) - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/M_RHS.png") - # plt.close() - - ##################### if ncorr_cfg.do_ncorr: mapmaker_ncorr.gather_map() @@ -384,7 +359,12 @@ def tod2map_bin(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_outpu start_bench("binned-mapmaker") pols = experiment_data.pols scan_view = TODView(experiment_data, tod_samples, compsep_output=compsep_output) - mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside) + # Optional per-experiment sparse map storage: each rank holds only its locally-observed pixels + # rather than a full sky map. The band master still ends up with full-sky maps. + exp_cfg = params.experiments[experiment_data.experiment_name] + sparse_maps = bool(exp_cfg["sparse_maps"]) if "sparse_maps" in exp_cfg else False + domain = experiment_data.get_pixel_domain(scan_view, band_comm, sparse_maps) + mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) for view in scan_view.iter_focused(accepted_only=True): good_data_mask = view.good_data_mask pix = view.pix[good_data_mask] @@ -397,11 +377,11 @@ def tod2map_bin(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_outpu mapmaker_invvar.gather_map() mapmaker_invvar.normalize_map() - mapmaker = MapmakerIQU(band_comm, experiment_data.nside) - mapmaker_orbdipole = MapmakerIQU(band_comm, experiment_data.nside) - + mapmaker = MapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) + mapmaker_orbdipole = MapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) + if ncorr_cfg.do_ncorr: - mapmaker_ncorr = MapmakerIQU(band_comm, experiment_data.nside) + mapmaker_ncorr = MapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) sampled_params = [] residuals = [] niters = [] diff --git a/src/commander4/utils/CG_mapmaker.py b/src/commander4/utils/CG_mapmaker.py index 95fd185..ad03ef7 100644 --- a/src/commander4/utils/CG_mapmaker.py +++ b/src/commander4/utils/CG_mapmaker.py @@ -3,12 +3,8 @@ from mpi4py import MPI import logging from numpy.typing import NDArray -import ducc0 import healpy as hp -from pixell import utils from typing import Callable -import time -import matplotlib.pyplot as plt from commander4.output.log import logassert from commander4.utils.ctypes_lib import load_cmdr4_ctypes_lib @@ -17,7 +13,7 @@ from commander4.data_models.tod_view import TODView from commander4.solvers.CG_driver import distributed_CG_arr from commander4.data_models.detector_samples import DetectorSamples -from commander4.data_models.scan_samples import ScanSamples +from commander4.utils.pixel_domain import PixelDomain from commander4.utils.math_operations import inplace_scale, dot, norm, forward_rfft, backward_rfft # I need to CG-solve P^T T^T N^−1 T P m = P^T T^T N^-1 d @@ -47,10 +43,11 @@ def __init__(self, T_omega:Callable = np.ones_like, preconditioner:Callable = np.copy, nthreads:int=1, - double_prec:bool = True, - CG_maxiter:int=200, - CG_tol:float=1e-10, - CG_check_interval:int = 1): + double_prec:bool = True, + CG_maxiter:int=60, + CG_tol:float=1e-6, + CG_check_interval:int = 1, + pixel_domain:PixelDomain|None = None): """Initialise the CG mapmaker. Args: @@ -66,8 +63,13 @@ def __init__(self, CG_maxiter: Maximum number of CG iterations. CG_tol: Convergence tolerance on the CG residual. CG_check_interval: Check convergence every this many iterations. + pixel_domain: Pixel-distribution domain. When ``None`` a full-sky domain is built, in + which case every rank holds full-sky local maps (the historical behaviour). In + sparse mode each rank's RHS/LHS buffers cover only its observed pixels, and the + full-sky iterate held by the master is scattered/gathered to the ranks each + iteration. """ - + self.logger = logging.getLogger(__name__) self.detector_tod = detector_tod self.detector_samples = detector_samples @@ -81,6 +83,15 @@ def __init__(self, self.CG_tol = CG_tol self.CG_check_interval = CG_check_interval self.M = preconditioner + self.domain = pixel_domain if pixel_domain is not None \ + else PixelDomain(map_comm, detector_tod.nside, "full") + # The sparse gather/scatter collectives operate in float64; the float32 map path is only + # supported full-sky (and is unused in production, which always runs double_prec). + if self.domain.mode == "sparse" and not double_prec: + raise NotImplementedError("Sparse CG maps require double_prec=True.") + self._nloc = self.domain.n_local + # View over the band's detector-scans, used to access pointing (pix/psi) when applying the + self._scan_view = TODView(detector_tod, detector_samples) self._rhs_loca_map = None self._rhs_finalized_map = None @@ -192,6 +203,17 @@ def accum_to_RHS(self, scan_tod: DetectorTOD, sigma0: float, if scan_tod_arr is None: scan_tod_arr = np.copy(scan_tod.tod) #aux array to not modify scan.tod + # Guard against pathological scans that slip past read-in: an empty scan crashes the FFT, + # and a single non-finite sample is spread across the whole scan by apply_T (and then across + # every pixel that scan hits), making the CG residual NaN. Readers should discard these, but + # not all of them do, so fail loudly here identifying the offending detector-scan. + logassert(scan_tod_arr.shape[-1] > 0, + f"Empty TOD passed to CG RHS for detector {getattr(scan_tod, 'name', '?')}.", + self.logger) + logassert(np.isfinite(scan_tod_arr).all(), + f"Non-finite samples in CG RHS for detector {getattr(scan_tod, 'name', '?')} " + "(check gain, sigma0, and that flagged/non-finite samples are gap-filled).", + self.logger) #N^-1 d # if self.ismaster: # self.logger.info(f"RHS_1: {scan_tod_arr.shape}") @@ -208,50 +230,44 @@ def accum_to_RHS(self, scan_tod: DetectorTOD, sigma0: float, def finalize_RHS(self, root=0): """ - Reduces RHS map on main rank, summing up all the contributions. + Reduces the local RHS contributions onto the full-sky RHS map held by the master rank. """ - logassert(self._rhs_loca_map is not None, + logassert(self._rhs_loca_map is not None, "Attempted to reduce RHS map on master rank before its contributions have been computed.", self.logger) - if self.map_comm.Get_rank() == 0: - send, recv = (self._rhs_loca_map, self._rhs_finalized_map) - else: - send, recv = (self._rhs_loca_map, np.empty(())) - self.map_comm.Reduce(send, recv, op=MPI.SUM, root=root) + full = self.domain.reduce_to_full(self._rhs_loca_map, root=root) + if self.map_comm.Get_rank() == root: + self._rhs_finalized_map = full self.map_comm.Barrier() - - #free memory - self._rhs_loca_map = None - return recv + self._rhs_loca_map = None # free memory + return self._rhs_finalized_map if self.map_comm.Get_rank() == root else np.empty(()) def apply_LHS(self, in_map: NDArray): """ - Applies the LHS of the mapmaking problem P^T T^T N^-1 T P m to an input map, without modifying it, - and updates the result in the out_tods object. + Applies the LHS of the mapmaking problem P^T T^T N^-1 T P m to an input map. + + The master holds the full-sky iterate ``in_map``; each rank receives only the values at its + locally-observed pixels (a broadcast of the full map in full mode), applies its block of the + operator into a local buffer, and the contributions are summed back into a full-sky map on + the master. """ ismaster = self.map_comm.Get_rank() == 0 - if in_map.shape==(): - if ismaster: - raise ValueError("input map can not be empty on master rank.") - else: - in_map = self._zeros_map - - self.map_comm.Bcast(in_map, root=0) - out_map = np.zeros_like(in_map) + # Distribute the iterate to the ranks' local pixel domains (master -> ranks). + local_in = self.domain.scatter_from_full(in_map if ismaster else None, self._ncomp, + dtype=self.f_dtype) + out_local = self._zeros_map # The LHS operator P^T T^T N^-1 T P and the RHS P^T T^T N^-1 d must span the same set of - # detector-scans AND the same per-sample selection, or the CG solves an inconsistent (A, b). - # We therefore iterate through the exact same TODView path the RHS loop uses (accept gating + - # good_data_mask, addressing the dense (nscans, ndet) arrays by det_idx_fullband), so the two - # stay consistent by construction. - scan_view = TODView(self.detector_tod, self.detector_samples) - for view in scan_view.iter_focused(accepted_only=True): - good_data_mask = view.good_data_mask - pix = view.pix[good_data_mask] - psi = view.psi[good_data_mask] + # detector-scans AND the same samples, or the CG solves an inconsistent (A, b). We iterate + # the same accept-gated TODView path the RHS loop uses, on the *full-length* pointing: the + # RHS gap-fills flagged samples rather than removing them (apply_T needs a continuous TOD), + # so both sides run over every sample of each accepted detector-scan. + for view in self._scan_view.iter_focused(accepted_only=True): + pix = view.pix + psi = view.psi sigma0 = view.sigma0 - scan_tod_arr_aux = np.zeros(pix.shape[0], dtype=self.f_dtype) # good samples only, as RHS + scan_tod_arr_aux = np.zeros(pix.shape[0], dtype=self.f_dtype) # full-length, as RHS #P m - scan_tod_arr_aux = self.apply_P(in_map, view.detector, pix=pix, psi=psi, scan_tod_arr=scan_tod_arr_aux) + scan_tod_arr_aux = self.apply_P(local_in, view.detector, pix=pix, psi=psi, scan_tod_arr=scan_tod_arr_aux) #T P m scan_tod_arr_aux = self.apply_T(scan_tod_arr_aux) #N^-1 T P m @@ -259,13 +275,9 @@ def apply_LHS(self, in_map: NDArray): #T^T N^-1 T P m scan_tod_arr_aux = self.apply_T_adjoint(scan_tod_arr_aux) #P^T T^T N^-1 T P - out_map = self.apply_P_adjoint(view.detector, out_map, pix=pix, psi=psi, scan_tod_arr=scan_tod_arr_aux) - send, recv = (MPI.IN_PLACE, out_map) if self.map_comm.Get_rank() == 0 else (out_map, None) - self.map_comm.Reduce(send, recv, op=MPI.SUM, root=0) - if not ismaster: - in_map = np.empty(()) - out_map = None - return recv + out_local = self.apply_P_adjoint(view.detector, out_local, pix=pix, psi=psi, scan_tod_arr=scan_tod_arr_aux) + # Sum the local contributions back to the full-sky map on the master (None on other ranks). + return self.domain.reduce_to_full(out_local) def solve(self, x_true=None): """ @@ -274,73 +286,26 @@ def solve(self, x_true=None): RHS_map = self.RHS_map ismaster = self.map_comm.Get_rank() == 0 - def mydot(a, b): - return np.dot(a.flatten(), b.flatten()) - - my_dot = mydot # lambda arr1, arr2: MPI_dot(arr1, arr2, self.map_comm, double_prec=self.double_perc) - CG_solver = distributed_CG_arr(self.apply_LHS, - RHS_map, + CG_solver = distributed_CG_arr(self.apply_LHS, + RHS_map, ismaster, - M = self.M, + M = self.M, dot = dot, destroy_b=True) - + if ismaster: - self.logger.info(f"Mapmaker CG starting up!") - res_s = [] + self.logger.info("Mapmaker CG starting up!") for i in range(self.CG_maxiter): CG_solver.step() - if i%self.CG_check_interval == 0: - if ismaster: - self.logger.info(f"Mapmaker CG iter {i:3d} - Residual {CG_solver.err:.6e}") - res_s.append(CG_solver.err) - # plt.figure(figsize=(8.5*3, 5.4)) - # npol = 3 - # self.logger.info(f"## Plotting ... iter {i}") - # for p in range(npol): - # limup = np.nanpercentile(CG_solver.x[p,:], 99) - # limdown = np.nanpercentile(CG_solver.x[p,:], 1) - # hp.mollview(CG_solver.x[p,:], cmap='RdBu_r', title='CG sol', - # sub=(1,npol,p+1), min=limdown, max=limup) - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/x_sol_IQU_iter{i}.png") - # plt.close() - if x_true is not None: - CG_errors_true = norm(CG_solver.x - x_true)/norm(x_true) - A_residual = self.apply_LHS(CG_solver.x - x_true) - #A_residual = np.concatenate(A_residual, axis=-1) - CG_Anorm_error = dot(CG_solver.x - x_true, A_residual) - # A-norm error is only defined for the full vector. - self.logger.info(f"CG iter {i:3d} - Mean X: {norm(CG_solver.x)}") - - self.logger.info(f"CG iter {i:3d} - Mean X true: {norm(x_true)}") - - self.logger.info(f"CG iter {i:3d} - True A-norm error: {CG_Anorm_error:.3e}") - # We can print the individual component L2 errors. - self.logger.info(f"CG iter {i:3d} - True L2 error: {CG_errors_true:.3e}") - - # for s in ["I", "Q", "U"]: - # plt.figure() - # hp.mollview(CG_solver.x[0,:], min=-1e4, max=1e4, cmap = 'RdBu_r') - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/x_sol_iter{i}_pol{s}.png") - # plt.close() - - # else: - # plt.figure() - # limup = np.nanpercentile(CG_solver.x[0,:], 99) - # limdown = np.nanpercentile(CG_solver.x[0,:], 1) - # hp.mollview(CG_solver.x[0,:], cmap='RdBu_r', title='CG sol', - # min=limdown, max=limup) - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/x_sol_iter{i}_Akari.png") - # plt.close() - + if i % self.CG_check_interval == 0 and ismaster: + self.logger.info(f"Mapmaker CG iter {i:3d} - Residual {CG_solver.err:.6e}") + if x_true is not None: # Optional error against a known solution (testing only). + CG_L2_error = norm(CG_solver.x - x_true)/norm(x_true) + CG_Anorm_error = dot(CG_solver.x - x_true, self.apply_LHS(CG_solver.x - x_true)) + self.logger.info(f"CG iter {i:3d} - True A-norm error: {CG_Anorm_error:.3e} " + f"- True L2 error: {CG_L2_error:.3e}") if CG_solver.err < self.CG_tol: break - # if ismaster: - # plt.figure() - # plt.plot(np.arange(self.CG_maxiter), res_s) - # plt.yscale('log') - # plt.savefig(f"/mn/stornext/u3/leoab/cmdr4_plots/CG_residuals.png") - # plt.close self._map_signal = CG_solver.x @@ -356,20 +321,23 @@ def __init__(self, detector_samples, map_comm, T_omega = np.ones_like, preconditioner = np.copy, - nthreads = 1, - double_prec = True, - CG_maxiter = 200, - CG_tol = 1e-10, - CG_check_interval = 1): - - super().__init__(detector_tod, detector_samples, map_comm, T_omega, preconditioner, - nthreads, double_prec, CG_maxiter, CG_tol, CG_check_interval) - - #output map to be solved for - self._map_signal = np.zeros((1,hp.nside2npix(detector_tod.nside)), + nthreads = 1, + double_prec = True, + CG_maxiter = 200, + CG_tol = 1e-10, + CG_check_interval = 1, + pixel_domain = None): + + super().__init__(detector_tod, detector_samples, map_comm, T_omega, preconditioner, + nthreads, double_prec, CG_maxiter, CG_tol, CG_check_interval, pixel_domain) + + self._ncomp = 1 + # Master holds the full-sky solution and RHS; the iterate is scattered to the ranks' local + # domains each iteration (see apply_LHS). + self._map_signal = np.zeros((1,hp.nside2npix(detector_tod.nside)), dtype=self.f_dtype) if self.ismaster else None #RHS map to be accumulated on master rank - self._rhs_finalized_map = np.zeros((1,hp.nside2npix(detector_tod.nside)), + self._rhs_finalized_map = np.zeros((1,hp.nside2npix(detector_tod.nside)), dtype=self.f_dtype) if self.ismaster else None #RHS map to be accumulate @@ -408,10 +376,9 @@ def apply_P(self, in_map: NDArray, out_scan:ScanTOD, pix=None, psi=None, scan_to In the CGMapmakerI the psi will be ignored. """ scan_tod_arr = out_scan.tod if scan_tod_arr is None else scan_tod_arr - npix_out = hp.nside2npix(out_scan.nside) - assert npix_out == in_map.shape[-1], "in_map size must match scan's eval nside." + # in_map is indexed by pix, so its pixel axis defines the domain (full-sky or rank-local). + pix = self.domain.to_local(out_scan.pix if pix is None else pix) assert pix.shape == scan_tod_arr.shape, "pix shape must match scan_tod_arr." - pix = out_scan.pix if pix is None else pix # Use the passed array length, not the full detector ntod: apply_LHS masks pix/scan_tod_arr # down to good samples, so this must match (mirrors apply_P_adjoint). ntod = scan_tod_arr.shape[-1] @@ -429,20 +396,17 @@ def apply_P_adjoint(self, in_scan: ScanTOD, out_map:NDArray, pix=None, psi=None, In the CGMapmakerI the psi will be ignored. """ scan_tod_arr = in_scan.tod if scan_tod_arr is None else scan_tod_arr - npix_out = out_map.shape[-1] - assert npix_out == hp.nside2npix(in_scan.nside), "out_map size must match scan's eval nside." + # out_map is indexed by pix, so its pixel axis defines the domain (full-sky or rank-local). + pix = self.domain.to_local(in_scan.pix if pix is None else pix) assert pix.shape == scan_tod_arr.shape, "pix shape must match scan_tod_arr." - pix = in_scan.pix if pix is None else pix ntod = scan_tod_arr.shape[-1] self.map_accumulator(out_map, scan_tod_arr, 1, pix.astype(np.int64, copy=False), ntod) return out_map @property def _zeros_map(self): - """ - Internal function to allocate an empty map. - """ - return np.zeros((1, hp.nside2npix(self.detector_tod.nside)), dtype=self.f_dtype) + """Allocate a zero local map buffer (full-sky in full mode, rank-local in sparse mode).""" + return np.zeros((1, self._nloc), dtype=self.f_dtype) class CGMapmakerIQU(CGMapmaker): """Polarised (I, Q, U) CG mapmaker. @@ -457,22 +421,25 @@ def __init__(self, map_comm, T_omega = np.ones_like, preconditioner = np.copy, - nthreads = 1, - double_prec = True, - CG_maxiter = 200, - CG_tol = 1e-10, - CG_check_interval = 1): - - super().__init__(detector_tod, detector_samples, map_comm, T_omega, preconditioner, - nthreads, double_prec, CG_maxiter, CG_tol, CG_check_interval) - - #output map to be solved for - self._map_signal = np.zeros((3,hp.nside2npix(detector_tod.nside)), + nthreads = 1, + double_prec = True, + CG_maxiter = 200, + CG_tol = 1e-10, + CG_check_interval = 1, + pixel_domain = None): + + super().__init__(detector_tod, detector_samples, map_comm, T_omega, preconditioner, + nthreads, double_prec, CG_maxiter, CG_tol, CG_check_interval, pixel_domain) + + self._ncomp = 3 + # Master holds the full-sky solution and RHS; the iterate is scattered to the ranks' local + # domains each iteration (see apply_LHS). + self._map_signal = np.zeros((3,hp.nside2npix(detector_tod.nside)), dtype=self.f_dtype) if self.ismaster else None #local RHS map self._rhs_loca_map = None #RHS map to be accumulated on master rank - self._rhs_finalized_map = np.zeros((3,hp.nside2npix(detector_tod.nside)), + self._rhs_finalized_map = np.zeros((3,hp.nside2npix(detector_tod.nside)), dtype=self.f_dtype) if self.ismaster else None if double_prec: @@ -518,13 +485,10 @@ def apply_P(self, in_map: NDArray, out_scan:ScanTOD, pix=None, psi=None, scan_to If a `scan_tod_arr` is passed it is used instead of overwriting `out_scan` """ scan_tod_arr = out_scan.tod if scan_tod_arr is None else scan_tod_arr - npix_out = hp.nside2npix(out_scan.nside) - assert npix_out == in_map.shape[-1], "in_map size must match scan's eval nside." - # if pix.shape != scan_tod_arr.shape: - # self.logger.info(f"### Shape pix: {pix.shape}, shape scan: {scan_tod_arr.shape}") - # assert pix.shape == scan_tod_arr.shape, "pix shape must match scan_tod_arr." - # assert psi.shape == scan_tod_arr.shape, "psi shape must match scan_tod_arr." - pix = out_scan.pix if pix is None else pix + # in_map is indexed by pix and strided by its pixel axis, which defines the domain + # (full-sky or rank-local); num_pix is that axis length. + npix_out = in_map.shape[-1] + pix = self.domain.to_local(out_scan.pix if pix is None else pix) psi = out_scan.psi if psi is None else psi # Use the passed array length, not the full detector ntod: apply_LHS masks pix/psi/scan_tod_arr # down to good samples, so this must match (mirrors apply_P_adjoint). @@ -544,22 +508,17 @@ def apply_P_adjoint(self, in_scan: ScanTOD, out_map:NDArray, pix=None, psi=None, If a `scan_tod_arr` is passed it is used instead of overwriting `out_scan` """ scan_tod_arr = in_scan.tod if scan_tod_arr is None else scan_tod_arr + # out_map is indexed by pix and strided by its pixel axis, which defines the domain + # (full-sky or rank-local); num_pix is that axis length. npix_out = out_map.shape[-1] - assert npix_out == hp.nside2npix(in_scan.nside), "out_map size must match scan's eval nside." - # if pix.shape != scan_tod_arr.shape: - # self.logger.info(f"### Shape pix: {pix.shape}, shape scan: {scan_tod_arr.shape}") - # assert pix.shape == scan_tod_arr.shape, "pix shape must match scan_tod_arr." - # assert psi.shape == scan_tod_arr.shape, "psi shape must match scan_tod_arr." - pix = in_scan.pix if pix is None else pix + pix = self.domain.to_local(in_scan.pix if pix is None else pix) psi = in_scan.psi if psi is None else psi ntod = scan_tod_arr.shape[-1] - self.map_accumulator_IQU(out_map, scan_tod_arr, 1, pix.astype(np.int64, copy=False), + self.map_accumulator_IQU(out_map, scan_tod_arr, 1, pix.astype(np.int64, copy=False), psi.astype(np.float64, copy=False), ntod, npix_out) return out_map - + @property def _zeros_map(self): - """ - Internal function to allocate an empty map. - """ - return np.zeros((3, hp.nside2npix(self.detector_tod.nside)), dtype=self.f_dtype) \ No newline at end of file + """Allocate a zero local map buffer (full-sky in full mode, rank-local in sparse mode).""" + return np.zeros((3, self._nloc), dtype=self.f_dtype) \ No newline at end of file diff --git a/src/commander4/utils/mapmaker.py b/src/commander4/utils/mapmaker.py index 1f48976..fdf3e86 100644 --- a/src/commander4/utils/mapmaker.py +++ b/src/commander4/utils/mapmaker.py @@ -6,6 +6,7 @@ from commander4.output.log import logassert from commander4.utils.ctypes_lib import load_cmdr4_ctypes_lib +from commander4.utils.pixel_domain import PixelDomain logger = logging.getLogger(__name__) @@ -15,16 +16,23 @@ class Mapmaker: Accumulates weighted TOD samples into a map, reduces across MPI ranks, and normalizes with a precomputed weights map. The internal accumulation is performed in float64; output dtype is controlled by `dtype`. + + The accumulation buffer lives on the rank's ``PixelDomain``: full-sky in the default mode, or + only the locally-observed pixels in sparse mode. ``gather_map`` always produces a full-sky map + on the master, so normalization and all downstream consumers are unchanged. """ - def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32): + def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32, + pixel_domain:PixelDomain|None=None): self.map_comm = map_comm self.nside = nside self.npix = 12*nside**2 self.dtype= dtype - self._map_signal = np.zeros(self.npix, dtype=np.float64) + self.domain = pixel_domain if pixel_domain is not None else PixelDomain(map_comm, nside, "full") + self._nloc = self.domain.n_local + self._map_signal = np.zeros(self._nloc, dtype=np.float64) self._gathered_map = None self._finalized_map = None - + # Setting up Ctypes mapmaker self.maplib = load_cmdr4_ctypes_lib() ct_i64_dim1 = np.ctypeslib.ndpointer(dtype=ct.c_int64, ndim=1, flags="contiguous") @@ -46,16 +54,15 @@ def accumulate_to_map(self, tod:NDArray, weights:NDArray, pix:NDArray, psi=None) ntod = tod.shape[0] tod_f64 = np.ascontiguousarray(tod, dtype=np.float64) weight_f64 = float(weights) + pix = self.domain.to_local(pix) self.maplib.map_accumulator_f64(self._map_signal, tod_f64, weight_f64, pix.astype(np.int64, copy=False), ntod) def gather_map(self): - """Reduce the local map buffers across MPI ranks into the root map.""" - if self.map_comm.Get_rank() == 0: - self._gathered_map = np.zeros(self.npix, dtype=np.float64) - self.map_comm.Reduce(self._map_signal, self._gathered_map, op=MPI.SUM, root=0) + """Reduce the local map buffers across MPI ranks into the full-sky root map.""" + self._gathered_map = self.domain.reduce_to_full(self._map_signal) self._map_signal = None # Free memory and indicate that accumulation is done. - + def normalize_map(self, normalization_map): """Normalize the gathered map by the provided weights map.""" if self.map_comm.Get_rank() == 0: @@ -75,14 +82,17 @@ class WeightsMapmaker: Accumulates per-sample weights into a map, reduces across MPI ranks, and exposes the gathered weights map for normalization of Mapmaker. """ - def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32): + def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32, + pixel_domain:PixelDomain|None=None): self.map_comm = map_comm self.nside = nside self.npix = 12*nside**2 self.dtype= dtype - self._map_signal = np.zeros(self.npix, dtype=np.float64) + self.domain = pixel_domain if pixel_domain is not None else PixelDomain(map_comm, nside, "full") + self._nloc = self.domain.n_local + self._map_signal = np.zeros(self._nloc, dtype=np.float64) self._gathered_map = None - + # Setting up Ctypes mapmaker self.maplib = load_cmdr4_ctypes_lib() ct_i64_dim1 = np.ctypeslib.ndpointer(dtype=ct.c_int64, ndim=1, flags="contiguous") @@ -103,14 +113,14 @@ def accumulate_to_map(self, weight:NDArray, pix:NDArray, psi=None): logassert(self._map_signal is not None, "Tried accumulating to finalized map", logger) ntod = pix.shape[0] weight_f64 = float(weight) + pix = self.domain.to_local(pix) + # The scalar weight kernel indexes map[pix] directly and takes no num_pix argument. self.maplib.map_weight_accumulator_f64(self._map_signal, weight_f64, - pix.astype(np.int64, copy=False), ntod, self.npix) + pix.astype(np.int64, copy=False), ntod) def gather_map(self): - """Reduce the local weights buffers across MPI ranks into the root map.""" - if self.map_comm.Get_rank() == 0: - self._gathered_map = np.zeros(self.npix, dtype=np.float64) - self.map_comm.Reduce(self._map_signal, self._gathered_map, op=MPI.SUM, root=0) + """Reduce the local weights buffers across MPI ranks into the full-sky root map.""" + self._gathered_map = self.domain.reduce_to_full(self._map_signal) self._map_signal = None # Free memory and indicate that accumulation is done. @@ -129,16 +139,19 @@ class MapmakerIQU: - Use MapmakerIQU to accumulate signal maps, then call `normalize_map(A)` with the gathered A map to produce the finalized I,Q,U map. """ - def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32): + def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32, + pixel_domain:PixelDomain|None=None): self.map_comm = map_comm self.nside = nside self.npix = 12*nside**2 self.dtype= dtype - self._map_signal = np.zeros((3, self.npix), dtype=np.float64) + self.domain = pixel_domain if pixel_domain is not None else PixelDomain(map_comm, nside, "full") + self._nloc = self.domain.n_local + self._map_signal = np.zeros((3, self._nloc), dtype=np.float64) self._gathered_map = None self._finalized_map = None self._has_gathered = False - + # Setting up Ctypes mapmaker self.maplib = load_cmdr4_ctypes_lib() ct_i64_dim1 = np.ctypeslib.ndpointer(dtype=ct.c_int64, ndim=1, flags="contiguous") @@ -172,23 +185,24 @@ def accumulate_to_map(self, tod:NDArray, weights:NDArray, pix:NDArray, psi:NDArr tod_f64 = np.ascontiguousarray(tod, dtype=np.float64) weight_f64 = float(weights) psi_f64 = np.ascontiguousarray(psi, dtype=np.float64) - pix_i64 = pix.astype(np.int64, copy=False) + # The IQU kernels stride the (3, n) buffer by its pixel count, so num_pix must be n_local. + pix_i64 = self.domain.to_local(pix).astype(np.int64, copy=False) if response is None: self.maplib.map_accumulator_IQU_f64(self._map_signal, tod_f64, weight_f64, - pix_i64, psi_f64, ntod, self.npix) + pix_i64, psi_f64, ntod, self._nloc) else: response_I = float(response[0]) response_QU = float(response[1]) self.maplib.map_accumulator_IQU_response_f64(self._map_signal, tod_f64, weight_f64, pix_i64, psi_f64, response_I, - response_QU, ntod, self.npix) + response_QU, ntod, self._nloc) def accumulate_to_map_Python(self, tod:NDArray, weights:NDArray, pix:NDArray, psi:NDArray, response: NDArray | None = None): """Reference accumulator matching the ctypes IQU implementation.""" # Reference implementation matching the ctypes IQU accumulator. logassert(self._map_signal is not None, "Tried accumulating to finalized map", logger) - pix_idx = pix.astype(np.int64, copy=False) + pix_idx = self.domain.to_local(pix).astype(np.int64, copy=False) w_tod = np.ascontiguousarray(tod, dtype=np.float64) * float(weights) ang = 2.0 * np.ascontiguousarray(psi, dtype=np.float64) c2 = np.cos(ang) @@ -203,13 +217,11 @@ def accumulate_to_map_Python(self, tod:NDArray, weights:NDArray, pix:NDArray, ps np.add.at(self._map_signal[2], pix_idx, w_tod * response_QU * s2) def gather_map(self): - """Reduce the local IQU buffers across MPI ranks into the root map.""" - if self.map_comm.Get_rank() == 0: - self._gathered_map = np.zeros((3, self.npix), dtype=np.float64) - self.map_comm.Reduce(self._map_signal, self._gathered_map, op=MPI.SUM, root=0) + """Reduce the local IQU buffers across MPI ranks into the full-sky root map.""" + self._gathered_map = self.domain.reduce_to_full(self._map_signal) self._map_signal = None # Free memory and indicate that accumulation is done. self._has_gathered = True - + def normalize_map(self, normalization_map): """Solve the per-pixel 3x3 system using the provided A matrix.""" if self.map_comm.Get_rank() == 0: @@ -273,16 +285,19 @@ class WeightsMapmakerIQU: - Call `normalize_map()` to compute RMS maps and expose `final_cov_map` for MapmakerIQU normalization. """ - def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32): + def __init__(self, map_comm:MPI.Comm, nside:int, dtype=np.float32, + pixel_domain:PixelDomain|None=None): self.map_comm = map_comm self.nside = nside self.npix = 12*nside**2 self.dtype= dtype - self._map_signal = np.zeros((6, self.npix), dtype=np.float64) + self.domain = pixel_domain if pixel_domain is not None else PixelDomain(map_comm, nside, "full") + self._nloc = self.domain.n_local + self._map_signal = np.zeros((6, self._nloc), dtype=np.float64) self._gathered_map = None self._finalized_rms_map = None self._has_gathered = False - + # Setting up Ctypes mapmaker self.maplib = load_cmdr4_ctypes_lib() ct_i64_dim1 = np.ctypeslib.ndpointer(dtype=ct.c_int64, ndim=1, flags="contiguous") @@ -316,23 +331,24 @@ def accumulate_to_map(self, weight:float, pix:NDArray, psi:NDArray, response:NDA ntod = pix.shape[0] weight_f64 = float(weight) psi_f64 = np.ascontiguousarray(psi, dtype=np.float64) - pix_i64 = pix.astype(np.int64, copy=False) + # The IQU kernels stride the (6, n) buffer by its pixel count, so num_pix must be n_local. + pix_i64 = self.domain.to_local(pix).astype(np.int64, copy=False) if response is None: self.maplib.map_weight_accumulator_IQU_f64(self._map_signal, weight_f64, - pix_i64, psi_f64, ntod, self.npix) + pix_i64, psi_f64, ntod, self._nloc) else: response_I = float(response[0]) response_QU = float(response[1]) self.maplib.map_weight_accumulator_IQU_response_f64(self._map_signal, weight_f64, pix_i64, psi_f64, response_I, - response_QU, ntod, self.npix) + response_QU, ntod, self._nloc) def accumulate_to_map_Python(self, weight:float, pix:NDArray, psi:NDArray, response: NDArray | None = None): """Reference accumulator matching the ctypes IQU weights implementation.""" # Reference implementation matching the ctypes IQU weight accumulator. logassert(self._map_signal is not None, "Tried accumulating to finalized map", logger) - pix_idx = pix.astype(np.int64, copy=False) + pix_idx = self.domain.to_local(pix).astype(np.int64, copy=False) ang = 2.0 * np.ascontiguousarray(psi, dtype=np.float64) c2 = np.cos(ang) s2 = np.sin(ang) @@ -349,24 +365,9 @@ def accumulate_to_map_Python(self, weight:float, pix:NDArray, psi:NDArray, np.add.at(self._map_signal[4], pix_idx, weight_f64 * response_QU * response_QU * s2 * c2) np.add.at(self._map_signal[5], pix_idx, weight_f64 * response_QU * response_QU * s2 * s2) - @property - def inv_N_diag(self): - """ - Gives the diagonal of the accumulated inverse covariance matrix, per each pixel. - - It is useful as a preconditioner for the CG mapmaker. - - Note: call it before `gather_map` wipes the inverse covariance matrix away. - """ - logassert(self._map_signal is not None, "Attempted to access inv cov map after finalization.", - logger) - return self._map_signal[(0,3,5), :] - def gather_map(self): - """Reduce the local IQU weight buffers across MPI ranks into the root map.""" - if self.map_comm.Get_rank() == 0: - self._gathered_map = np.zeros((6, self.npix), dtype=np.float64) - self.map_comm.Reduce(self._map_signal, self._gathered_map, op=MPI.SUM, root=0) + """Reduce the local IQU weight buffers across MPI ranks into the full-sky root map.""" + self._gathered_map = self.domain.reduce_to_full(self._map_signal) self._map_signal = None # Free memory and indicate that accumulation is done. self._has_gathered = True From a3bf586a8e3d62ed0eaf9ba3f36cc9f89fa482c0 Mon Sep 17 00:00:00 2001 From: jgslunde Date: Tue, 16 Jun 2026 18:30:38 +0200 Subject: [PATCH 2/4] Fixed some wrongly tuned scan reject checks in Planck TOD reader --- src/commander4/experiments/planck/tod_reader_planck.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commander4/experiments/planck/tod_reader_planck.py b/src/commander4/experiments/planck/tod_reader_planck.py index b5cce53..993c9f6 100644 --- a/src/commander4/experiments/planck/tod_reader_planck.py +++ b/src/commander4/experiments/planck/tod_reader_planck.py @@ -134,7 +134,7 @@ def tod_reader(band_comm: MPI.Comm, my_experiment: str, my_band: Bunch, all_det_ continue if not np.isfinite(detector.tod).all(): continue - if detector.full_mask.mean() < 0.5: + if detector.good_data_mask.mean() < 0.75: continue detector_list.append(detector) ntod_sum_original += ntod From 84a09bb30d7277b6dde17f21f6c6b0aa57da693c Mon Sep 17 00:00:00 2001 From: jgslunde Date: Tue, 16 Jun 2026 21:51:36 +0200 Subject: [PATCH 3/4] Added some forgotted code that should have been in previous commit. --- src/commander4/utils/pixel_domain.py | 145 ++++++++++++ tests/test_pixel_domain.py | 166 ++++++++++++++ tests/test_polarized_operator.py | 154 +++++++++++++ tests/test_spectral_index_sampler.py | 323 +++++++++++++++++++++++++++ 4 files changed, 788 insertions(+) create mode 100644 src/commander4/utils/pixel_domain.py create mode 100644 tests/test_pixel_domain.py create mode 100644 tests/test_polarized_operator.py create mode 100644 tests/test_spectral_index_sampler.py diff --git a/src/commander4/utils/pixel_domain.py b/src/commander4/utils/pixel_domain.py new file mode 100644 index 0000000..44c043f --- /dev/null +++ b/src/commander4/utils/pixel_domain.py @@ -0,0 +1,145 @@ +import numpy as np +from mpi4py import MPI +from numpy.typing import NDArray + +# Map distribution across the ranks of a band communicator. +# +# Commander4 builds each band map collaboratively: every rank accumulates the contributions of its +# own detector-scans into a map, and the contributions are summed onto the band master, which holds +# the full-sky result. The naive layout gives every rank a full-sky (ncomp, npix) buffer even when +# its scans only ever touch a small fraction of the sky. ``PixelDomain`` factors out the pixel +# bookkeeping and the reduction/scatter MPI so the per-rank buffers can instead hold only the pixels +# that rank observes ("sparse" mode), while the master still ends up with a full-sky map. +# +# Two modes: +# - "full": the historical behaviour. ``local_pix`` is the whole sky, ``to_local`` is the +# identity, and the collectives are a plain ``Reduce`` / ``Bcast`` over npix. +# - "sparse": each rank holds ``local_pix`` (the sorted unique global pixels its scans touch). +# Local buffers are (ncomp, n_local). Reduction is a ``Gatherv`` of the local data to +# the master followed by a scatter-add into the full-sky map; the symmetric scatter +# (master -> ranks, for the CG LHS) is a ``Scatterv`` of the per-rank pixel slices. +# The index plan (counts/displacements and the concatenated global pixels) is static +# across Gibbs iterations and is exchanged once at construction. + + +class PixelDomain: + """Owns a rank's local pixel set and the MPI that maps local map buffers to a full-sky map. + + Attributes: + comm: Band communicator across which the map is reduced. + nside: HEALPix nside of the full-sky map. + npix: Full-sky pixel count (``12*nside**2``). + mode: ``"full"`` or ``"sparse"``. + local_pix: Sorted unique global pixel ids held by this rank (sparse mode), else ``None``. + n_local: Length of the local buffer (``n_local == npix`` in full mode). + """ + + def __init__(self, comm: MPI.Comm, nside: int, mode: str, + local_pix: NDArray | None = None): + self.comm = comm + self.nside = nside + self.npix = 12 * nside**2 + self.mode = mode + rank = comm.Get_rank() + + if mode == "full": + self.local_pix = None # identity remapping; buffers are full-sky. + self.n_local = self.npix + return + if mode != "sparse": + raise ValueError(f"Unknown PixelDomain mode '{mode}'.") + + self.local_pix = np.ascontiguousarray(local_pix, dtype=np.int64) + self.n_local = int(self.local_pix.size) + # Static gather plan, exchanged once: per-rank element counts, their displacements, and the + # concatenation of every rank's global pixels (held only on the master for the scatter-add). + counts = np.asarray(comm.allgather(self.n_local), dtype=np.int32) + self._recvcounts = counts + self._displs = np.insert(np.cumsum(counts), 0, 0)[:-1].astype(np.int32) + self._all_pix = np.empty(int(counts.sum()), dtype=np.int64) if rank == 0 else None + recvbuf = [self._all_pix, counts, self._displs, MPI.INT64_T] if rank == 0 else None + comm.Gatherv(self.local_pix, recvbuf, root=0) + + @classmethod + def from_view(cls, scan_view, comm: MPI.Comm, mode: str, nside: int) -> "PixelDomain": + """Build a domain by collecting the pixels every present detector-scan touches. + + The local pixel set is the union over *all present* detector-scans (the accept flag is + ignored on purpose), so the domain is a static superset of both the masked and the unmasked + pointing used by every downstream mapmaker and does not change if ``accept`` toggles between + Gibbs iterations. Pointing is accessed through the ``TODView`` (``view.pix``), never decoded + directly. ``"full"`` mode skips the pass entirely. + """ + if mode == "full": + return cls(comm, nside, "full") + # A transient full-sky boolean hitmap (npix bytes) is the cheapest robust way to union the + # pixels; it is freed before the large accumulation buffers are allocated. + hit = np.zeros(12 * nside**2, dtype=bool) + for view in scan_view.iter_focused(): + hit[view.pix] = True + local_pix = np.flatnonzero(hit).astype(np.int64) + return cls(comm, nside, "sparse", local_pix=local_pix) + + def to_local(self, pix: NDArray) -> NDArray: + """Map global HEALPix indices to compact local-buffer indices (identity in full mode).""" + if self.mode == "full": + return pix + # local_pix is sorted and contains every pixel this rank can pass, so searchsorted is exact. + return np.searchsorted(self.local_pix, pix) + + def reduce_to_full(self, local_data: NDArray, root: int = 0) -> NDArray | None: + """Sum the per-rank local buffers into a full-sky map on ``root`` (else return ``None``). + + Accepts a 1-D ``(n_local,)`` buffer (scalar maps) or a 2-D ``(ncomp, n_local)`` buffer; the + returned full-sky map matches the input rank (``(npix,)`` or ``(ncomp, npix)``). + """ + rank = self.comm.Get_rank() + if self.mode == "full": + out = np.zeros_like(local_data) if rank == root else None + self.comm.Reduce(local_data, out, op=MPI.SUM, root=root) + return out + + is_1d = local_data.ndim == 1 + data2d = local_data.reshape(1, -1) if is_1d else local_data + ncomp = data2d.shape[0] + counts, displs = self._recvcounts, self._displs + if rank == root: + full = np.zeros((ncomp, self.npix), dtype=np.float64) + recv = np.empty(int(counts.sum()), dtype=np.float64) + for c in range(ncomp): + send = np.ascontiguousarray(data2d[c], dtype=np.float64) + recvbuf = [recv, counts, displs, MPI.DOUBLE] if rank == root else None + self.comm.Gatherv(send, recvbuf, root=root) + if rank == root: + # Pixels observed by several ranks land at repeated indices; add.at accumulates them. + np.add.at(full[c], self._all_pix, recv) + if rank != root: + return None + return full[0] if is_1d else full + + def scatter_from_full(self, full_data: NDArray | None, ncomp: int, root: int = 0, + dtype=np.float64) -> NDArray: + """Send each rank the values of a full-sky map at its local pixels (inverse of reduce). + + Used by the CG LHS, where the master holds the full iterate and each rank needs only its + local slice to apply its block of the operator. Returns an ``(ncomp, n_local)`` buffer on + every rank. In full mode this is a plain broadcast of the full-sky map. + """ + rank = self.comm.Get_rank() + if self.mode == "full": + buf = np.ascontiguousarray(full_data, dtype=dtype) if rank == root \ + else np.empty((ncomp, self.npix), dtype=dtype) + self.comm.Bcast(buf, root=root) + return buf + + counts, displs = self._recvcounts, self._displs + local = np.empty((ncomp, self.n_local), dtype=np.float64) + for c in range(ncomp): + if rank == root: + # Gather the full map at the concatenated per-rank pixels, then scatter the slices. + send = np.ascontiguousarray(full_data[c][self._all_pix], dtype=np.float64) + sendbuf = [send, counts, displs, MPI.DOUBLE] + else: + sendbuf = None + self.comm.Scatterv(sendbuf, local[c], root=root) + return local diff --git a/tests/test_pixel_domain.py b/tests/test_pixel_domain.py new file mode 100644 index 0000000..141e343 --- /dev/null +++ b/tests/test_pixel_domain.py @@ -0,0 +1,166 @@ +"""Tests for PixelDomain and the sparse map storage it enables. + +The core correctness checks (to_local, reduce/scatter round-trips, and sparse-vs-full mapmaker +equivalence) run under a plain single-rank ``pytest`` because they use COMM_SELF / a size-1 +COMM_WORLD. The genuinely distributed checks are written collectively against COMM_WORLD, so they +exercise real multi-rank Gatherv/Scatterv when launched as:: + + mpirun -n 4 python -m pytest tests/test_pixel_domain.py +""" + +import numpy as np +from mpi4py import MPI +from numpy.testing import assert_allclose +import pytest + +from commander4.utils.pixel_domain import PixelDomain +from commander4.utils.mapmaker import MapmakerIQU, WeightsMapmakerIQU, Mapmaker, WeightsMapmaker + + +def _sparse_domain(comm, nside, local_pix): + return PixelDomain(comm, nside, "sparse", local_pix=np.asarray(local_pix, dtype=np.int64)) + + +# --- to_local -------------------------------------------------------------------------------- + +def test_to_local_is_identity_in_full_mode(): + dom = PixelDomain(MPI.COMM_SELF, nside=2, mode="full") + pix = np.array([5, 0, 47, 12], dtype=np.int64) + assert dom.to_local(pix) is pix or np.array_equal(dom.to_local(pix), pix) + + +def test_to_local_maps_global_to_compact_indices(): + local_pix = [3, 7, 8, 20, 41] + dom = _sparse_domain(MPI.COMM_SELF, nside=2, local_pix=local_pix) + pix = np.array([20, 3, 41, 8, 7], dtype=np.int64) + expected = np.array([3, 0, 4, 2, 1], dtype=np.int64) # index of each pix within local_pix + assert_allclose(dom.to_local(pix), expected) + # The mapped indices must round-trip back to the original global pixels. + assert_allclose(np.asarray(local_pix)[dom.to_local(pix)], pix) + + +# --- from_view ------------------------------------------------------------------------------- + +class _FakeView: + def __init__(self, pix): + self.pix = pix + +class _FakeScanView: + """Minimal stand-in exposing the only thing PixelDomain.from_view touches: view.pix.""" + def __init__(self, pix_arrays): + self._pix_arrays = pix_arrays + def iter_focused(self, *, accepted_only=False): + for pix in self._pix_arrays: + yield _FakeView(pix) + +def test_from_view_collects_union_of_observed_pixels(): + scan_view = _FakeScanView([np.array([5, 5, 1]), np.array([1, 40, 7])]) + dom = PixelDomain.from_view(scan_view, MPI.COMM_SELF, "sparse", nside=2) + assert_allclose(dom.local_pix, np.array([1, 5, 7, 40])) + assert dom.n_local == 4 + + +# --- reduce / scatter round-trips (collective; meaningful under mpirun) ---------------------- + +def test_reduce_to_full_matches_reference(): + comm = MPI.COMM_WORLD + rank, size = comm.Get_rank(), comm.Get_size() + nside, npix = 4, 12 * 16 + rng = np.random.default_rng(100 + rank) + # Each rank observes an overlapping window of pixels, so several ranks share pixels. + local_pix = np.unique(rng.integers(0, npix, size=30)) + dom = _sparse_domain(comm, nside, local_pix) + + local = rng.normal(size=(3, dom.n_local)) + full = dom.reduce_to_full(local) + + # Reference: scatter-add each rank's contribution into a full-sky map and sum across ranks. + ref_local = np.zeros((3, npix)) + np.add.at(ref_local[0], local_pix, local[0]) + np.add.at(ref_local[1], local_pix, local[1]) + np.add.at(ref_local[2], local_pix, local[2]) + ref = np.zeros((3, npix)) + comm.Reduce(ref_local, ref if rank == 0 else None, op=MPI.SUM, root=0) + if rank == 0: + assert_allclose(full, ref, rtol=1e-12, atol=1e-12) + else: + assert full is None + + +def test_scatter_from_full_round_trips(): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + nside, npix = 4, 12 * 16 + rng = np.random.default_rng(200 + rank) + local_pix = np.unique(rng.integers(0, npix, size=25)) + dom = _sparse_domain(comm, nside, local_pix) + + full = rng.normal(size=(2, npix)) if rank == 0 else None + local = dom.scatter_from_full(full, ncomp=2) + # Each rank must receive exactly the master's values at its own pixels. + full_b = comm.bcast(full, root=0) + assert_allclose(local, full_b[:, local_pix], rtol=1e-12, atol=1e-12) + + +# --- sparse-vs-full mapmaker equivalence (single-rank, exercises the local-buffer kernels) --- + +def _random_scan(rng, npix_observed, ntod): + pix = rng.integers(0, npix_observed, size=ntod).astype(np.int64) + psi = rng.uniform(0, np.pi, size=ntod) + tod = rng.normal(size=ntod) + return pix, psi, tod + +@pytest.mark.parametrize("response", [None, np.array([1.0, 0.7])]) +def test_weights_iqu_sparse_matches_full(response): + rng = np.random.default_rng(7) + nside = 4 + pix, psi, _ = _random_scan(rng, npix_observed=40, ntod=500) + local_pix = np.unique(pix) + + full = WeightsMapmakerIQU(MPI.COMM_SELF, nside, dtype=np.float64) + sparse = WeightsMapmakerIQU(MPI.COMM_SELF, nside, dtype=np.float64, + pixel_domain=_sparse_domain(MPI.COMM_SELF, nside, local_pix)) + full.accumulate_to_map(2.5, pix, psi, response=response) + sparse.accumulate_to_map(2.5, pix, psi, response=response) + full.gather_map() + sparse.gather_map() + assert_allclose(sparse._gathered_map, full._gathered_map, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize("response", [None, np.array([1.0, 0.7])]) +def test_signal_iqu_sparse_matches_full(response): + rng = np.random.default_rng(11) + nside = 4 + pix, psi, tod = _random_scan(rng, npix_observed=40, ntod=500) + local_pix = np.unique(pix) + + full = MapmakerIQU(MPI.COMM_SELF, nside, dtype=np.float64) + sparse = MapmakerIQU(MPI.COMM_SELF, nside, dtype=np.float64, + pixel_domain=_sparse_domain(MPI.COMM_SELF, nside, local_pix)) + full.accumulate_to_map(tod, 2.5, pix, psi, response=response) + sparse.accumulate_to_map(tod, 2.5, pix, psi, response=response) + full.gather_map() + sparse.gather_map() + assert_allclose(sparse._gathered_map, full._gathered_map, rtol=1e-12, atol=1e-12) + + +def test_scalar_sparse_matches_full(): + rng = np.random.default_rng(13) + nside = 4 + pix, _, tod = _random_scan(rng, npix_observed=40, ntod=500) + local_pix = np.unique(pix) + + full_sig = Mapmaker(MPI.COMM_SELF, nside, dtype=np.float64) + sparse_sig = Mapmaker(MPI.COMM_SELF, nside, dtype=np.float64, + pixel_domain=_sparse_domain(MPI.COMM_SELF, nside, local_pix)) + full_w = WeightsMapmaker(MPI.COMM_SELF, nside, dtype=np.float64) + sparse_w = WeightsMapmaker(MPI.COMM_SELF, nside, dtype=np.float64, + pixel_domain=_sparse_domain(MPI.COMM_SELF, nside, local_pix)) + full_sig.accumulate_to_map(tod, 2.5, pix) + sparse_sig.accumulate_to_map(tod, 2.5, pix) + full_w.accumulate_to_map(2.5, pix) + sparse_w.accumulate_to_map(2.5, pix) + full_sig.gather_map(); sparse_sig.gather_map() + full_w.gather_map(); sparse_w.gather_map() + assert_allclose(sparse_sig._gathered_map, full_sig._gathered_map, rtol=1e-12, atol=1e-12) + assert_allclose(sparse_w._gathered_map, full_w._gathered_map, rtol=1e-12, atol=1e-12) diff --git a/tests/test_polarized_operator.py b/tests/test_polarized_operator.py new file mode 100644 index 0000000..28a8c29 --- /dev/null +++ b/tests/test_polarized_operator.py @@ -0,0 +1,154 @@ +import numpy as np +import ducc0 +import healpy as hp +from mpi4py import MPI +from numpy.testing import assert_allclose + +from commander4.data_models.detector_map import DetectorMap +from commander4.solvers.preconditioners import JointPreconditioner +from commander4.utils.math_operations import alm_dot_product + + +def _spin2_cross_channel_leakage(q_rms: float, u_rms: float, nside: int, lmax: int) -> float: + map_sky = np.zeros((2, 12 * nside**2), dtype=np.float64) + map_rms = np.vstack([ + np.full(12 * nside**2, q_rms, dtype=np.float64), + np.full(12 * nside**2, u_rms, dtype=np.float64), + ]) + detector_map = DetectorMap( + map_sky, + map_rms, + nu=100.0, + fwhm=0.0, + nside=nside, + double_precision=True, + lmax=lmax, + ) + + alm = np.zeros((2, hp.Alm.getsize(lmax)), dtype=np.complex128) + alm[0, hp.Alm.getidx(lmax, min(10, lmax), min(2, lmax))] = 1.0 + transformed = detector_map.apply_inv_N_alm(alm, nthreads=1, inplace=False) + + main_norm = np.sqrt(alm_dot_product(transformed[0], transformed[0], lmax)) + cross_norm = np.sqrt(alm_dot_product(transformed[1], transformed[1], lmax)) + return float(cross_norm / main_norm) + + +class _DummyDetectorMap: + def __init__(self, inv_n_map: np.ndarray, lmax: int): + self.inv_n_map = inv_n_map + self.lmax = lmax + + +class _DummyBand: + def __init__(self, nu: float, fwhm: float): + self.nu = nu + self.fwhm = fwhm + + +class _DummyCompSep: + def __init__(self, inv_n_map: np.ndarray, lmax: int): + self.CompSep_comm = MPI.COMM_SELF + self.det_map = _DummyDetectorMap(inv_n_map, lmax) + self.my_band = _DummyBand(nu=100.0, fwhm=0.0) + + +class _DummyDiffuseComp: + def __init__(self, lmax: int, npol: int): + self.lmax = lmax + self.npol = npol + self._alms = np.zeros((npol, hp.Alm.getsize(lmax)), dtype=np.complex128) + + @property + def alms(self) -> np.ndarray: + return self._alms + + @alms.setter + def alms(self, value: np.ndarray) -> None: + self._alms = value + + @property + def P_smoothing_prior_inv(self) -> np.ndarray: + return np.ones(self.lmax + 1, dtype=np.float64) + + def get_sed(self, nu: float) -> float: + return 1.0 + + +class _DummyCompList: + def __init__(self, comps): + self._comps = comps + + def __len__(self): + return len(self._comps) + + def __getitem__(self, index): + return self._comps[index] + + def __iter__(self): + return iter(self._comps) + + +def test_spin2_standard_mode_matches_grad_only_when_curl_is_zero(): + nside = 16 + lmax = 12 + alm = np.zeros((2, hp.Alm.getsize(lmax)), dtype=np.complex128) + alm[0, hp.Alm.getidx(lmax, 6, 2)] = 1.0 + 0.5j + geom = ducc0.healpix.Healpix_Base(nside, "RING").sht_info() + + standard_map = ducc0.sht.synthesis(alm=alm, lmax=lmax, spin=2, nthreads=1, **geom) + grad_only_map = ducc0.sht.synthesis( + alm=alm[:1], + lmax=lmax, + spin=2, + nthreads=1, + mode="GRAD_ONLY", + **geom, + ) + + assert_allclose(standard_map, grad_only_map, rtol=1e-12, atol=1e-12) + + +def test_spin2_equal_qu_weights_have_small_cross_channel_leakage(): + leakage = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=1.0, nside=32, lmax=32) + assert leakage < 1.0e-2 + + +def test_spin2_unequal_qu_weights_induce_large_cross_channel_leakage(): + leakage = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=2.0, nside=32, lmax=32) + assert leakage > 1.0e-1 + + +def test_spin2_equal_weight_leakage_drops_when_nside_increases(): + coarse = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=1.0, nside=32, lmax=32) + fine = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=1.0, nside=64, lmax=32) + assert fine < coarse + + +def test_spin2_equal_weight_leakage_grows_when_lmax_increases(): + low_lmax = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=1.0, nside=32, lmax=16) + high_lmax = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=1.0, nside=32, lmax=64) + assert high_lmax > low_lmax + + +def test_spin2_unequal_weight_leakage_is_not_fixed_by_higher_nside(): + coarse = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=2.0, nside=32, lmax=32) + fine = _spin2_cross_channel_leakage(q_rms=1.0, u_rms=2.0, nside=64, lmax=32) + assert fine > 1.0e-1 + assert coarse > 1.0e-1 + + +def test_joint_preconditioner_uses_trace_weight_for_spin2_channels(): + nside = 16 + npix = 12 * nside**2 + inv_n_map = np.vstack([ + np.full(npix, 1.0, dtype=np.float64), + np.full(npix, 0.25, dtype=np.float64), + ]) + compsep = _DummyCompSep(inv_n_map=inv_n_map, lmax=16) + comp_list = _DummyCompList([_DummyDiffuseComp(lmax=16, npol=2)]) + + precond = JointPreconditioner(compsep, comp_list) + _, _, _, block_inv = precond.ell_block_data[4] + + assert_allclose(block_inv, np.eye(2) * block_inv[0, 0], rtol=1e-12, atol=1e-12) \ No newline at end of file diff --git a/tests/test_spectral_index_sampler.py b/tests/test_spectral_index_sampler.py new file mode 100644 index 0000000..72d50ca --- /dev/null +++ b/tests/test_spectral_index_sampler.py @@ -0,0 +1,323 @@ +import importlib +import sys +import types + +import numpy as np +import pytest +from mpi4py import MPI + + +class AttrBunch(dict): + def __getattr__(self, item): + try: + return self[item] + except KeyError as exc: + raise AttributeError(item) from exc + + def __setattr__(self, key, value): + self[key] = value + + +class FakeCompList: + def __init__(self, comp_list): + self.comp_list = list(comp_list) + + def __iter__(self): + for item in self.comp_list: + yield item + + def __len__(self): + return len(self.comp_list) + + def __getitem__(self, index): + return self.comp_list[index] + + +class StubComponent: + pass + + +class FakeDetectorMap: + def __init__(self, map_sky, map_rms, nu, fwhm, nside, double_precision=True): + self.map_sky = np.array(map_sky, dtype=np.float64, copy=True) + self._map_rms = np.array(map_rms, dtype=np.float64, copy=True) + self.nu = nu + self.fwhm = fwhm + self.nside = nside + self.double_precision = double_precision + + @property + def map_rms(self): + return self._map_rms.copy() + + @property + def pol(self): + return self.map_sky.shape[0] == 2 + + +class FakeSkyModel: + def __init__(self, components): + self._components = components + + def get_sky_at_nu(self, nu, nside, pols_required, fwhm=None): + npix = 12 * nside**2 + if pols_required == "I": + skymap = np.zeros((1, npix), dtype=np.float64) + for component in self._components: + if not component.is_pol: + skymap[0] += component.get_sky(nu, nside, fwhm)[0] + return skymap + if pols_required == "QU": + skymap = np.zeros((2, npix), dtype=np.float64) + for component in self._components: + if component.is_pol: + skymap += component.get_sky(nu, nside, fwhm) + return skymap + raise ValueError("Unsupported polarization in test stub.") + + +pixell_module = types.ModuleType("pixell") +bunch_module = types.ModuleType("pixell.bunch") +bunch_module.Bunch = AttrBunch +pixell_module.bunch = bunch_module +sys.modules["pixell"] = pixell_module +sys.modules["pixell.bunch"] = bunch_module + +component_module = types.ModuleType("commander4.sky_models.component") +component_module.Component = StubComponent +component_module.CompList = FakeCompList +sys.modules["commander4.sky_models.component"] = component_module + +detector_map_module = types.ModuleType("commander4.data_models.detector_map") +detector_map_module.DetectorMap = FakeDetectorMap +sys.modules["commander4.data_models.detector_map"] = detector_map_module + +sky_model_module = types.ModuleType("commander4.sky_models.sky_model") +sky_model_module.SkyModel = FakeSkyModel +sys.modules["commander4.sky_models.sky_model"] = sky_model_module + +perpix_module = types.ModuleType("commander4.solvers.perpix_compsep_solver") +perpix_module.smooth_signal_map_noiseweighted = lambda map_signal, map_rms, fwhm_rad: map_signal +perpix_module.smooth_rms_map_noiseweighted = lambda map_rms, fwhm_rad: map_rms +perpix_module.solve_compsep_perpix = lambda proc_comm, detector_data, comp_list, params: comp_list +sys.modules["commander4.solvers.perpix_compsep_solver"] = perpix_module + +spectral_index_sampler = importlib.import_module("commander4.solvers.spectral_index_sampler") + + +class FakeComponent: + def __init__(self, shortname, eval_pol, beta, amplitude, comp_params=None, sample=False, + proposal_sigma=None, bounds=None): + self.eval_pol = eval_pol + self.shortname = shortname + self.longname = shortname + if comp_params is None: + comp_params = AttrBunch({"shortname": shortname, "beta": beta}) + if sample: + comp_params["sample_spectral_index"] = True + comp_params["spectral_index_proposal_sigma"] = proposal_sigma + if bounds is not None: + comp_params["spectral_index_bounds"] = bounds + self.comp_params = comp_params + self.beta = beta + self.alms = self._make_alms(amplitude) + + @property + def is_pol(self): + return self.eval_pol == "QU" + + @property + def npol(self): + return 2 if self.is_pol else 1 + + @staticmethod + def _make_alms(amplitude, npol=1): + return np.full((npol, 1), amplitude, dtype=np.float64) + + @property + def alms(self): + return self._data + + @alms.setter + def alms(self, alms): + self._data = np.array(alms, dtype=np.float64, copy=True) + + def get_sed(self, nu): + return np.asarray(nu, dtype=np.float64)**self.beta + + def get_sky(self, nu, nside, fwhm=0.0): + npix = 12 * nside**2 + amplitude = float(self.alms[0, 0]) + base = np.full((self.npol, npix), amplitude, dtype=np.float64) + return base * self.get_sed(nu) + + +def make_params(smooth_to_common_res=False): + return AttrBunch({"general": AttrBunch({"smooth_to_common_res": smooth_to_common_res})}) + + +def make_detector_data(component, nu=2.0, nside=1, rms=1.0): + map_sky = component.get_sky(nu, nside) + map_rms = np.full_like(map_sky, rms, dtype=np.float64) + return FakeDetectorMap(map_sky, map_rms, nu=nu, fwhm=0.0, nside=nside, + double_precision=True) + + +def make_iqu_components(beta=1.5, amplitude=2.0, bounds=(1.0, 2.0), proposal_sigma=0.1): + shared_params = AttrBunch({ + "shortname": "dust", + "beta": beta, + "sample_spectral_index": True, + "spectral_index_proposal_sigma": proposal_sigma, + "spectral_index_bounds": list(bounds), + }) + comp_i = FakeComponent("dust_I", "I", beta, amplitude, comp_params=shared_params) + comp_qu = FakeComponent("dust_QU", "QU", beta, amplitude, + comp_params=shared_params) + comp_qu.alms = FakeComponent._make_alms(amplitude, npol=2) + return comp_i, comp_qu + + +class TestDiscoverSpectralIndexGroups: + def test_groups_shared_iqu_component_once(self): + comp_i, comp_qu = make_iqu_components() + groups = spectral_index_sampler._discover_spectral_index_groups( + FakeCompList([comp_i, comp_qu]) + ) + + assert len(groups) == 1 + assert groups[0].name == "dust" + assert groups[0].proposal_sigma == pytest.approx(0.1) + assert groups[0].bounds == pytest.approx((1.0, 2.0)) + assert groups[0].components == (comp_i, comp_qu) + + +class TestSampleSpectralIndicesMH: + def test_out_of_bounds_rejects_without_resolve(self, monkeypatch): + comp_i, comp_qu = make_iqu_components() + detector_data = make_detector_data(comp_i) + full_list = FakeCompList([comp_i, comp_qu]) + sub_list = FakeCompList([comp_i]) + + def fail_if_called(*args, **kwargs): + raise AssertionError("solve_compsep_perpix should not run for out-of-bounds proposals") + + monkeypatch.setattr(spectral_index_sampler, "solve_compsep_perpix", fail_if_called) + monkeypatch.setattr(np.random, "normal", lambda loc, scale: 2.5) + + _, result = spectral_index_sampler.sample_spectral_indices_mh( + MPI.COMM_SELF, + MPI.COMM_SELF, + detector_data, + full_list, + sub_list, + make_params(), + ) + + assert not result.accepted + assert result.out_of_bounds + assert comp_i.beta == pytest.approx(1.5) + assert comp_qu.beta == pytest.approx(1.5) + assert comp_i.comp_params["beta"] == pytest.approx(1.5) + np.testing.assert_allclose(comp_i.alms, np.array([[2.0]])) + + def test_rejection_restores_parameters_and_alms(self, monkeypatch): + comp_i, comp_qu = make_iqu_components() + detector_data = make_detector_data(comp_i) + full_list = FakeCompList([comp_i, comp_qu]) + sub_list = FakeCompList([comp_i]) + + def fake_solver(_comm, _detector_data, local_sub_list, _params): + for comp in local_sub_list: + comp.alms = np.array([[9.0]]) + return local_sub_list + + loglikes = iter([0.0, -5.0]) + monkeypatch.setattr(spectral_index_sampler, "solve_compsep_perpix", fake_solver) + monkeypatch.setattr(spectral_index_sampler, "_evaluate_local_loglike", + lambda *args, **kwargs: next(loglikes)) + monkeypatch.setattr(np.random, "normal", lambda loc, scale: 1.6) + monkeypatch.setattr(np.random, "random", lambda: 0.5) + + _, result = spectral_index_sampler.sample_spectral_indices_mh( + MPI.COMM_SELF, + MPI.COMM_SELF, + detector_data, + full_list, + sub_list, + make_params(), + ) + + assert not result.accepted + assert not result.out_of_bounds + assert comp_i.beta == pytest.approx(1.5) + assert comp_qu.beta == pytest.approx(1.5) + assert comp_i.comp_params["beta"] == pytest.approx(1.5) + np.testing.assert_allclose(comp_i.alms, np.array([[2.0]])) + + def test_acceptance_keeps_proposed_parameters_and_alms(self, monkeypatch): + comp_i, comp_qu = make_iqu_components() + detector_data = make_detector_data(comp_i) + full_list = FakeCompList([comp_i, comp_qu]) + sub_list = FakeCompList([comp_i]) + + def fake_solver(_comm, _detector_data, local_sub_list, _params): + for comp in local_sub_list: + comp.alms = np.array([[9.0]]) + return local_sub_list + + loglikes = iter([0.0, 2.0]) + monkeypatch.setattr(spectral_index_sampler, "solve_compsep_perpix", fake_solver) + monkeypatch.setattr(spectral_index_sampler, "_evaluate_local_loglike", + lambda *args, **kwargs: next(loglikes)) + monkeypatch.setattr(np.random, "normal", lambda loc, scale: 1.6) + monkeypatch.setattr(np.random, "random", lambda: 0.5) + + _, result = spectral_index_sampler.sample_spectral_indices_mh( + MPI.COMM_SELF, + MPI.COMM_SELF, + detector_data, + full_list, + sub_list, + make_params(), + ) + + assert result.accepted + assert comp_i.beta == pytest.approx(1.6) + assert comp_qu.beta == pytest.approx(1.6) + assert comp_i.comp_params["beta"] == pytest.approx(1.6) + np.testing.assert_allclose(comp_i.alms, np.array([[9.0]])) + + def test_single_rank_integration_uses_real_likelihood(self, monkeypatch): + comp_i, comp_qu = make_iqu_components(beta=1.0, amplitude=2.0) + full_list = FakeCompList([comp_i, comp_qu]) + sub_list = FakeCompList([comp_i]) + + target_map = np.full((1, 12), 8.0, dtype=np.float64) + detector_data = FakeDetectorMap(target_map, np.ones_like(target_map), nu=2.0, + fwhm=0.0, nside=1, double_precision=True) + + def fake_solver(_comm, det_data, local_sub_list, _params): + for comp in local_sub_list: + target_amplitude = det_data.map_sky[0, 0] / (det_data.nu**comp.beta) + comp.alms = np.array([[target_amplitude]]) + return local_sub_list + + monkeypatch.setattr(spectral_index_sampler, "solve_compsep_perpix", fake_solver) + monkeypatch.setattr(np.random, "normal", lambda loc, scale: 1.5) + monkeypatch.setattr(np.random, "random", lambda: 0.5) + + _, result = spectral_index_sampler.sample_spectral_indices_mh( + MPI.COMM_SELF, + MPI.COMM_SELF, + detector_data, + full_list, + sub_list, + make_params(), + ) + + assert result.accepted + assert result.loglike_proposed > result.loglike_current + assert comp_i.beta == pytest.approx(1.5) + assert comp_qu.beta == pytest.approx(1.5) + np.testing.assert_allclose(comp_i.alms, np.array([[8.0 / (2.0**1.5)]])) \ No newline at end of file From 80461b1fe89c4ed99fbea3409793115354d5fd7c Mon Sep 17 00:00:00 2001 From: jgslunde Date: Thu, 18 Jun 2026 20:51:39 +0200 Subject: [PATCH 4/4] The sky model realizations can now also be partial-sky. There no more full-sky elements, so we can now run e.g. SO with many cores. --- src/commander4/communication.py | 63 ++++++++++++++----- src/commander4/data_models/detector_TOD.py | 4 +- .../data_models/detector_group_TOD.py | 5 ++ src/commander4/data_models/tod_view.py | 17 ++++- src/commander4/tod_processing.py | 24 +++++-- src/commander4/utils/pixel_domain.py | 16 +++-- 6 files changed, 100 insertions(+), 29 deletions(-) diff --git a/src/commander4/communication.py b/src/commander4/communication.py index 0433ab4..2532a0d 100644 --- a/src/commander4/communication.py +++ b/src/commander4/communication.py @@ -49,9 +49,47 @@ def _should_send_compsep_result(compsep_my_band_id: str, # TODO: Communication in this script should be switched from picked (lowercase) to buffered # (uppercase) whereever possible (e.g. where arrays are communicated). +def _realize_and_distribute_sky(sky_model, experiment_data: DetGroupTOD, + band_comm) -> NDArray[np.floating]: + """Realize the band sky model on the master and hand each rank only the pixels it needs. + + The band master realizes the full-sky ``(npols, npix)`` detector map and keeps it (it doubles + as the full-sky map written to the chain). Each rank then receives, through the band's + ``PixelDomain``, just the sky values at the pixels its own scans observe. In the default + (non-sparse) map mode the domain scatter is a plain broadcast, so every rank ends up with the + full-sky map exactly as before. Only the master needs the ``SkyModel`` object, so it is never + broadcast, avoiding a full-sky realization and a copy of the model on every other rank + (the dominant TOD-side memory cost at high nside). + + Downstream, ``TODView`` detects whether its sky map is full-sky or domain-restricted by its + column count and indexes the pointing accordingly, so consumers are agnostic to the mode. + """ + is_band_master = band_comm.Get_rank() == 0 + pols = experiment_data.pols + ncomp = {"I": 1, "QU": 2, "IQU": 3}[pols] + if is_band_master: + full_map = sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside, pols, + fwhm=np.deg2rad(experiment_data.fwhm / 60.0)) + else: + full_map = None + domain = experiment_data.pixel_domain + if domain is None: + # Domain not built yet (unexpected call order): fall back to the historical full-sky bcast. + return band_comm.bcast(full_map, root=0) + local_map = domain.scatter_from_full(full_map, ncomp, dtype=np.float32) + # Master keeps the full-sky map (chain output + its own pointing); workers keep only their + # local slice. In full mode scatter_from_full already broadcast the full map to everyone. + return full_map if (is_band_master and domain.mode == "sparse") else local_map + + def receive_compsep(mpi_info: Bunch, experiment_data: DetGroupTOD, todproc_my_band_id: str, senders: dict[str, int]) -> NDArray[np.floating]: - """Receive the CompSep sky model, realize the local band map, and broadcast it within TOD. + """Receive the CompSep sky model and distribute the realized band map within TOD. + + The band master receives the ``SkyModel`` from the CompSep side and realizes it at the band + frequency/resolution; the result is distributed to all band ranks (see + ``_realize_and_distribute_sky`` -- full-sky in non-sparse mode, per-rank local pixels in sparse + mode). Args: mpi_info (Bunch): The data structure containing all MPI relevant data. @@ -63,23 +101,17 @@ def receive_compsep(mpi_info: Bunch, experiment_data: DetGroupTOD, todproc_my_ba world rank of the sender task (on the CompSep side), keyed by execution-view band ID. Returns: - NDArray: The sky model realized at the local band frequency and resolution, broadcast to - all processes on the band communicator. + NDArray: The realized sky map for this rank (full-sky on the master / in non-sparse mode, + otherwise restricted to the rank's locally-observed pixels). """ world_comm = mpi_info.world.comm band_comm = mpi_info.band.comm - is_band_master = mpi_info.band.is_master - if is_band_master: + if mpi_info.band.is_master: source_band_id = _get_compsep_sender_id_for_tod_band(todproc_my_band_id, senders) sky_model = world_comm.recv(source=senders[source_band_id]) else: sky_model = None - # Currently all TOD MPI ranks need a copy of the relevant detector map, - # which is very wasteful - a reason for doing OpenMP for mapmaking. - sky_model = band_comm.bcast(sky_model, root=0) - detector_map_arr = sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside, - experiment_data.pols, fwhm=np.deg2rad(experiment_data.fwhm/60.0)) - return detector_map_arr + return _realize_and_distribute_sky(sky_model, experiment_data, band_comm) def get_local_initial_sky(mpi_info: Bunch, experiment_data: DetGroupTOD, @@ -87,16 +119,15 @@ def get_local_initial_sky(mpi_info: Bunch, experiment_data: DetGroupTOD, """Build the initial sky model locally and realize it at this TOD band. Used when there are no CompSep ranks: the band master builds the SkyModel from the component - parameters and init files, broadcasts it within the band communicator, and every rank realizes - it at the band frequency/resolution. Mirrors `receive_compsep`, minus the cross-world receive. + parameters and init files and realizes it; the realized map is distributed to all band ranks + (full-sky in non-sparse mode, per-rank local pixels in sparse mode). Mirrors `receive_compsep`, + minus the cross-world receive. """ if mpi_info.band.is_master: sky_model = build_initial_sky_model(params) else: sky_model = None - sky_model = mpi_info.band.comm.bcast(sky_model, root=0) - return sky_model.get_sky_at_nu(experiment_data.nu, experiment_data.nside, experiment_data.pols, - fwhm=np.deg2rad(experiment_data.fwhm/60.0)) + return _realize_and_distribute_sky(sky_model, experiment_data, mpi_info.band.comm) def send_tod(mpi_info: Bunch, tod_map_dict: dict[DetectorMap], todproc_my_band_id: str, diff --git a/src/commander4/data_models/detector_TOD.py b/src/commander4/data_models/detector_TOD.py index 5f9f921..077d473 100644 --- a/src/commander4/data_models/detector_TOD.py +++ b/src/commander4/data_models/detector_TOD.py @@ -163,8 +163,10 @@ def __init__( # The Huffman decoder expects uint8 arrays; for bytes and HDF5-backed # numpy.void payloads the internal storage is rewritten as a zero-copy # uint8 view once at construction. - self.processing_mask_map = processing_mask_map self.pointing = pointing + # The full-sky processing mask (npix bytes, shared across detectors) is only needed to build + # the packed per-sample mask below. It is deliberately not retained on the detector: holding + # it would keep a ~npix-byte array alive on every rank for the whole run for nothing. processing_mask = processing_mask_map[self.get_pix()] self._processing_mask = np.packbits(processing_mask) self.det_response = det_response diff --git a/src/commander4/data_models/detector_group_TOD.py b/src/commander4/data_models/detector_group_TOD.py index 2dcdd61..91ecf5d 100644 --- a/src/commander4/data_models/detector_group_TOD.py +++ b/src/commander4/data_models/detector_group_TOD.py @@ -55,6 +55,11 @@ def get_pixel_domain(self, scan_view, comm, sparse: bool): self._pixel_domain = PixelDomain.from_view(scan_view, comm, mode, self.nside) return self._pixel_domain + @property + def pixel_domain(self): + """The cached map-distribution :class:`PixelDomain`, or ``None`` if not built yet.""" + return self._pixel_domain + def iter_detector_scans(self, accept: NDArray | None = None): """Iterate over present detector-scans, yielding ``(iscan, det)`` pairs. diff --git a/src/commander4/data_models/tod_view.py b/src/commander4/data_models/tod_view.py index e6cd450..3cafccc 100644 --- a/src/commander4/data_models/tod_view.py +++ b/src/commander4/data_models/tod_view.py @@ -308,6 +308,18 @@ def _require_compsep_output(self, compsep_output: NDArray | None) -> NDArray: raise ValueError("A component-separation sky map must be provided for sky subtraction.") return sky_model + def _sky_map_pix(self, sky_model: NDArray) -> NDArray[np.integer]: + """Pixel indices into a sky map that may be full-sky or restricted to this rank's domain. + + The realized sky model is full-sky ``(ncomp, npix)`` on the band master and in non-sparse + map mode, but only ``(ncomp, n_local)`` on workers in sparse mode (see + ``communication._realize_and_distribute_sky``). Distinguish the two by the map's column + count and return either global HEALPix indices or compact local-buffer indices. + """ + if sky_model.shape[-1] == 12 * self.experiment_data.nside**2: + return self.pix + return self.experiment_data.pixel_domain.to_local(self.pix) + def _block_average(self, tod: NDArray[np.floating], factor: int) -> NDArray[np.floating]: """Average a full-rate array over the same contiguous blocks as the downsampled TOD.""" ntod_down = self._materialize_downsampled(factor).tod.shape[0] @@ -327,13 +339,14 @@ def get_static_sky_tod( """ factor = self._downsample_factor_or_default(downsample_factor) sky_model = self._require_compsep_output(compsep_output) + sky_pix = self._sky_map_pix(sky_model) if compsep_output is None: if self._static_sky is None: # Reuse the full-resolution sky TOD when both pointing and sky model match. - self._static_sky = get_static_sky_TOD(sky_model, self.pix, psi=self.psi) + self._static_sky = get_static_sky_TOD(sky_model, sky_pix, psi=self.psi) sky_tod = self._static_sky else: - sky_tod = get_static_sky_TOD(sky_model, self.pix, psi=self.psi) + sky_tod = get_static_sky_TOD(sky_model, sky_pix, psi=self.psi) return sky_tod if factor == 1 else self._block_average(sky_tod, factor) def get_orbital_dipole_tod(self, downsample_factor: int | None = None) -> NDArray[np.floating]: diff --git a/src/commander4/tod_processing.py b/src/commander4/tod_processing.py index 9a267af..a4db3f5 100644 --- a/src/commander4/tod_processing.py +++ b/src/commander4/tod_processing.py @@ -31,6 +31,17 @@ logger = logging.getLogger(__name__) +def _read_sparse_maps_flag(params: Bunch, experiment_data: DetGroupTOD) -> bool: + """Whether sparse (per-rank local-pixel) map storage is enabled for this experiment. + + ``sparse_maps`` is an experiment-level option: when set, each rank's map buffers *and* its slice + of the sky model hold only the pixels its scans observe, rather than a full sky map (the band + master still assembles full-sky maps). Defaults to the historical full-sky-per-rank layout. + """ + exp_cfg = params.experiments[experiment_data.experiment_name] + return bool(exp_cfg["sparse_maps"]) if "sparse_maps" in exp_cfg else False + + def called_on_non_master(arr): logger.debug("Dummy precond has been called") return np.copy(arr) @@ -133,8 +144,7 @@ def tod2map_CG(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_output scan_view = TODView(experiment_data, tod_samples, compsep_output=compsep_output) # Optional per-experiment sparse map storage: each rank holds only its locally-observed pixels # rather than a full sky map. The band master still ends up with full-sky maps. - exp_cfg = params.experiments[experiment_data.experiment_name] - sparse_maps = bool(exp_cfg["sparse_maps"]) if "sparse_maps" in exp_cfg else False + sparse_maps = _read_sparse_maps_flag(params, experiment_data) domain = experiment_data.get_pixel_domain(scan_view, band_comm, sparse_maps) if pols == "IQU": mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) @@ -361,8 +371,7 @@ def tod2map_bin(band_comm: MPI.Comm, experiment_data: DetGroupTOD, compsep_outpu scan_view = TODView(experiment_data, tod_samples, compsep_output=compsep_output) # Optional per-experiment sparse map storage: each rank holds only its locally-observed pixels # rather than a full sky map. The band master still ends up with full-sky maps. - exp_cfg = params.experiments[experiment_data.experiment_name] - sparse_maps = bool(exp_cfg["sparse_maps"]) if "sparse_maps" in exp_cfg else False + sparse_maps = _read_sparse_maps_flag(params, experiment_data) domain = experiment_data.get_pixel_domain(scan_view, band_comm, sparse_maps) mapmaker_invvar = WeightsMapmakerIQU(band_comm, experiment_data.nside, pixel_domain=domain) for view in scan_view.iter_focused(accepted_only=True): @@ -588,6 +597,13 @@ def init_tod_processing(mpi_info: Bunch, params: Bunch) -> tuple[Bunch, str, Det tod_samples_chain1 = TODSamples(experiment_data, params, my_band, band_comm, 1) tod_samples_chain2 = TODSamples(experiment_data, params, my_band, band_comm, 2) + # Build the band's map-distribution PixelDomain once now: the pointing is static, so it is + # reused across Gibbs iterations by both the mapmakers and the sky-model distribution (which + # needs it to give each rank only its local pixels). In full mode this is a cheap no-op. + sparse_maps = _read_sparse_maps_flag(params, experiment_data) + experiment_data.get_pixel_domain(TODView(experiment_data, tod_samples_chain1), band_comm, + sparse_maps) + # Creating "tod_band_masters", an array which maps the band index to the rank of the master # of that band. todproc_my_band_id = my_band_name diff --git a/src/commander4/utils/pixel_domain.py b/src/commander4/utils/pixel_domain.py index 44c043f..7337251 100644 --- a/src/commander4/utils/pixel_domain.py +++ b/src/commander4/utils/pixel_domain.py @@ -2,6 +2,9 @@ from mpi4py import MPI from numpy.typing import NDArray +# MPI elementary datatypes for the float buffers exchanged by the collectives below. +_NUMPY_TO_MPI_DTYPE = {np.dtype(np.float64): MPI.DOUBLE, np.dtype(np.float32): MPI.FLOAT} + # Map distribution across the ranks of a band communicator. # # Commander4 builds each band map collaboratively: every rank accumulates the contributions of its @@ -121,9 +124,9 @@ def scatter_from_full(self, full_data: NDArray | None, ncomp: int, root: int = 0 dtype=np.float64) -> NDArray: """Send each rank the values of a full-sky map at its local pixels (inverse of reduce). - Used by the CG LHS, where the master holds the full iterate and each rank needs only its - local slice to apply its block of the operator. Returns an ``(ncomp, n_local)`` buffer on - every rank. In full mode this is a plain broadcast of the full-sky map. + Used by the CG LHS (full map -> per-rank operator slice) and by the sky-model distribution. + Returns an ``(ncomp, n_local)`` buffer on every rank. In full mode this is a plain broadcast + of the full-sky map. ``dtype`` selects the float precision of the exchanged buffers. """ rank = self.comm.Get_rank() if self.mode == "full": @@ -133,12 +136,13 @@ def scatter_from_full(self, full_data: NDArray | None, ncomp: int, root: int = 0 return buf counts, displs = self._recvcounts, self._displs - local = np.empty((ncomp, self.n_local), dtype=np.float64) + mpi_dtype = _NUMPY_TO_MPI_DTYPE[np.dtype(dtype)] + local = np.empty((ncomp, self.n_local), dtype=dtype) for c in range(ncomp): if rank == root: # Gather the full map at the concatenated per-rank pixels, then scatter the slices. - send = np.ascontiguousarray(full_data[c][self._all_pix], dtype=np.float64) - sendbuf = [send, counts, displs, MPI.DOUBLE] + send = np.ascontiguousarray(full_data[c][self._all_pix], dtype=dtype) + sendbuf = [send, counts, displs, mpi_dtype] else: sendbuf = None self.comm.Scatterv(sendbuf, local[c], root=root)