diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py new file mode 100644 index 0000000..5222f10 --- /dev/null +++ b/Scripts/mpiMapSpecAngleScan.py @@ -0,0 +1,150 @@ +''' + Copyright (c) 2017, UChicago Argonne, LLC + See LICENSE file. +''' +''' +Script to process a dataset using the Sector33SpecDataSource +''' + +import os +import numpy as np +import json +import datetime +import argparse + +from rsMap3D.datasource.MpiSector33SpecDataSource import MPISector33SpecDataSource +from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader +from rsMap3D.utils.srange import srange +from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser +from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR +from rsMap3D.mappers.mpigridmapper import MPIQGridMapper +from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT +from rsMap3D.transforms.unitytransform3d import UnityTransform3D +from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter + +from mpi4py import MPI +from mpi4py.futures import MPICommExecutor + +mpiComm = MPI.COMM_WORLD +mpiRank = mpiComm.Get_rank() + +def parseArgs(): + parser = argparse.ArgumentParser(description='Default MPI run script. Expects a json config file pointed at the data.') + parser.add_argument('configPath', + nargs='?', + default=os.path.join(os.getcwd(), 'config.json'), + help='Path to config file. If none supplied, directs to a config.json located in CWD') + return parser.parse_args() + +args = parseArgs() + +def updateDataSourceProgress(value1, value2): + print("DataSource Progress %s/%s" % (value1, value2)) + +def updateMapperProgress(value1): + print("Mapper Progress %s" % (value1)) + +with open(args.configPath, 'r') as config_f: + config = json.load(config_f) + + +if mpiRank == 0: + startTime = datetime.datetime.now() + with open('time.log', 'a') as time_log: + time_log.write(f'Start: {startTime}\n') + + +# +#Change values listed here for a particular experiment +# +roi = None # defined later +projectDir = config["project_dir"] +specFile = config["spec_file"] +scanRange = srange(config["scan_range"]).list() + +if len(scanRange) < mpiComm.Get_size(): + raise ValueError("Too many processes! Reduce proc count to <= number of scans.") + + + +# +#Change values listed here for a particular experiment +# +mapHKL = False +configDir = projectDir +detectorConfigName = os.path.join(configDir, config["detector_config"]) +instConfigName = os.path.join(configDir, config["instrument_config"]) +if not os.path.exists(detectorConfigName): + raise Exception("Detector Config file does not exist: %s" % + detectorConfigName) +if not os.path.exists(instConfigName): + raise Exception("Instrument Config file does not exist: %s" % + instConfigName) + +dReader = detReader(detectorConfigName) + +#detectorSettings +detectorName = "Pilatus" +# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, +# uses the original data +bin = [1,1] +# Region of interesting +# set explicitly since the images are cropped by ROI +# COMMENT OUT roi or set to none to simply grab size from the calib file +roi = [1, 487, 1, 195] +# or get info from the config +if roi is None: + detector = dReader.getDetectorById(detectorName) + nPixels = dReader.getNpixels(detector) + roi = [1, nPixels[0], 1, nPixels[1]] +print ("ROI: %s " % roi) + +#output info = +nx = 200 +ny = 200 +nz = 200 +specName, specExt = os.path.splitext(specFile) +outputFileName = os.path.join(specName + '.vti') +# note that the q ranges in the three dimensions are available +# later. If you know resolution desired, it is possible to calculate more +# appropriate nx, ny, nz + +appConfig = RSMap3DConfigParser() +maxImageMemory = appConfig.getMaxImageMemory() +print ("scanRange %s" % scanRange) +print("specName, specExt: %s, %s" % (specName, specExt)) +ds = MPISector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, mpiComm, roi=roi, + pixelsToAverage=bin, scanList= scanRange, + appConfig=appConfig) +ds.setCurrentDetector(detectorName) +#ds.setProgressUpdater(updateDataSourceProgress) + +ds.loadSource(mapHKL=mapHKL) + +if mpiRank == 0: + with open('time.log', 'a') as time_log: + time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') + +ds.setRangeBounds(ds.getOverallRanges()) +imageToBeUsed = ds.getImageToBeUsed() +#print("imageToBeUsed %s" % imageToBeUsed) +# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] +imageSize = np.prod(ds.getDetectorDimensions()) + +gridMapper = MPIQGridMapper(ds, + outputFileName, + BINARY_OUTPUT, + mpiComm, + transform=UnityTransform3D(), + gridWriter=VTIGridWriter(), + appConfig=appConfig) + +gridMapper.setProgressUpdater(updateMapperProgress) +gridMapper.doMap() + +if mpiRank == 0: + with open('time.log', 'a') as time_log: + endTime = datetime.datetime.now() + time_log.write(f'End: {endTime}\n') + time_log.write(f'Diff: {endTime - startTime}\n') \ No newline at end of file diff --git a/Scripts/profileMapSpecAngleScan.py b/Scripts/profileMapSpecAngleScan.py new file mode 100644 index 0000000..b35e4ac --- /dev/null +++ b/Scripts/profileMapSpecAngleScan.py @@ -0,0 +1,123 @@ +''' + Copyright (c) 2017, UChicago Argonne, LLC + See LICENSE file. +''' +''' +Script to process a dataset using the Sector33SpecDataSource +''' + +import os +import numpy as np +import xrayutilities as xu +import json +import datetime +from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource +from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader +from rsMap3D.utils.srange import srange +from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser +from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR +from rsMap3D.mappers.gridmapper import QGridMapper +from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT +from rsMap3D.transforms.unitytransform3d import UnityTransform3D +from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter + +def updateDataSourceProgress(value1, value2): + print("DataSource Progress %s/%s" % (value1, value2)) + +def updateMapperProgress(value1): + print("Mapper Progress %s" % (value1)) + +with open('config.json', 'r') as config_f: + config = json.load(config_f) + +start_time = datetime.datetime.now() +with open('time.log', 'a') as time_log: + time_log.write(f'Start: {start_time}\n') + + +# +#Change values listed here for a particular experiment +# +roi = None # defined later +projectDir = config["project_dir"] +specFile = config["spec_file"] +scanRange = srange(config["scan_range"]).list() + + + +# +#Change values listed here for a particular experiment +# +mapHKL = False +configDir = projectDir +detectorConfigName = os.path.join(configDir, config["detector_config"]) +instConfigName = os.path.join(configDir, config["instrument_config"]) +if not os.path.exists(detectorConfigName): + raise Exception("Detector Config file does not exist: %s" % + detectorConfigName) +if not os.path.exists(instConfigName): + raise Exception("Instrument Config file does not exist: %s" % + instConfigName) + +dReader = detReader(detectorConfigName) + +#detectorSettings +detectorName = "Pilatus" +# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, +# uses the original data +bin = [1,1] +# Region of interesting +# set explicitly since the images are cropped by ROI +# COMMENT OUT roi or set to none to simply grab size from the calib file +roi = [1, 487, 1, 195] +# or get info from the config +if roi is None: + detector = dReader.getDetectorById(detectorName) + nPixels = dReader.getNpixels(detector) + roi = [1, nPixels[0], 1, nPixels[1]] +print ("ROI: %s " % roi) + +#output info = +nx = 200 +ny = 200 +nz = 200 +specName, specExt = os.path.splitext(specFile) +outputFileName = os.path.join(specName + '.vti') +# note that the q ranges in the three dimensions are available +# later. If you know resolution desired, it is possible to calculate more +# appropriate nx, ny, nz + +appConfig = RSMap3DConfigParser() +maxImageMemory = appConfig.getMaxImageMemory() +print ("scanRange %s" % scanRange) +print("specName, specExt: %s, %s" % (specName, specExt)) +ds = Sector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, roi=roi, + pixelsToAverage=bin, scanList= scanRange, + appConfig=appConfig) +ds.setCurrentDetector(detectorName) +ds.setProgressUpdater(updateDataSourceProgress) +ds.loadSource(mapHKL=mapHKL) +ds.setRangeBounds(ds.getOverallRanges()) +imageToBeUsed = ds.getImageToBeUsed() +#print("imageToBeUsed %s" % imageToBeUsed) +# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] +imageSize = np.prod(ds.getDetectorDimensions()) + +with open('time.log', 'a') as time_log: + time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') + +gridMapper = QGridMapper(ds, + outputFileName, + outputType=BINARY_OUTPUT, + transform=UnityTransform3D(), + gridWriter=VTIGridWriter(), + appConfig=appConfig) + +gridMapper.setProgressUpdater(updateMapperProgress) +gridMapper.doMap() + +with open('time.log', 'a') as time_log: + end_time = datetime.datetime.now() + time_log.write(f'End: {end_time}\n') + time_log.write(f'Diff: {end_time - start_time}\n') \ No newline at end of file diff --git a/rsMap3D/datasource/MpiSector33SpecDataSource.py b/rsMap3D/datasource/MpiSector33SpecDataSource.py new file mode 100644 index 0000000..33ab779 --- /dev/null +++ b/rsMap3D/datasource/MpiSector33SpecDataSource.py @@ -0,0 +1,647 @@ +''' + Copyright (c) 2012, UChicago Argonne, LLC + See LICENSE file. +''' +from multiprocessing.sharedctypes import Value +import os +from spec2nexus.spec import SpecDataFile +from rsMap3D.exception.rsmap3dexception import RSMap3DException,\ + ScanDataMissingException +from rsMap3D.gui.rsm3dcommonstrings import CANCEL_STR +from rsMap3D.datasource.pilatusbadpixelfile import PilatusBadPixelFile +from rsMap3D.mappers.abstractmapper import ProcessCanceledException +from rsMap3D.datasource.specxmldrivendatasource import SpecXMLDrivenDataSource +import numpy as np +import xrayutilities as xu +import time +import sys,traceback +import logging +import importlib +import math +from mpi4py import MPI +try: + from PIL import Image +except ImportError: + import Image +logger = logging.getLogger(__name__) + +SCAN_WIN_SIZE = 4 + +IMAGE_DIR_MERGE_STR = "images/%s" +SCAN_NUMBER_MERGE_STR = "S%03d" +TIFF_FILE_MERGE_STR = "S%%03d/%s_S%%03d_%%05d.tif" + +class MPISector33SpecDataSource(SpecXMLDrivenDataSource): + ''' + Class to load data from spec file and configuration xml files from + for the way that data is collected at sector 33. + :members + ''' + + + def __init__(self, + projectDir, + projectName, + projectExtension, + instConfigFile, + detConfigFile, + mpiComm, + **kwargs): + ''' + Constructor + :param projectDir: Directory holding the project file to open + :param projectName: First part of file name for the project + :param projectExt: File extension for the project file. + :param instConfigFile: Full path to Instrument configuration file. + :param detConfigFile: Full path to the detector configuration file + :param kwargs: Assorted keyword arguments + + :rtype: Sector33SpecDataSource + ''' + super(MPISector33SpecDataSource, self).__init__(projectDir, + projectName, + projectExtension, + instConfigFile, + detConfigFile, + **kwargs) + self.mpiComm = mpiComm + self.mpiRank = mpiComm.Get_rank() + + def _calc_eulerian_from_kappa(self, primaryAngles=None, + referenceAngles = None): + """ + Calculate the eulerian sample angles from the kappa stage angles. + :param primaryAngles: list of sample axis numbers to be handled by + the conversion + :param referenceAngles: list of reference angles to be used in angle + conversion + """ + + keta = np.deg2rad(referenceAngles[:,0]) + kappa = np.deg2rad(referenceAngles[:,1]) + kphi = np.deg2rad(referenceAngles[:,2]) + kappaParam = self.instConfig.getSampleAngleMappingParameter('kappa') + + try: + if kappaParam != None: + self.kalpha = np.deg2rad(float(kappaParam)) + else: + self.kalpha = np.deg2rad(50.000) + kappaInvertedParam = \ + self.instConfig.getSampleAngleMappingParameter('kappaInverted') + if kappaInvertedParam != None: + self.kappa_inverted = self.to_bool(kappaInvertedParam) + else: + self.kappa_inverted = False + except Exception as ex: + raise RSMap3DException("Error trying to get parameter for " + \ + "sampleAngleMappingFunction " + \ + "_calc_eulerian_from_kappa in inst config " + \ + "file\n" + \ + str(ex)) + + _t1 = np.arctan(np.tan(kappa / 2.0) * np.cos(self.kalpha)) + if self.kappa_inverted: + eta = np.rad2deg(keta + _t1) + phi = np.rad2deg(kphi + _t1) + else: + eta = np.rad2deg(keta - _t1) + phi = np.rad2deg(kphi - _t1) + chi = 2.0 * np.rad2deg(np.arcsin(np.sin(kappa / 2.0) * \ + np.sin(self.kalpha))) + + return eta, chi, phi + + def _calc_replace_angle_values(self , primaryAngles=None, + referenceAngles=None): + ''' + Fix a situation were some constant angle values have been + recorded incorrectly and need to be fixed. + :param primaryAngles: list of sample axis numbers to be handled by + the conversion + :param referenceAngles: list of reference angles to be used in angle + conversion + ''' + logger.info( "Running " + __name__) + angles = [] + logger.debug("referenceAngles " + str(referenceAngles)) + mappingAngles = self.instConfig.getSampleAngleMappingReferenceAngles() + + logger.debug( "mappingAngles" + str(mappingAngles)) + for ii in range(len(referenceAngles)): + replaceVal = \ + float(self.instConfig.getSampleAngleMappingReferenceAngleAttrib( \ + number= str(mappingAngles[ii]), \ + attribName='replaceValue')) + logger.debug( "primary Angles" + str(referenceAngles)) + angles.append(replaceVal* np.ones(len(referenceAngles[:,ii]),)) + logger.debug("Angles" + str( angles)) + return angles + + def fixGeoAngles(self, scan, angles): + ''' + Fix the angles using a user selected function. + :param scan: scan to set the angles for + :param angles: Array of angles to set for this scan + ''' + logger.debug( "starting " + __name__) + needToCorrect = False + refAngleNames = self.instConfig.getSampleAngleMappingReferenceAngles() + for refAngleName in refAngleNames: + alwaysFix = self.instConfig.getSampleAngleMappingAlwaysFix() + if refAngleName in scan.L or alwaysFix: + needToCorrect = True + + if needToCorrect: + logger.debug( __name__ + ": Fixing angles") + refAngles = self.getScanAngles(scan, refAngleNames) + primaryAngles = self.instConfig.getSampleAngleMappingPrimaryAngles() + functionName = self.instConfig.getSampleAngleMappingFunctionName() + functionModuleName = self.instConfig.getSampleAngleMappingFunctionModule() + logger.debug("sampleMappingFunction moduleName %s" % functionModuleName) + #Call a defined method to calculate angles from the reference angles. + moduleSource = self + if functionModuleName != None: + functionModule = importlib.import_module(functionModuleName) + logger.debug("dir(functionModule)" % dir(functionModule)) + moduleSource = functionModule + method = getattr(moduleSource, functionName) + fixedAngles = method(primaryAngles=primaryAngles, + referenceAngles=refAngles) + logger.debug ("fixed Angles: " + str(fixedAngles)) + for i in range(len(primaryAngles)): + logger.debug ("Fixing primaryAngles: %d " % primaryAngles[i]) + angles[:,primaryAngles[i]-1] = fixedAngles[i] + + def getGeoAngles(self, scan, angleNames): + """ + This function returns all of the geometry angles for the + for the scan as a N-by-num_geo array, where N is the number of scan + points and num_geo is the number of geometry motors. + """ +# scan = self.sd[scanNo] + geoAngles = self.getScanAngles(scan, angleNames) + if not (self.instConfig.getSampleAngleMappingFunctionName() == ""): + tb = None + try: + self.fixGeoAngles(scan, geoAngles) + except Exception as ex: + tb = traceback.format_exc() + raise RSMap3DException("Handling exception in getGeoAngles." + \ + "\n" + \ + str(ex) + \ + "\n" + \ + str(tb)) + logger.debug("getGeoAngles:\n" + str(geoAngles)) + return geoAngles + + def getUBMatrix(self, scan): + """ + Read UB matrix from the #G3 line from the spec file. + """ + try: + g3 = scan.G["G3"].strip().split() + g3 = np.array(list(map(float, g3))) + ub = g3.reshape(-1,3) + logger.debug("ub " +str(ub)) + return ub + except: + logger.error("Unable to read UB Matrix from G3") + logger.error( '-'*60) + traceback.print_exc(file=sys.stdout) + logger.error('-'*60) + + + def hotpixelkill(self, areaData): + """ + function to remove hot pixels from CCD frames + ADD REMOVE VALUES IF NEEDED! + :param areaData: area detector data + """ + + for pixel in self.getBadPixels(): + badLoc = pixel.getBadLocation() + replaceLoc = pixel.getReplacementLocation() + areaData[badLoc[0],badLoc[1]] = \ + areaData[replaceLoc[0],replaceLoc[1]] + + return areaData + + def loadSource(self, mapHKL=False): + ''' + This method does the work of loading data from the files. This has been + split off from the constructor to allow this to be threaded and later + canceled. + :param mapHKL: boolean to mark if the data should be mapped to HKL + ''' + # Load up the instrument configuration file + self.loadInstrumentXMLConfig() + #Load up the detector configuration file + self.loadDetectorXMLConfig() + + self.specFile = os.path.join(self.projectDir, self.projectName + \ + self.projectExt) + imageDir = os.path.join(self.projectDir, \ + IMAGE_DIR_MERGE_STR % self.projectName) + self.imageFileTmp = os.path.join(imageDir, \ + TIFF_FILE_MERGE_STR % + (self.projectName)) + # if needed load up the bad pixel file. + if not (self.badPixelFile is None): + + badPixelFile = PilatusBadPixelFile(self.badPixelFile) + self.badPixels = badPixelFile.getBadPixels() + + # id needed load the flat field file + if not (self.flatFieldFile is None): + self.flatFieldData = np.array(Image.open(self.flatFieldFile)).T + # Load scan information from the spec file + + try: + self.sd = SpecDataFile(self.specFile) + self.mapHKL = mapHKL + + maxScan = int(self.sd.getMaxScanNumber()) + logger.debug("Number of Scans" + str(maxScan)) + if self.scans is None: + self.scans = range(1, maxScan+1) + imagePath = os.path.join(self.projectDir, + IMAGE_DIR_MERGE_STR % self.projectName) + + self.imageBounds = {} + self.imageToBeUsed = {} + self.availableScans = [] + self.incidentEnergy = {} + self.ubMatrix = {} + self.scanType = {} + self.progress = 0 + self.progressInc = 1 + + + if self.mpiRank == 0: + scanWinSize = SCAN_WIN_SIZE + else: + scanWinSize = 0 + + scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpiComm) + + scanIdx = self.mpiRank + if self.mpiRank == 0: + scanWin.Lock(rank=0) + scanWin.Put([self.mpiComm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + self.mpiComm.Barrier() + + + while scanIdx < len(self.scans): + scan = self.scans[scanIdx] + if (self.cancelLoad): + self.cancelLoad = False + raise LoadCanceledException(CANCEL_STR) + + else: + if (os.path.exists(os.path.join(imagePath, \ + SCAN_NUMBER_MERGE_STR % scan))): + try: + curScan = self.sd.scans[str(scan)] + self.scanType[scan] = \ + self.sd.scans[str(scan)].scanCmd.split()[0] + angles = self.getGeoAngles(curScan, self.angleNames) + self.availableScans.append(scan) + if self.mapHKL==True: + self.ubMatrix[scan] = self.getUBMatrix(curScan) + if self.ubMatrix[scan] is None: + raise Sector33SpecFileException("UB matrix " + \ + "not found.") + else: + self.ubMatrix[scan] = None + self.incidentEnergy[scan] = 12398.4 /float(curScan.G['G4'].split()[3]) + _start_time = time.time() + self.imageBounds[scan] = \ + self.findImageQs(angles, \ + self.ubMatrix[scan], \ + self.incidentEnergy[scan]) + logger.info (('Elapsed time for Finding qs for scan %d: ' + + '%.3f seconds') % \ + (scan, (time.time() - _start_time))) + except ScanDataMissingException: + logger.error( "Scan " + str(scan) + " has no data") + #Make sure to show 100% completion + + scanBuff = bytearray(SCAN_WIN_SIZE) + scanWin.Lock(rank=0) + scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) + scanIdx = int.from_bytes(scanBuff, 'little') + if scanIdx < len(self.scans): + print(f'Proc {self.mpiRank} Loading Scan {scanIdx+1}/{len(self.scans)}') + else: + print(f'Proc {self.mpiRank} finished load. Beginning merge.') + scanNext = scanIdx + 1 + scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + except IOError: + raise IOError( "Cannot open file " + str(self.specFile)) + if len(self.getAvailableScans()) == 0: + raise ScanDataMissingException("Could not find scan data for " + \ + "input file \n" + self.specFile + \ + "\nOne possible reason for this " + \ + "is that the image files are " + \ + "missing. Images are assumed " + \ + "to be in " + \ + os.path.join(self.projectDir, + IMAGE_DIR_MERGE_STR % self.projectName)) + + scanData = self.exportScans() + scanData = self.mpiMergeSources(scanData) + + scanData = self.mpiComm.bcast(scanData, root=0) + self.importScans(scanData) + + self.availableScanTypes = set(self.scanType.values()) + + + def mpiMergeSources(self, scanData): + """ + Merges data sources, combining their information till all data is collated at proc 0. + Similar to the upward execution of a distributed merge-sort. + Data can be shared back via the export commands. + """ + worldSize = self.mpiComm.Get_size() + + depth = math.floor(np.log2(worldSize)) + 1 + for currDepth in range(depth): + + # Exclude procs that have merged + if self.mpiRank % (2**currDepth) != 0: + break + + # Determine if sending or receiving + if (self.mpiRank / (2**currDepth)) % 2 == 0: + source = self.mpiRank + (2**currDepth) + + # Odd # world size + if source >= worldSize: + continue + + incomingScanData = self.mpiComm.recv(source=source) + for key in incomingScanData.keys(): + if type(incomingScanData[key]) is list: + scanData[key] += incomingScanData[key] + elif type(incomingScanData[key]) is dict: + if len(set(scanData[key].keys()).intersection(set(incomingScanData[key].keys()))) > 0: + raise ValueError(f"Unable to merge {key} dict due to overlapping keys") + scanData[key].update(incomingScanData[key]) + else: + raise NotImplementedError(f"Unable to merge {key}. Please implement merge here.") + + else: + dest = self.mpiRank - (2 ** currDepth) + self.mpiComm.send(scanData, dest=dest) + + return scanData + + + def exportScans(self): + """ + Returns loaded data as an MPI-safe object + """ + return { + 'imageBounds': self.imageBounds, + 'imageToBeUsed': self.imageToBeUsed, + 'availableScans':self.availableScans, + 'incidentEnergy': self.incidentEnergy, + 'ubMatrix':self.ubMatrix, + 'availableScanTypes': self.availableScanTypes + } + + def importScans(self, scanData): + """ + Imports exported scan data from another data source via an MPI-safe object. + """ + self.imageBounds = scanData['imageBounds'] + self.imageToBeUsed = scanData['imageToBeUsed'] + self.availableScans = scanData['availableScans'] + self.incidentEnergy = scanData['incidentEnergy'] + self.ubMatrix = scanData['ubMatrix'] + self.availableScanTypes = scanData['availableScanTypes'] + + def to_bool(self, value): + """ + Note this method found in answer to: + http://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python + Converts 'something' to boolean. Raises exception if it gets a string + it doesn't handle. + Case is ignored for strings. These string values are handled: + True: 'True', "1", "TRue", "yes", "y", "t" + False: "", "0", "faLse", "no", "n", "f" + Non-string values are passed to bool. + """ + if type(value) == type(''): + if value.lower() in ("yes", "y", "true", "t", "1"): + return True + if value.lower() in ("no", "n", "false", "f", "0", ""): + return False + raise Exception('Invalid value for boolean conversion: ' + value) + return bool(value) + + def rawmap(self,scans, angdelta=[0,0,0,0,0], + adframes=None, mask = None): + """ + read ad frames and and convert them in reciprocal space + angular coordinates are taken from the spec file + or read from the edf file header when no scan number is given (scannr=None) + """ + + if mask is None: + mask_was_none = True + #mask = [True] * len(self.getImageToBeUsed()[scans[0]]) + else: + mask_was_none = False + #sd = spec.SpecDataFile(self.specFile) + intensity = np.array([]) + + # fourc goniometer in fourc coordinates + # convention for coordinate system: + # x: upwards; + # y: along the incident beam; + # z: "outboard" (makes coordinate system right-handed). + # QConversion will set up the goniometer geometry. + # So the first argument describes the sample rotations, the second the + # detector rotations and the third the primary beam direction. + qconv = xu.experiment.QConversion(self.getSampleCircleDirections(), \ + self.getDetectorCircleDirections(), \ + self.getPrimaryBeamDirection()) + + # define experimental class for angle conversion + # + # ipdir: inplane reference direction (ipdir points into the primary beam + # direction at zero angles) + # ndir: surface normal of your sample (ndir points in a direction + # perpendicular to the primary beam and the innermost detector + # rotation axis) + en = self.getIncidentEnergy() + hxrd = xu.HXRD(self.getInplaneReferenceDirection(), \ + self.getSampleSurfaceNormalDirection(), \ + en=en[self.getAvailableScans()[0]], \ + qconv=qconv) + + + # initialize area detector properties + if (self.getDetectorPixelWidth() != None ) and \ + (self.getDistanceToDetector() != None): + hxrd.Ang2Q.init_area(self.getDetectorPixelDirection1(), \ + self.getDetectorPixelDirection2(), \ + cch1=self.getDetectorCenterChannel()[0], \ + cch2=self.getDetectorCenterChannel()[1], \ + Nch1=self.getDetectorDimensions()[0], \ + Nch2=self.getDetectorDimensions()[1], \ + pwidth1=self.getDetectorPixelWidth()[0], \ + pwidth2=self.getDetectorPixelWidth()[1], \ + distance=self.getDistanceToDetector(), \ + Nav=self.getNumPixelsToAverage(), \ + roi=self.getDetectorROI()) + else: + hxrd.Ang2Q.init_area(self.getDetectorPixelDirection1(), \ + self.getDetectorPixelDirection2(), \ + cch1=self.getDetectorCenterChannel()[0], \ + cch2=self.getDetectorCenterChannel()[1], \ + Nch1=self.getDetectorDimensions()[0], \ + Nch2=self.getDetectorDimensions()[1], \ + chpdeg1=self.getDetectorChannelsPerDegree()[0], \ + chpdeg2=self.getDetectorChannelsPerDegree()[1], \ + Nav=self.getNumPixelsToAverage(), + roi=self.getDetectorROI()) + + angleNames = self.getAngles() + scanAngle = {} + for i in range(len(angleNames)): + scanAngle[i] = np.array([]) + + offset = 0 + imageToBeUsed = self.getImageToBeUsed() + monitorName = self.getMonitorName() + monitorScaleFactor = self.getMonitorScaleFactor() + filterName = self.getFilterName() + filterScaleFactor = self.getFilterScaleFactor() + for scannr in scans: + if self.haltMap: + raise ProcessCanceledException("Process Canceled") + scan = self.sd.scans[str(scannr)] + angles = self.getGeoAngles(scan, angleNames) + scanAngle1 = {} + scanAngle2 = {} + for i in range(len(angleNames)): + scanAngle1[i] = angles[:,i] + scanAngle2[i] = [] + if monitorName != None: + monitor_data = scan.data.get(monitorName) + if monitor_data is None: + raise IOError("Did not find Monitor source '" + \ + monitorName + \ + "' in the Spec file. Make sure " + \ + "monitorName is correct in the " + \ + "instrument Config file") + if filterName != None: + filter_data = scan.data.get(filterName) + if filter_data is None: + raise IOError("Did not find filter source '" + \ + filterName + \ + "' in the Spec file. Make sure " + \ + "filterName is correct in the " + \ + "instrument Config file") + # read in the image data + arrayInitializedForScan = False + foundIndex = 0 + + if mask_was_none: + mask = [True] * len(self.getImageToBeUsed()[scannr]) + + for ind in range(len(scan.data[list(scan.data.keys())[0]])): + if imageToBeUsed[scannr][ind] and mask[ind]: + # read tif image + im = Image.open(self.imageFileTmp % (scannr, scannr, ind)) + img = np.array(im.getdata()).reshape(im.size[1],im.size[0]).T + img = self.hotpixelkill(img) + ff_data = self.getFlatFieldData() + if not (ff_data is None): + img = img * ff_data + # reduce data siz + img2 = xu.blockAverage2D(img, + self.getNumPixelsToAverage()[0], \ + self.getNumPixelsToAverage()[1], \ + roi=self.getDetectorROI()) + + # apply intensity corrections + if monitorName != None: + img2 = img2 / monitor_data[ind] * monitorScaleFactor + if filterName != None: + img2 = img2 / filter_data[ind] * filterScaleFactor + + # initialize data array + if not arrayInitializedForScan: + imagesToProcess = [imageToBeUsed[scannr][i] and mask[i] for i in range(len(imageToBeUsed[scannr]))] + if not intensity.shape[0]: + intensity = np.zeros((np.count_nonzero(imagesToProcess),) + img2.shape) + arrayInitializedForScan = True + else: + offset = intensity.shape[0] + intensity = np.concatenate( + (intensity, + (np.zeros((np.count_nonzero(imagesToProcess),) + img2.shape))), + axis=0) + arrayInitializedForScan = True + # add data to intensity array + intensity[foundIndex+offset,:,:] = img2 + for i in range(len(angleNames)): +# logger.debug("appending angles to angle2 " + +# str(scanAngle1[i][ind])) + scanAngle2[i].append(scanAngle1[i][ind]) + foundIndex += 1 + if len(scanAngle2[0]) > 0: + for i in range(len(angleNames)): + scanAngle[i] = \ + np.concatenate((scanAngle[i], np.array(scanAngle2[i])), \ + axis=0) + # transform scan angles to reciprocal space coordinates for all detector pixels + angleList = [] + for i in range(len(angleNames)): + angleList.append(scanAngle[i]) + if self.ubMatrix[scans[0]] is None: + qx, qy, qz = hxrd.Ang2Q.area(*angleList, \ + roi=self.getDetectorROI(), + Nav=self.getNumPixelsToAverage()) + else: + qx, qy, qz = hxrd.Ang2Q.area(*angleList, \ + roi=self.getDetectorROI(), + Nav=self.getNumPixelsToAverage(), \ + UB = self.ubMatrix[scans[0]]) + + + # apply selected transform + qxTrans, qyTrans, qzTrans = \ + self.transform.do3DTransform(qx, qy, qz) + + + return qxTrans, qyTrans, qzTrans, intensity + + +class LoadCanceledException(RSMap3DException): + ''' + Exception Thrown when loading data is canceled. + ''' + def __init__(self, message): + super(LoadCanceledException, self).__init__(message) + +class Sector33SpecFileException(RSMap3DException): + ''' + Exception class to be raised if there is a problem loading information + from a spec file + file + ''' + def __init__(self, message): + super(Sector33SpecFileException, self).__init__(message) + + + diff --git a/rsMap3D/datasource/Sector33SpecDataSource.py b/rsMap3D/datasource/Sector33SpecDataSource.py index 42bf5a3..28da353 100644 --- a/rsMap3D/datasource/Sector33SpecDataSource.py +++ b/rsMap3D/datasource/Sector33SpecDataSource.py @@ -221,7 +221,7 @@ def hotpixelkill(self, areaData): return areaData - def loadSource(self, mapHKL=False): + def loadSource(self, mapHKL=False, loadScans=True): ''' This method does the work of loading data from the files. This has been split off from the constructor to allow this to be threaded and later @@ -250,9 +250,14 @@ def loadSource(self, mapHKL=False): if not (self.flatFieldFile is None): self.flatFieldData = np.array(Image.open(self.flatFieldFile)).T # Load scan information from the spec file + try: self.sd = SpecDataFile(self.specFile) self.mapHKL = mapHKL + + if not loadScans: + return + maxScan = int(self.sd.getMaxScanNumber()) logger.debug("Number of Scans" + str(maxScan)) if self.scans is None: @@ -323,6 +328,31 @@ def loadSource(self, mapHKL=False): self.availableScanTypes = set(self.scanType.values()) + def exportScans(self): + """ + Returns loaded data as an MPI-safe object + """ + return { + 'scans': self.scans, + 'imageBounds': self.imageBounds, + 'imageToBeUsed': self.imageToBeUsed, + 'availableScans':self.availableScans, + 'incidentEnergy': self.incidentEnergy, + 'ubMatrix':self.ubMatrix, + 'availableScanTypes': self.availableScanTypes + } + + def importScans(self, scanData): + """ + Imports exported scan data from another data source via an MPI-safe object. + """ + self.scans = scanData['scans'] + self.imageBounds = scanData['imageBounds'] + self.imageToBeUsed = scanData['imageToBeUsed'] + self.availableScans = scanData['availableScans'] + self.incidentEnergy = scanData['incidentEnergy'] + self.ubMatrix = scanData['ubMatrix'] + self.availableScanTypes = scanData['availableScanTypes'] def to_bool(self, value): """ diff --git a/rsMap3D/docs/Tutorial/MPIAcceleration.rst b/rsMap3D/docs/Tutorial/MPIAcceleration.rst new file mode 100644 index 0000000..4b59d2a --- /dev/null +++ b/rsMap3D/docs/Tutorial/MPIAcceleration.rst @@ -0,0 +1,129 @@ +MPI Acceleration Info +===================== + +This document covers the topics related to usage of the MPI and parallelized components of RSMap3d. + +Installation +------------ + +NOTE: Requires MPI to have Remote Memory Access capabilities. Highly recommended that installations implement MPI >= 3.0. + +Recommended Installation Through Conda +`````````````````````````````````````` + +1. Create a default environment with python=3.6 :code:`conda create -y -n rsMapMPI python=3.6` +2. Activate Environment: :code:`conda activate rsMapMPI` +3. Install MPI and MPI4py with conda: :code:`conda install -y -c conda-forge mpi4py openmpi` +4. Install requirements with pip: :code:`pip install PyQt5==5.15.6 xrayutilities==1.7.3 spec2nexus==2021.1.11 vtk==9.1.0` +5. Install rsMap from root dir: :code:`pip install .` + +Usage +----- + +MPI is implemented through a similar interface to the standard RSMap library files. Due to MPIRun launching many processes in parallel, the GUI is not supported for MPI. A script has been provded as a starting point. + +Running the Program +``````````````````` + +The program can be run via the local mpi run command. In the default installation this is :code:`mpirun -n [num_procs] python [script].py` + +On ALCF systems, this would be run with :code:`aprun -n [num_procs] -N [num_procs per node] python [script].py` + +Script Usage +```````````` + +A script :code:`Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: :code:`mpiMapSpecAngleScan.py path/to/config.json` If no path is provided, the script searches for :code:`config.json` in the CWD. + +The config format is as follows: + +.. code-block:: json + + { + "scan_range": "[int]-[int]", + "project_dir":"path/to/project", + "spec_file":"spec_file (NOTE: relative to project dir)", + "detector_config":"dtc_file (NOTE: relative to project dir)", + "instrument_config":"inst_file (NOTE: relative to project dir)" + } + + + +Class API Changes +````````````````` + +MPI is implemented in new classes based on the original classes.Due to sharp edges, they are located in source files with the :code:`mpi` prefix. The constructors of MPI classes will require an additional argument: the :code:`mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. + +.. code-block:: python + + from mpi4py import MPI + + mpiComm = MPI.COMM_WORLD + mpiRank = mpiComm.Get_rank() + + if mpiRank == 0: # 0th process only + # Do singleton logging, work, etc. + +Operations like :code:`dataSource.loadSource()` and :code:`gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: + +.. code-block:: python + + if mpiRank == 2: + time.sleep(10) + + gridMapper.doMap() # All process will be forced to wait till process 2 enters this function in 10 seconds + + if mpiRank != 2: # This will cause the program to hang indefinitely + gridMapper.doMap() + + + + +Sharp Edges +----------- + +A major sharp edge is :code:`num scans >= num cores`. If a run only uses 10 scans, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The :code:`Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. + +Parallelization also introduces sharp edges related to the gridder and loading of data. + +Gridder +``````` + +Two gridder settings are required for this form of parallelization to work: + +1. Normalization must be off. This is the default in xrayutilities==1.7.3. +2. The gridder must be a static size. This is hard-coded false in the mpigridmapper.py file. + +The gridder is passed through MPI channels :code:`nlog(n)` times (where n = num_procs) during the program. An extremely large gridder size may increase runtimes. + +Loading +``````` + +Loading of data into the datasource is parallelized. As such, a datasource merging operation is required. This is implemented via the following block and functions in :code:`MpiSector33SpecDataSource.py`: + + +.. code-block:: python + + scanData = self.exportScans() + scanData = self.mpiMergeSources(scanData) + + scanData = self.mpiComm.bcast(scanData, root=0) + self.importScans(scanData) + +If your implementation requires additional data to be loaded, :code:`exportScans` and :code:`importScans` will need to be modified. If the data is not list or dict based, :code:`mpiMergeSources` will need modification. Custom merge operations can be added to the conditional in the merge method. Any new data being passed through MPI must be able to be pickled. + +Performance Characteristics +--------------------------- + +This section will cover expected program performance and recommendations for optimizing runs. + +Performance Factors +``````````````````` + +The program adds a :code:`nlog(n)` operation where :code:`n = num_procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. + +Otherwise, gridding is expected to scale with a factor of :code:`1/(2^n)`. Loading is expected to scale the same, but may be limited by disk access speeds. + +Recommended Settings +```````````````````` + +While multi-threading is applied by xrayutilities, it only affects about <5% of execution time. As such, the main parameter to vary is the number of processes. Through testing we recommend :code:`num_procs = num MPI slots` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py new file mode 100644 index 0000000..0ea7483 --- /dev/null +++ b/rsMap3D/mappers/mpigridmapper.py @@ -0,0 +1,229 @@ +''' + Copyright (c) 2012, UChicago Argonne, LLC + See LICENSE file. +''' + +import time +from xml.dom.expatbuilder import InternalSubsetExtractor +import xrayutilities as xu +from rsMap3D.mappers.abstractmapper import AbstractGridMapper +import numpy as np +from xrayutilities.exception import InputError +import logging +from mpi4py import MPI +logger = logging.getLogger(__name__) +from datetime import datetime +import sys +import pickle +import math + +SCAN_WIN_SIZE = 4 + + +class MPIQGridMapper(AbstractGridMapper): + ''' + Override parent class to add in output type + ''' + def __init__(self, dataSource, \ + outputFileName, \ + outputType, \ + mpiComm, \ + nx=200, ny=201, nz=202, \ + transform = None, \ + gridWriter = None, + **kwargs): + super(MPIQGridMapper, self).__init__(dataSource, \ + outputFileName, \ + nx=nx, ny=ny, nz=nz, \ + transform = transform, \ + gridWriter = gridWriter, \ + **kwargs) + self.outputType = outputType + self.mpiComm = mpiComm + self.mpiRank = mpiComm.Get_rank() + + def getFileInfo(self): + ''' + Override parent class to add in output type + ''' + return (self.dataSource.projectName, + self.dataSource.availableScans[0], + self.nx, self.ny, self.nz, + self.outputFileName, + self.outputType) + + ''' + This map provides an x, y, z grid of the data. + ''' + #@profile + def processMap(self, **kwargs): + """ + read ad frames and grid them in reciprocal space + angular coordinates are taken from the spec file + + **kwargs are passed to the rawmap function + """ + maxImageMem = self.appConfig.getMaxImageMemory() + gridder = xu.Gridder3D(self.nx, self.ny, self.nz) + gridder.KeepData(True) + rangeBounds = self.dataSource.getRangeBounds() + try: + # repository version or xrayutilities > 1.0.6 + gridder.dataRange(rangeBounds[0], rangeBounds[1], + rangeBounds[2], rangeBounds[3], + rangeBounds[4], rangeBounds[5], + True) + except: + # xrayutilities 1.0.6 and below + gridder.dataRange((rangeBounds[0], rangeBounds[1]), + (rangeBounds[2], rangeBounds[3]), + (rangeBounds[4], rangeBounds[5]), + True) + + imageToBeUsed = self.dataSource.getImageToBeUsed() + + + scans = self.dataSource.getAvailableScans() + + if self.mpiRank == 0: + scanWinSize = SCAN_WIN_SIZE + else: + scanWinSize = 0 + + scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpiComm) + + scanIdx = self.mpiRank + if self.mpiRank == 0: + scanWin.Lock(rank=0) + scanWin.Put([self.mpiComm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + self.mpiComm.Barrier() + print(f'Proc {self.mpiRank} Beginning Gridding {scanIdx+1}/{len(scans)}') + while scanIdx < len(scans): + scan = scans[scanIdx] + + if True in imageToBeUsed[scan]: + imageSize = self.dataSource.getDetectorDimensions()[0] * \ + self.dataSource.getDetectorDimensions()[1] + numImages = len(imageToBeUsed[scan]) + if imageSize*4*numImages <= maxImageMem: + kwargs['mask'] = imageToBeUsed[scan] + qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) + + try: + + gridder(qx, qy, qz, intensity) + + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) + + + else: + nPasses = int(imageSize*4*numImages/ maxImageMem + 1) + for thisPass in range(nPasses): + imageToBeUsedInPass = np.array(imageToBeUsed[scan]) + imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False + imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False + + if True in imageToBeUsedInPass: + kwargs['mask'] = imageToBeUsedInPass + qx, qy, qz, intensity = \ + self.dataSource.rawmap((scan,), **kwargs) + # convert data to rectangular grid in reciprocal space + try: + + gridder(qx, qy, qz, intensity) + + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) + + scanBuff = bytearray(SCAN_WIN_SIZE) + scanWin.Lock(rank=0) + scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) + scanIdx = int.from_bytes(scanBuff, 'little') + if scanIdx < len(scans): + print(f'Proc {self.mpiRank} Gridding {scanIdx+1}/{len(scans)}') + else: + print(f'Proc {self.mpiRank} finished grid. Beginning merge.') + + scanNext = scanIdx + 1 + scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + + + + gridder = self.mergeGridders(gridder) + self.mpiComm.Barrier() + + return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder + + + def mergeGridders(self, gridder): + """ + Merges gridders, combining their grids till all data is collated at proc 0. + Similar to the upward execution of a distributed merge-sort. + """ + worldSize = self.mpiComm.Get_size() + + depth = math.floor(np.log2(worldSize)) + 1 + for currDepth in range(depth): + + # Exclude procs that have merged + if self.mpiRank % (2**currDepth) != 0: + break + + # Determine if sending or receiving + if (self.mpiRank / (2**currDepth)) % 2 == 0: + source = self.mpiRank + (2**currDepth) + + # Odd # world size + if source >= worldSize: + continue + + incomingGrid = self.mpiComm.recv(source=source) + # Normalization must be OFF and range must be FIXED for this to work + # These are the defaults as of the current version of xu (1.7.3) + gridder._gdata += incomingGrid._gdata + gridder._gnorm += incomingGrid._gnorm + + else: + dest = self.mpiRank - (2 ** currDepth) + self.mpiComm.send(gridder, dest=dest) + return gridder + + + def doMap(self): + ''' + Produce a q map of the data. This is the method typically called to + run the mapper. This method calls the processMap method which is an + abstract method which needs to be defined in subclasses. + ''' + + # read and grid data with helper function + _start_time = time.time() + #rangeBounds = self.dataSource.getRangeBounds() + qx, qy, qz, gint, gridder = \ + self.processMap() + print ('Elapsed time for gridding: %.3f seconds' % \ + (time.time() - _start_time)) + + if self.mpiRank == 0: + # print some information + print ('qx: ', qx.min(), ' .... ', qx.max()) + print ('qy: ', qy.min(), ' .... ', qy.max()) + print ('qz: ', qz.min(), ' .... ', qz.max()) + self.gridWriter.setData(qx, qy, qz, gint) + self.gridWriter.setFileInfo(self.getFileInfo()) + self.gridWriter.write() \ No newline at end of file