Skip to content
Open
115 changes: 83 additions & 32 deletions pypmj/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@

# Set warning filters
warnings.filterwarnings(action='ignore',
message= '.*The\\ Leaf.*is\\ exceeding\\ the\\' + \
' maximum\\ recommended\\ rowsize.*\\Z(?ms)')
message= '(?ms).*The\\ Leaf.*is\\ exceeding\\ the\\' + \
' maximum\\ recommended\\ rowsize.*\\Z')
warnings.filterwarnings(action='ignore',
message= '.*your\\ performance\\ may\\ suffer\\ ' + \
message= '(?ms).*your\\ performance\\ may\\ suffer\\ ' + \
'as\\ PyTables\\ will\\ pickle\\ object\\ types\\' + \
' that\\ it\\ cannot.*\\Z(?ms)')
' that\\ it\\ cannot.*\\Z')

# Set text template for strings replacing class attributes deleted for
# memory efficiency (occurs if `minimize_memory_usage=True`
Expand Down Expand Up @@ -715,7 +715,7 @@ def process_results(self, processing_func=None, overwrite=False):

# We try to call the processing_func now. If it fails or its results
# are not of type dict, it is ignored and the user will be warned
signature = inspect.getargspec(processing_func)
signature = inspect.getfullargspec(processing_func)
if len(signature.args) == 1:
procargs = [jcm_results_to_pass]
elif len(signature.args) == 2:
Expand Down Expand Up @@ -809,7 +809,7 @@ def compute_geometry(self, **jcm_kwargs):
"""Computes the geometry (i.e. runs jcm.geo) for this simulation.
The jcm_kwargs are directly passed to jcm.geo, except for
`project_dir`, `keys` and `working_dir`, which are set automatically
(ignored if provided).
(ignored if provided). Returns False if JCMgeo fails, True otherwise.
"""
self.logger.debug('Computing geometry.')
# Copy project to its working directory
Expand Down Expand Up @@ -838,13 +838,20 @@ def compute_geometry(self, **jcm_kwargs):
_thisdir = os.getcwd()
os.chdir(self.project.working_dir)
with utils.Capturing() as output:
jcm.geo(project_dir=self.project.working_dir,
keys=self.keys,
working_dir=self.project.working_dir,
**jcm_kwargs)
try:
jcm.geo(project_dir=self.project.working_dir,
keys=self.keys,
working_dir=self.project.working_dir,
**jcm_kwargs)
except RuntimeError as e:
self.logger.warn('Failed to compute geometry for simulation {}. JCMgeo returned "{}".'.format(self.number, str(e)))
return False

for line in output:
logger_JCMgeo.debug(line)
os.chdir(_thisdir)

return True

def solve_standalone(self, processing_func=None, wdir_mode='keep',
run_post_process_files=None, resource_manager=None,
Expand Down Expand Up @@ -1213,6 +1220,16 @@ class SimulationSet(object):
the results and logs are kept for each simulation. Set this parameter
to true to minimize the memory usage. Caution: you will lose all the
`jcm_results` and `logs` in the `Simulation`-instances.
skip_existent_simulations_by_folder : bool, default False
Determines whether to skip simulations by the existence of a
project_results/fieldbag.jcm file in the storage folder of a respective
simulation with index i. The storage folder of a single simulation i is
given by the parameter `storage_folder` and a subfolder 'simulation[i]'.
Setting this parameter to True is handy if a simulation series is to be
continued when no HDF5 store is present. Simulation keys `geometry` and
`parameters` must not have changed when restarting the simulation series!
Otherwise, simulation indices might not match the assumed folder
structure any longer.
"""

# Names of the groups in the HDF5 store which are used to store metadata
Expand All @@ -1224,13 +1241,14 @@ def __init__(self, project, keys, duplicate_path_levels=0,
use_resultbag=False, transitional_storage_base=None,
combination_mode='product', check_version_match=True,
resource_manager=None, store_logs=False,
minimize_memory_usage=False):
minimize_memory_usage=False, skip_existent_simulations_by_folder=False):
self.logger = logging.getLogger('core.' + self.__class__.__name__)

# Save initialization arguments into namespace
self.combination_mode = combination_mode
self.store_logs = store_logs
self.minimize_memory_usage = minimize_memory_usage
self.skip_existent_simulations_by_folder = skip_existent_simulations_by_folder

# Analyze the provided keys
self._check_keys(keys)
Expand Down Expand Up @@ -1776,17 +1794,20 @@ def fix_h5_store(self, try_restructure=True, brute_force=False):
self.open_store()
self.logger.info('Successfully restructured HDF5 store.')

def make_simulation_schedule(self, fix_h5_duplicated_rows=False):
def make_simulation_schedule(self, fix_h5_duplicated_rows=False, constraint_func=None):
"""Makes a schedule by getting a list of simulations that must be
performed, reorders them to avoid unnecessary calls of JCMgeo, and
checks the HDF5 store for simulation data which is already known.
If duplicated rows are found, a `RuntimeError` is raised. In this
case, you can rerun `make_simulation_schedule` with
`fix_h5_duplicated_rows=True` to try to automatically fix it.
Alternatively, you could call the `fix_h5_store`-method yourself.

`constraint_func` is a function taking a set of parameter and
geometry keys as a dictionary. It should return True if a
simulation is to be performed for the respective set, and False
if the set is to be skipped.
"""
self._get_simulation_list()
self._get_simulation_list(constraint_func)
self._sort_simulations()

# Init the failed simulation list
Expand All @@ -1804,15 +1825,28 @@ def make_simulation_schedule(self, fix_h5_duplicated_rows=False):

precheck = self._precheck_store()
self.logger.debug('Result of the store pre-check: {}'.format(precheck))
if precheck == 'Empty':
self.finished_sim_numbers = []
if precheck == 'Extended Check':
if precheck == 'Empty' or self.skip_existent_simulations_by_folder:
stored_sim_numbers = []

if self.skip_existent_simulations_by_folder:
for i in range(self.num_sims):
simdir = _default_sim_wdir(self.storage_dir, self.simulations[i].number)
if os.path.exists(os.path.join(simdir, 'project_results/fieldbag.jcm')):
stored_sim_numbers.append(i)

self.finished_sim_numbers = stored_sim_numbers
if len(self.finished_sim_numbers) > 0:
self.logger.info('Ignoring HDF5 store. Determining already finished simulations by ' +
'existence of fieldbag.jcm instead. Number of found simulations: {}'.format(
len(self.finished_sim_numbers)))

if precheck == 'Extended Check' and not self.skip_existent_simulations_by_folder:
self.logger.info('Running extended check ...')
self._extended_store_check()
self.logger.info('Found matches in the extended check of the ' +
'HDF5 store. Number of stored simulations: {}'.
format(len(self.finished_sim_numbers)))
elif precheck == 'Match':
elif precheck == 'Match' and not self.skip_existent_simulations_by_folder:
stored_sim_numbers = list(self.get_store_data().index)
if len(stored_sim_numbers) > self.num_sims:
if fix_h5_duplicated_rows:
Expand All @@ -1830,7 +1864,7 @@ def make_simulation_schedule(self, fix_h5_duplicated_rows=False):
'store. Number of stored simulations: {}'.format(
len(self.finished_sim_numbers)))

def _get_simulation_list(self):
def _get_simulation_list(self, constraint_func):
"""Check the `parameters`- and `geometry`-dictionaries for sequences
and generate a list which has a keys-dictionary for each distinct
simulation by using the `combination_mode` as specified.
Expand Down Expand Up @@ -1899,6 +1933,14 @@ def _get_simulation_list(self):
propertyCombinations = []
for iSim in range(Nsims):
propertyCombinations.append(tuple([l[iSim] for l in loopList]))

# Remove combinations based on constraint_func.
if constraint_func is not None:
propertyCombinationsFiltered = [x for x in propertyCombinations if \
constraint_func(dict((k, v) for k, v in x))]
self.logger.info("Removed {} combinations based on the given constraint function.".format( \
len(propertyCombinations) - len(propertyCombinationsFiltered)))
propertyCombinations = propertyCombinationsFiltered

self.num_sims = len(propertyCombinations) # total num of simulations
if self.num_sims == 1:
Expand Down Expand Up @@ -2064,7 +2106,7 @@ def __restore_from_meta_dframe(self, which):
if len(vals) == 1:
dict_[col] = vals.iat[0]
else:
dict_[col] = pd.to_numeric(vals, errors='ignore').values
dict_[col] = pd.to_numeric(vals).values
return dict_

def _store_metadata(self):
Expand Down Expand Up @@ -2290,7 +2332,7 @@ def _resources_ready(self):

def compute_geometry(self, simulation, **jcm_kwargs):
"""Computes the geometry (i.e. runs jcm.geo) for a specific simulation
of the simulation set.
of the simulation set. Returns False if JCMgeo fails, True otherwise.
Parameters
----------
simulation : Simulation or int
Expand All @@ -2307,10 +2349,10 @@ def compute_geometry(self, simulation, **jcm_kwargs):
raise ValueError('`simulation` must be a Simulation of the ' +
'current SimulationSet or a simulation index' +
' (int).')
return
return False

# Call the compute_geometry-method of the simulation
simulation.compute_geometry(**jcm_kwargs)
return simulation.compute_geometry(**jcm_kwargs)

def solve_single_simulation(self, simulation, compute_geometry=True,
run_post_process_files=None,
Expand Down Expand Up @@ -2462,17 +2504,26 @@ def _start_simulations(self, N='all', processing_func=None,
# Start the simulation if it is not already finished
if not sim.number in self.finished_sim_numbers:
# Compute the geometry if necessary
geo_succeeded = True
if sim.rerun_JCMgeo or force_geo_run:
self.compute_geometry(sim, **jcm_geo_kwargs)
force_geo_run = False
if self.compute_geometry(sim, **jcm_geo_kwargs):
force_geo_run = False
else:
geo_succeeded = False;

# Start to solve the simulation and receive a job ID
job_id = sim.solve(**jcm_solve_kwargs)
self.logger.debug(
'Queued simulation {0} of {1} with job_id {2}'.
format(i + 1, self.num_sims, sim.job_id))
job_ids.append(job_id)
ids_to_sim_number[job_id] = sim.number
if geo_succeeded:
# Start to solve the simulation and receive a job ID
job_id = sim.solve(**jcm_solve_kwargs)
self.logger.debug(
'Queued simulation {0} of {1} with job_id {2}'.
format(i + 1, self.num_sims, sim.job_id))
job_ids.append(job_id)
ids_to_sim_number[job_id] = sim.number
else:
sim.status = 'Skipped'
self.logger.debug(
'Skipping simulation {0} of {1}'.
format(i + 1, self.num_sims))
else:
# Set `force_geo_run` to True if this finished simulation would
# have required the geometry to be computed
Expand Down
Loading